diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index 253aceefc0b67..50fdb29839b0e 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -161,21 +161,25 @@ def update(self): return try: ray.get(finished[0]) + logger.info("Deploy task for app {self.name} ran successfully.") except RayTaskError as e: self.status = ApplicationStatus.DEPLOY_FAILED # NOTE(zcin): we should use str(e) instead of traceback.format_exc() # here because the full details of the error is not displayed # properly with traceback.format_exc(). RayTaskError has its own # custom __str__ function. - self.app_msg = f"Deployment failed:\n{str(e)}" + self.app_msg = f"Deploying app '{self.name}' failed:\n{str(e)}" self.deploy_obj_ref = None + logger.warning(self.app_msg) return except RuntimeEnvSetupError: self.status = ApplicationStatus.DEPLOY_FAILED self.app_msg = ( - f"Runtime env setup failed:\n{traceback.format_exc()}" + f"Runtime env setup for app '{self.name}' " + f"failed:\n{traceback.format_exc()}" ) self.deploy_obj_ref = None + logger.warning(self.app_msg) return deployments_statuses = ( self.deployment_state_manager.get_deployment_statuses( diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 8ed2d2a79c895..5cabcdec5d40d 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -309,9 +309,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion): deployment_info.deployment_config.is_cross_language ) - logger.debug( + logger.info( f"Starting replica {self.replica_tag} for deployment " - f"{self.deployment_name}." + f"{self.deployment_name}.", + extra={"log_to_stderr": False}, ) actor_def = deployment_info.actor_def @@ -429,7 +430,7 @@ def recover(self): Recover states in DeploymentReplica instance by fetching running actor status """ - logger.debug( + logger.info( f"Recovering replica {self.replica_tag} for deployment " f"{self.deployment_name}." ) @@ -567,11 +568,13 @@ def _check_active_health_check(self) -> ReplicaHealthCheckResponse: response = ReplicaHealthCheckResponse.ACTOR_CRASHED except RayError as e: # Health check failed due to application-level exception. - logger.info(f"Health check for replica {self._replica_tag} failed: {e}") + logger.warning( + f"Health check for replica {self._replica_tag} failed: {e}" + ) response = ReplicaHealthCheckResponse.APP_FAILURE elif time.time() - self._last_health_check_time > self._health_check_timeout_s: # Health check hasn't returned and the timeout is up, consider it failed. - logger.info( + logger.warning( "Didn't receive health check response for replica " f"{self._replica_tag} after " f"{self._health_check_timeout_s}s, marking it unhealthy." @@ -636,7 +639,7 @@ def check_health(self) -> bool: self._consecutive_health_check_failures >= REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD ): - logger.info( + logger.warning( f"Replica {self._replica_tag} failed the health " f"check {self._consecutive_health_check_failures}" "times in a row, marking it unhealthy." @@ -644,7 +647,7 @@ def check_health(self) -> bool: self._healthy = False elif response is ReplicaHealthCheckResponse.ACTOR_CRASHED: # Actor crashed, mark the replica unhealthy immediately. - logger.info( + logger.warning( f"Actor for replica {self._replica_tag} crashed, marking " "it unhealthy immediately." ) @@ -800,6 +803,11 @@ def stop(self, graceful: bool = True) -> None: Should handle the case where the replica is already stopped. """ + logger.info( + f"Stopping replica {self.replica_tag} for deployment " + f"{self.deployment_name}.", + extra={"log_to_stderr": False}, + ) timeout_s = self._actor.graceful_stop() if not graceful: timeout_s = 0 @@ -814,7 +822,7 @@ def check_stopped(self) -> bool: if timeout_passed: # Graceful period passed, kill it forcefully. # This will be called repeatedly until the replica shuts down. - logger.debug( + logger.info( f"Replica {self.replica_tag} did not shut down after grace " "period, force-killing it. " ) @@ -1058,7 +1066,7 @@ def recover_target_state_from_checkpoint( self, target_state_checkpoint: DeploymentTargetState ): logger.info( - "Recovering target state for deployment " f"{self._name} from checkpoint.." + f"Recovering target state for deployment {self._name} from checkpoint." ) self._target_state = target_state_checkpoint @@ -1070,10 +1078,9 @@ def recover_current_state_from_replica_actor_names( "recovering current state from replica actor names." ) - logger.debug( + logger.info( "Recovering current state for deployment " - f"{self._name} from {len(replica_actor_names)} actors in " - "current ray cluster.." + f"{self._name} from {len(replica_actor_names)} total actors." ) # All current states use default value, only attach running replicas. for replica_actor_name in replica_actor_names: @@ -1138,7 +1145,7 @@ def _set_target_state_deleting(self) -> None: self._curr_status_info = DeploymentStatusInfo( self._name, DeploymentStatus.UPDATING ) - logger.debug(f"Deleting {self._name}.") + logger.info(f"Deleting deployment {self._name}.") def _set_target_state(self, target_info: DeploymentInfo) -> None: """Set the target state for the deployment to the provided info.""" @@ -1155,7 +1162,7 @@ def _set_target_state(self, target_info: DeploymentInfo) -> None: self._replica_constructor_retry_counter = 0 self._backoff_time_s = 1 - logger.debug(f"Deploying new version of {self._name}: {target_state.version}.") + logger.info(f"Deploying new version of deployment {self._name}.") def deploy(self, deployment_info: DeploymentInfo) -> bool: """Deploy the deployment. @@ -1375,7 +1382,7 @@ def _scale_deployment_replicas(self) -> bool: self._last_retry = time.time() logger.info( f"Adding {to_add} replica{'s' if to_add > 1 else ''} " - f"to deployment '{self._name}'." + f"to deployment {self._name}." ) for _ in range(to_add): replica_name = ReplicaName(self._name, get_random_letters()) @@ -1526,6 +1533,10 @@ def _check_startup_replicas( # set. self._replicas.add(ReplicaState.RUNNING, replica) transitioned_to_running = True + logger.info( + f"Replica {replica.replica_tag} started successfully.", + extra={"log_to_stderr": False}, + ) elif start_status == ReplicaStartupStatus.FAILED: # Replica reconfigure (deploy / upgrade) failed if self._replica_constructor_retry_counter >= 0: @@ -1761,9 +1772,7 @@ def _get_all_node_ids(self): return get_all_node_ids(self._gcs_client) def _deploy_driver(self) -> bool: - """ - Deploy the driver deployment to each node - """ + """Deploy the driver deployment to each node.""" all_nodes = self._get_all_node_ids() self.target_info.deployment_config.num_replicas = len(all_nodes) deployed_nodes = set() diff --git a/python/ray/serve/_private/http_proxy.py b/python/ray/serve/_private/http_proxy.py index 39af41a9badad..a4283006bd981 100644 --- a/python/ray/serve/_private/http_proxy.py +++ b/python/ray/serve/_private/http_proxy.py @@ -98,7 +98,8 @@ async def _send_request_to_handle(handle, scope, receive, send) -> str: ) logger.warning( f"Client from {scope['client']} disconnected, cancelling the " - "request." + "request.", + extra={"log_to_stderr": False}, ) # This will make the .result() to raise cancelled error. assignment_task.cancel() @@ -137,7 +138,7 @@ async def _send_request_to_handle(handle, scope, receive, send) -> str: await Response(error_message, status_code=500).send(scope, receive, send) return "500" except RayActorError: - logger.debug( + logger.info( "Request failed due to replica failure. There are " f"{HTTP_REQUEST_MAX_RETRIES - retries} retries " "remaining." @@ -181,7 +182,9 @@ def endpoint_exists(self, endpoint: EndpointTag) -> bool: return endpoint in self.handles def update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None: - logger.debug(f"Got updated endpoints: {endpoints}.") + logger.info( + f"Got updated endpoints: {endpoints}.", extra={"log_to_stderr": False} + ) existing_handles = set(self.handles.keys()) routes = [] @@ -195,6 +198,11 @@ def update_routes(self, endpoints: Dict[EndpointTag, EndpointInfo]) -> None: self.handles[endpoint] = self._get_handle(endpoint) # Clean up any handles that are no longer used. + if len(existing_handles) > 0: + logger.info( + f"Deleting {len(existing_handles)} unused handles.", + extra={"log_to_stderr": False}, + ) for endpoint in existing_handles: del self.handles[endpoint] @@ -401,7 +409,8 @@ async def __call__(self, scope, receive, send): method=scope["method"], status=str(status_code), latency_ms=latency_ms, - ) + ), + extra={"log_to_stderr": False}, ) if status_code != "200": self.request_error_counter.inc( diff --git a/python/ray/serve/_private/http_state.py b/python/ray/serve/_private/http_state.py index e293162e5c0f6..676aa183b66db 100644 --- a/python/ray/serve/_private/http_state.py +++ b/python/ray/serve/_private/http_state.py @@ -130,7 +130,8 @@ def _start_proxies_if_needed(self) -> None: "Starting HTTP proxy with name '{}' on node '{}' " "listening on '{}:{}'".format( name, node_id, self._config.host, self._config.port - ) + ), + extra={"log_to_stderr": False}, ) proxy = HTTPProxyActor.options( num_cpus=self._config.num_cpus, diff --git a/python/ray/serve/_private/logging_utils.py b/python/ray/serve/_private/logging_utils.py index 7e231900bee2b..97e9e6222873a 100644 --- a/python/ray/serve/_private/logging_utils.py +++ b/python/ray/serve/_private/logging_utils.py @@ -47,14 +47,20 @@ def access_log_msg(*, method: str, status: str, latency_ms: float): return f"{method.upper()} {status.upper()} {latency_ms:.1f}ms" +def log_to_stderr_filter(record: logging.LogRecord) -> bool: + """Filters log records based on a parameter in the `extra` dictionary.""" + if not hasattr(record, "log_to_stderr") or record.log_to_stderr is None: + return True + + return record.log_to_stderr + + def configure_component_logger( *, component_name: str, component_id: str, component_type: Optional[str] = None, log_level: int = logging.INFO, - log_to_stream: bool = True, - log_to_file: bool = True, max_bytes: Optional[int] = None, backup_count: Optional[int] = None, ): @@ -84,32 +90,31 @@ def record_factory(*args, **kwargs): logging.setLogRecordFactory(record_factory) - if log_to_stream: - stream_handler = logging.StreamHandler() - stream_handler.setFormatter(ServeFormatter(component_name, component_id)) - logger.addHandler(stream_handler) - - if log_to_file: - logs_dir = os.path.join( - ray._private.worker._global_node.get_logs_dir_path(), "serve" - ) - os.makedirs(logs_dir, exist_ok=True) - if max_bytes is None: - max_bytes = ray._private.worker._global_node.max_bytes - if backup_count is None: - backup_count = ray._private.worker._global_node.backup_count - if component_type is not None: - component_name = f"{component_type}_{component_name}" - log_file_name = LOG_FILE_FMT.format( - component_name=component_name, component_id=component_id - ) - file_handler = logging.handlers.RotatingFileHandler( - os.path.join(logs_dir, log_file_name), - maxBytes=max_bytes, - backupCount=backup_count, - ) - file_handler.setFormatter(ServeFormatter(component_name, component_id)) - logger.addHandler(file_handler) + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(ServeFormatter(component_name, component_id)) + stream_handler.addFilter(log_to_stderr_filter) + logger.addHandler(stream_handler) + + logs_dir = os.path.join( + ray._private.worker._global_node.get_logs_dir_path(), "serve" + ) + os.makedirs(logs_dir, exist_ok=True) + if max_bytes is None: + max_bytes = ray._private.worker._global_node.max_bytes + if backup_count is None: + backup_count = ray._private.worker._global_node.backup_count + if component_type is not None: + component_name = f"{component_type}_{component_name}" + log_file_name = LOG_FILE_FMT.format( + component_name=component_name, component_id=component_id + ) + file_handler = logging.handlers.RotatingFileHandler( + os.path.join(logs_dir, log_file_name), + maxBytes=max_bytes, + backupCount=backup_count, + ) + file_handler.setFormatter(ServeFormatter(component_name, component_id)) + logger.addHandler(file_handler) class LoggingContext: diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 9a190d6ab94cb..18b905e1d8b9a 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -363,21 +363,6 @@ def user_health_check(): metrics_process_func=process_remote_func, ) - # NOTE(edoakes): we used to recommend that users use the "ray" logger - # and tagged the logs with metadata as below. We now recommend using - # the "ray.serve" 'component logger' (as of Ray 1.13). This is left to - # maintain backwards compatibility with users who were using the - # existing logger. We can consider removing it in Ray 2.0. - ray_logger = logging.getLogger("ray") - for handler in ray_logger.handlers: - handler.setFormatter( - logging.Formatter( - handler.formatter._fmt - + f" component=serve deployment={self.deployment_name} " - f"replica={self.replica_tag}" - ) - ) - async def check_health(self): await self.user_health_check() @@ -444,10 +429,9 @@ async def invoke_single(self, request_item: Query) -> Tuple[Any, bool]: Returns the user-provided output and a boolean indicating if the request succeeded (user code didn't raise an exception). """ - logger.debug( - "Replica {} started executing request {}".format( - self.replica_tag, request_item.metadata.request_id - ) + logger.info( + f"Started executing request {request_item.metadata.request_id}", + extra={"log_to_stderr": False}, ) args, kwargs = parse_request_item(request_item) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 8fa571362e2fd..325eb3ce121b3 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -581,7 +581,11 @@ def deploy_apps( updated_versions, ) - deploy_obj_ref = run_graph.options( + logger.info( + "Starting deploy_serve_application " + f"task for application {app_config.name}." + ) + deploy_obj_ref = deploy_serve_application.options( runtime_env=app_config.runtime_env ).remote( app_config.import_path, @@ -907,20 +911,19 @@ def exclude_lightweight_update_options(dict): @ray.remote(num_cpus=0, max_calls=1) -def run_graph( +def deploy_serve_application( import_path: str, - graph_env: Dict, + runtime_env: Dict, deployment_override_options: List[Dict], deployment_versions: Dict, name: str = SERVE_DEFAULT_APP_NAME, route_prefix: str = "/", ): - """ - Build application object from user config + """Deploy Serve application from a user-provided config. Args: - import_path: Serve deployment graph's import path - graph_env: runtime env to run the deployment graph in + import_path: import path to top-level bound deployment. + runtime_env: runtime_env for the application. deployment_override_options: Dictionary of options that overrides deployment options set in the graph's code itself. deployment_versions: Versions of each deployment, each of which is @@ -934,27 +937,26 @@ def run_graph( from ray import serve from ray.serve.api import build - # Import and build the graph - graph = import_attr(import_path) - app = build(graph, name) + # Import and build the application. + app = build(import_attr(import_path), name) - # Override options for each deployment + # Override options for each deployment. for options in deployment_override_options: deployment_name = options["name"] - # Merge graph-level and deployment-level runtime_envs + # Merge app-level and deployment-level runtime_envs. if "ray_actor_options" in options: # If specified, get ray_actor_options from config ray_actor_options = options["ray_actor_options"] or {} else: - # Otherwise, get options from graph code (and default to {} if code - # sets options to None) + # Otherwise, get options from application code (and default to {} + # if the code sets options to None). ray_actor_options = ( app.deployments[deployment_name].ray_actor_options or {} ) deployment_env = ray_actor_options.get("runtime_env", {}) merged_env = override_runtime_envs_except_env_vars( - graph_env, deployment_env + runtime_env, deployment_env ) ray_actor_options.update({"runtime_env": merged_env}) options["ray_actor_options"] = ray_actor_options @@ -962,7 +964,7 @@ def run_graph( # Update the deployment's options app.deployments[deployment_name].set_options(**options, _internal=True) - # Run the graph locally on the cluster + # Run the application locally on the cluster. serve.run(app, name=name, route_prefix=route_prefix) except KeyboardInterrupt: # Error is raised when this task is canceled with ray.cancel(), which diff --git a/python/ray/serve/tests/test_logging.py b/python/ray/serve/tests/test_logging.py index 825085c650b2b..561dceaa28b25 100644 --- a/python/ray/serve/tests/test_logging.py +++ b/python/ray/serve/tests/test_logging.py @@ -96,64 +96,42 @@ def check_log(replica_tag: str, method_name: str, fail: bool = False): ) -@pytest.mark.skip("TODO(edoakes): temporarily unblocking merge.") -def test_http_access_log(serve_instance): - prefix = "/test" - - @serve.deployment(route_prefix=prefix) - def fn(req): - if "throw" in req.query_params: - raise RuntimeError("blah blah blah") - - return "hi" - - serve.run(fn.bind()) - - f = io.StringIO() - with redirect_stderr(f): - - def check_log(fail: bool = False): - s = f.getvalue() - return all( - [ - "http_proxy" in s, - "127.0.0.1" in s, - prefix in s, - ("500" if fail else "200") in s, - "ms" in s, - ] - ) - - requests.get(f"http://127.0.0.1:8000{prefix}").raise_for_status() - wait_for_condition(check_log, timeout=30) - - with pytest.raises(requests.exceptions.RequestException): - requests.get(f"http://127.0.0.1:8000{prefix}?throw=True").raise_for_status() - - wait_for_condition(check_log, timeout=30, fail=True) - - def test_user_logs(serve_instance): logger = logging.getLogger("ray.serve") - msg = "user log message" + stderr_msg = "user log message" + log_file_msg = "in file only" name = "user_fn" @serve.deployment(name=name) def fn(*args): - logger.info("user log message") - return serve.get_replica_context().replica_tag + logger.info(stderr_msg) + logger.info(log_file_msg, extra={"log_to_stderr": False}) + return serve.get_replica_context().replica_tag, logger.handlers[1].baseFilename handle = serve.run(fn.bind()) f = io.StringIO() with redirect_stderr(f): + replica_tag, log_file_name = ray.get(handle.remote()) - def check_log(replica_tag: str): + def check_stderr_log(replica_tag: str): s = f.getvalue() - return all([name in s, replica_tag in s, msg in s]) + return all( + [name in s, replica_tag in s, stderr_msg in s, log_file_msg not in s] + ) - replica_tag = ray.get(handle.remote()) - wait_for_condition(check_log, replica_tag=replica_tag) + # Only the stderr_msg should be logged to stderr. + wait_for_condition(check_stderr_log, replica_tag=replica_tag) + + def check_log_file(replica_tag: str): + with open(log_file_name, "r") as f: + s = f.read() + return all( + [name in s, replica_tag in s, stderr_msg in s, log_file_msg in s] + ) + + # Both messages should be logged to the file. + wait_for_condition(check_log_file, replica_tag=replica_tag) def test_disable_access_log(serve_instance): @@ -178,32 +156,6 @@ def __call__(self, *args): assert replica_tag not in f.getvalue() -def test_deprecated_deployment_logger(serve_instance): - # NOTE(edoakes): using this logger is no longer recommended as of Ray 1.13. - # The test is maintained for backwards compatibility. - logger = logging.getLogger("ray") - - @serve.deployment(name="counter") - class Counter: - def __init__(self): - self.count = 0 - - def __call__(self, request): - self.count += 1 - logger.info(f"count: {self.count}") - - serve.run(Counter.bind()) - f = io.StringIO() - with redirect_stderr(f): - requests.get("http://127.0.0.1:8000/counter/") - - def counter_log_success(): - s = f.getvalue() - return "deployment" in s and "replica" in s and "count" in s - - wait_for_condition(counter_log_success) - - def test_context_information_in_logging(serve_instance): """Make sure all context information exist in the log message""" @@ -238,9 +190,7 @@ def __call__(self, req: starlette.requests.Request): # Check the component log expected_log_infos = [ - f"{resp['request_id']} {resp['route']} http_proxy.py", f"{resp['request_id']} {resp['route']} replica.py", - f"{resp2['request_id']} {resp2['route']} http_proxy.py", f"{resp2['request_id']} {resp2['route']} replica.py", ]