[serve] Add log_to_stderr option to logger and improve internal logging #33597

Merged · 3 commits · Mar 24, 2023
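In short, Serve's internal loggers keep writing everything to the per-component log file, but callers can now tag individual records with `extra={"log_to_stderr": False}` to keep them out of stderr. A minimal usage sketch, assuming it runs inside a Serve component where `configure_component_logger` (changed below) has installed the new filter on the stream handler:

```python
import logging

# The Serve component logger configured by configure_component_logger.
logger = logging.getLogger("ray.serve")

# Emitted to stderr and to the component's log file.
logger.info("Updating deployment targets.")

# Tagged records are written to the log file only; the new
# log_to_stderr_filter drops them from the stderr stream handler.
logger.info(
    "Noisy per-request detail.",
    extra={"log_to_stderr": False},
)
```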
8 changes: 6 additions & 2 deletions python/ray/serve/_private/application_state.py
@@ -161,21 +161,25 @@ def update(self):
return
try:
ray.get(finished[0])
logger.info("Deploy task for app {self.name} ran successfully.")
except RayTaskError as e:
self.status = ApplicationStatus.DEPLOY_FAILED
# NOTE(zcin): we should use str(e) instead of traceback.format_exc()
# here because the full details of the error are not displayed
# properly with traceback.format_exc(). RayTaskError has its own
# custom __str__ function.
self.app_msg = f"Deployment failed:\n{str(e)}"
self.app_msg = f"Deploying app '{self.name}' failed:\n{str(e)}"
self.deploy_obj_ref = None
logger.warning(self.app_msg)
return
except RuntimeEnvSetupError:
self.status = ApplicationStatus.DEPLOY_FAILED
self.app_msg = (
f"Runtime env setup failed:\n{traceback.format_exc()}"
f"Runtime env setup for app '{self.name}' "
f"failed:\n{traceback.format_exc()}"
)
self.deploy_obj_ref = None
logger.warning(self.app_msg)
return
deployments_statuses = (
self.deployment_state_manager.get_deployment_statuses(
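The NOTE above is the main design choice in this hunk; a standalone sketch of the difference it describes, using a hypothetical `CustomError` as a stand-in for `RayTaskError` and its custom `__str__`:

```python
import traceback


class CustomError(Exception):
    """Toy stand-in for RayTaskError, which defines its own __str__."""

    def __str__(self) -> str:
        return "user-friendly summary of the remote failure"


try:
    raise CustomError()
except CustomError as e:
    # str(e) goes through the custom __str__ and stays readable.
    print(f"Deployment failed:\n{str(e)}")
    # traceback.format_exc() prints the raw stack trace instead, which is
    # why the handler above prefers str(e) for RayTaskError.
    print(traceback.format_exc())
```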
45 changes: 27 additions & 18 deletions python/ray/serve/_private/deployment_state.py
@@ -309,9 +309,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion):
deployment_info.deployment_config.is_cross_language
)

logger.debug(
logger.info(
f"Starting replica {self.replica_tag} for deployment "
f"{self.deployment_name}."
f"{self.deployment_name}.",
extra={"log_to_stderr": False},
)

actor_def = deployment_info.actor_def
@@ -429,7 +430,7 @@ def recover(self):
Recover states in DeploymentReplica instance by fetching running actor
status
"""
logger.debug(
logger.info(
f"Recovering replica {self.replica_tag} for deployment "
f"{self.deployment_name}."
)
@@ -567,11 +568,13 @@ def _check_active_health_check(self) -> ReplicaHealthCheckResponse:
response = ReplicaHealthCheckResponse.ACTOR_CRASHED
except RayError as e:
# Health check failed due to application-level exception.
logger.info(f"Health check for replica {self._replica_tag} failed: {e}")
logger.warning(
f"Health check for replica {self._replica_tag} failed: {e}"
)
response = ReplicaHealthCheckResponse.APP_FAILURE
elif time.time() - self._last_health_check_time > self._health_check_timeout_s:
# Health check hasn't returned and the timeout is up, consider it failed.
logger.info(
logger.warning(
"Didn't receive health check response for replica "
f"{self._replica_tag} after "
f"{self._health_check_timeout_s}s, marking it unhealthy."
@@ -636,15 +639,15 @@ def check_health(self) -> bool:
self._consecutive_health_check_failures
>= REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD
):
logger.info(
logger.warning(
f"Replica {self._replica_tag} failed the health "
f"check {self._consecutive_health_check_failures}"
"times in a row, marking it unhealthy."
)
self._healthy = False
elif response is ReplicaHealthCheckResponse.ACTOR_CRASHED:
# Actor crashed, mark the replica unhealthy immediately.
logger.info(
logger.warning(
f"Actor for replica {self._replica_tag} crashed, marking "
"it unhealthy immediately."
)
@@ -800,6 +803,11 @@ def stop(self, graceful: bool = True) -> None:

Should handle the case where the replica is already stopped.
"""
logger.info(
f"Stopping replica {self.replica_tag} for deployment "
f"{self.deployment_name}.",
extra={"log_to_stderr": False},
)
timeout_s = self._actor.graceful_stop()
if not graceful:
timeout_s = 0
@@ -814,7 +822,7 @@ def check_stopped(self) -> bool:
if timeout_passed:
# Graceful period passed, kill it forcefully.
# This will be called repeatedly until the replica shuts down.
logger.debug(
logger.info(
f"Replica {self.replica_tag} did not shut down after grace "
"period, force-killing it. "
)
@@ -1058,7 +1066,7 @@ def recover_target_state_from_checkpoint(
self, target_state_checkpoint: DeploymentTargetState
):
logger.info(
"Recovering target state for deployment " f"{self._name} from checkpoint.."
f"Recovering target state for deployment {self._name} from checkpoint."
)
self._target_state = target_state_checkpoint

@@ -1070,10 +1078,9 @@ def recover_current_state_from_replica_actor_names(
"recovering current state from replica actor names."
)

logger.debug(
logger.info(
"Recovering current state for deployment "
f"{self._name} from {len(replica_actor_names)} actors in "
"current ray cluster.."
f"{self._name} from {len(replica_actor_names)} total actors."
)
# All current states use default value, only attach running replicas.
for replica_actor_name in replica_actor_names:
@@ -1138,7 +1145,7 @@ def _set_target_state_deleting(self) -> None:
self._curr_status_info = DeploymentStatusInfo(
self._name, DeploymentStatus.UPDATING
)
logger.debug(f"Deleting {self._name}.")
logger.info(f"Deleting deployment {self._name}.")

def _set_target_state(self, target_info: DeploymentInfo) -> None:
"""Set the target state for the deployment to the provided info."""
@@ -1155,7 +1162,7 @@ def _set_target_state(self, target_info: DeploymentInfo) -> None:
self._replica_constructor_retry_counter = 0
self._backoff_time_s = 1

logger.debug(f"Deploying new version of {self._name}: {target_state.version}.")
logger.info(f"Deploying new version of deployment {self._name}.")

def deploy(self, deployment_info: DeploymentInfo) -> bool:
"""Deploy the deployment.
@@ -1375,7 +1382,7 @@ def _scale_deployment_replicas(self) -> bool:
self._last_retry = time.time()
logger.info(
f"Adding {to_add} replica{'s' if to_add > 1 else ''} "
f"to deployment '{self._name}'."
f"to deployment {self._name}."
)
for _ in range(to_add):
replica_name = ReplicaName(self._name, get_random_letters())
@@ -1526,6 +1533,10 @@ def _check_startup_replicas(
# set.
self._replicas.add(ReplicaState.RUNNING, replica)
transitioned_to_running = True
logger.info(
f"Replica {replica.replica_tag} started successfully.",
extra={"log_to_stderr": False},
)
elif start_status == ReplicaStartupStatus.FAILED:
# Replica reconfigure (deploy / upgrade) failed
if self._replica_constructor_retry_counter >= 0:
@@ -1761,9 +1772,7 @@ def _get_all_node_ids(self):
return get_all_node_ids(self._gcs_client)

def _deploy_driver(self) -> bool:
"""
Deploy the driver deployment to each node
"""
"""Deploy the driver deployment to each node."""
all_nodes = self._get_all_node_ids()
self.target_info.deployment_config.num_replicas = len(all_nodes)
deployed_nodes = set()
17 changes: 13 additions & 4 deletions python/ray/serve/_private/http_proxy.py
@@ -98,7 +98,8 @@ async def _send_request_to_handle(handle, scope, receive, send) -> str:
)
logger.warning(
f"Client from {scope['client']} disconnected, cancelling the "
"request."
"request.",
extra={"log_to_stderr": False},
)
# This will make the .result() to raise cancelled error.
assignment_task.cancel()
@@ -137,7 +138,7 @@ async def _send_request_to_handle(handle, scope, receive, send) -> str:
await Response(error_message, status_code=500).send(scope, receive, send)
return "500"
except RayActorError:
logger.debug(
logger.info(
"Request failed due to replica failure. There are "
f"{HTTP_REQUEST_MAX_RETRIES - retries} retries "
"remaining."
@@ -181,7 +182,9 @@ def endpoint_exists(self, endpoint: EndpointTag) -> bool:
return endpoint in self.handles

def update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None:
logger.debug(f"Got updated endpoints: {endpoints}.")
logger.info(
f"Got updated endpoints: {endpoints}.", extra={"log_to_stderr": False}
)

existing_handles = set(self.handles.keys())
routes = []
@@ -195,6 +198,11 @@ def update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None:
self.handles[endpoint] = self._get_handle(endpoint)

# Clean up any handles that are no longer used.
if len(existing_handles) > 0:
logger.info(
f"Deleting {len(existing_handles)} unused handles.",
extra={"log_to_stderr": False},
)
for endpoint in existing_handles:
del self.handles[endpoint]

@@ -401,7 +409,7 @@ async def __call__(self, scope, receive, send):
method=scope["method"],
status=str(status_code),
latency_ms=latency_ms,
)
),
extra={"log_to_stderr": False},
)
if status_code != "200":
self.request_error_counter.inc(
3 changes: 2 additions & 1 deletion python/ray/serve/_private/http_state.py
@@ -130,7 +130,8 @@ def _start_proxies_if_needed(self) -> None:
"Starting HTTP proxy with name '{}' on node '{}' "
"listening on '{}:{}'".format(
name, node_id, self._config.host, self._config.port
)
),
extra={"log_to_stderr": False},
)
proxy = HTTPProxyActor.options(
num_cpus=self._config.num_cpus,
61 changes: 33 additions & 28 deletions python/ray/serve/_private/logging_utils.py
@@ -47,14 +47,20 @@ def access_log_msg(*, method: str, status: str, latency_ms: float):
return f"{method.upper()} {status.upper()} {latency_ms:.1f}ms"


def log_to_stderr_filter(record: logging.LogRecord) -> bool:
"""Filters log records based on a parameter in the `extra` dictionary."""
if not hasattr(record, "log_to_stderr") or record.log_to_stderr is None:
return True

return record.log_to_stderr


def configure_component_logger(
*,
component_name: str,
component_id: str,
component_type: Optional[str] = None,
log_level: int = logging.INFO,
log_to_stream: bool = True,
log_to_file: bool = True,
max_bytes: Optional[int] = None,
backup_count: Optional[int] = None,
):
@@ -84,32 +90,31 @@ def record_factory(*args, **kwargs):

logging.setLogRecordFactory(record_factory)

if log_to_stream:
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ServeFormatter(component_name, component_id))
logger.addHandler(stream_handler)

if log_to_file:
logs_dir = os.path.join(
ray._private.worker._global_node.get_logs_dir_path(), "serve"
)
os.makedirs(logs_dir, exist_ok=True)
if max_bytes is None:
max_bytes = ray._private.worker._global_node.max_bytes
if backup_count is None:
backup_count = ray._private.worker._global_node.backup_count
if component_type is not None:
component_name = f"{component_type}_{component_name}"
log_file_name = LOG_FILE_FMT.format(
component_name=component_name, component_id=component_id
)
file_handler = logging.handlers.RotatingFileHandler(
os.path.join(logs_dir, log_file_name),
maxBytes=max_bytes,
backupCount=backup_count,
)
file_handler.setFormatter(ServeFormatter(component_name, component_id))
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(ServeFormatter(component_name, component_id))
stream_handler.addFilter(log_to_stderr_filter)
logger.addHandler(stream_handler)

logs_dir = os.path.join(
ray._private.worker._global_node.get_logs_dir_path(), "serve"
)
os.makedirs(logs_dir, exist_ok=True)
if max_bytes is None:
max_bytes = ray._private.worker._global_node.max_bytes
if backup_count is None:
backup_count = ray._private.worker._global_node.backup_count
if component_type is not None:
component_name = f"{component_type}_{component_name}"
log_file_name = LOG_FILE_FMT.format(
component_name=component_name, component_id=component_id
)
file_handler = logging.handlers.RotatingFileHandler(
os.path.join(logs_dir, log_file_name),
maxBytes=max_bytes,
backupCount=backup_count,
)
file_handler.setFormatter(ServeFormatter(component_name, component_id))
logger.addHandler(file_handler)


class LoggingContext:
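For readers unfamiliar with the `extra` mechanism, here is a minimal standalone sketch of the behavior the new filter enables; the logger name and file path are illustrative, not part of this PR:

```python
import logging
import sys


def log_to_stderr_filter(record: logging.LogRecord) -> bool:
    """Same idea as the filter added above: drop records explicitly
    tagged with log_to_stderr=False from the stderr handler."""
    if not hasattr(record, "log_to_stderr") or record.log_to_stderr is None:
        return True
    return record.log_to_stderr


logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)

# The stderr handler carries the filter, mirroring configure_component_logger.
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.addFilter(log_to_stderr_filter)
logger.addHandler(stderr_handler)

# The file handler has no filter, so it receives every record.
file_handler = logging.FileHandler("demo.log")
logger.addHandler(file_handler)

logger.info("goes to stderr and demo.log")
logger.info("goes to demo.log only", extra={"log_to_stderr": False})
```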
22 changes: 3 additions & 19 deletions python/ray/serve/_private/replica.py
@@ -363,21 +363,6 @@ def user_health_check():
metrics_process_func=process_remote_func,
)

# NOTE(edoakes): we used to recommend that users use the "ray" logger
# and tagged the logs with metadata as below. We now recommend using
# the "ray.serve" 'component logger' (as of Ray 1.13). This is left to
# maintain backwards compatibility with users who were using the
# existing logger. We can consider removing it in Ray 2.0.
ray_logger = logging.getLogger("ray")
for handler in ray_logger.handlers:
handler.setFormatter(
logging.Formatter(
handler.formatter._fmt
+ f" component=serve deployment={self.deployment_name} "
f"replica={self.replica_tag}"
)
)

async def check_health(self):
await self.user_health_check()

@@ -444,10 +429,9 @@ async def invoke_single(self, request_item: Query) -> Tuple[Any, bool]:
Returns the user-provided output and a boolean indicating if the
request succeeded (user code didn't raise an exception).
"""
logger.debug(
"Replica {} started executing request {}".format(
self.replica_tag, request_item.metadata.request_id
)
logger.info(
f"Started executing request {request_item.metadata.request_id}",
extra={"log_to_stderr": False},
)

args, kwargs = parse_request_item(request_item)
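The block deleted above was the legacy path that mutated the root "ray" logger's formatters; a brief sketch of the pattern the removed NOTE recommends instead (the deployment class is illustrative):

```python
import logging

from ray import serve

# The "ray.serve" component logger recommended since Ray 1.13; Serve's own
# handlers tag these records with the deployment and replica identifiers.
logger = logging.getLogger("ray.serve")


@serve.deployment
class MyDeployment:
    def __call__(self, request) -> str:
        logger.info("Handling a request.")
        return "ok"
```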