diff --git a/reflex/reflex.py b/reflex/reflex.py index dca9a16506..76f33ff7b1 100644 --- a/reflex/reflex.py +++ b/reflex/reflex.py @@ -669,10 +669,14 @@ def deploy( console.print("Waiting for server to report progress ...") # Display the key events such as build, deploy, etc - server_report_deploy_success = asyncio.get_event_loop().run_until_complete( - hosting.display_deploy_milestones(key, from_iso_timestamp=deploy_requested_at) + server_report_deploy_success = hosting.poll_deploy_milestones( + key, from_iso_timestamp=deploy_requested_at ) - if not server_report_deploy_success: + + if server_report_deploy_success is None: + console.warn("Hosting server timed out.") + console.warn("The deployment may still be in progress. Proceeding ...") + elif not server_report_deploy_success: console.error("Hosting server reports failure.") console.error( f"Check the server logs using `reflex deployments build-logs {key}`" @@ -797,7 +801,7 @@ def get_deployment_status( status = hosting.get_deployment_status(key) # TODO: refactor all these tabulate calls - status.backend.updated_at = hosting.convert_to_local_time( + status.backend.updated_at = hosting.convert_to_local_time_str( status.backend.updated_at or "N/A" ) backend_status = status.backend.dict(exclude_none=True) @@ -806,7 +810,7 @@ def get_deployment_status( console.print(tabulate([table], headers=headers)) # Add a new line in console console.print("\n") - status.frontend.updated_at = hosting.convert_to_local_time( + status.frontend.updated_at = hosting.convert_to_local_time_str( status.frontend.updated_at or "N/A" ) frontend_status = status.frontend.dict(exclude_none=True) diff --git a/reflex/utils/hosting.py b/reflex/utils/hosting.py index 1e05ea4979..0bd506cff0 100644 --- a/reflex/utils/hosting.py +++ b/reflex/utils/hosting.py @@ -9,7 +9,7 @@ import time import uuid import webbrowser -from datetime import datetime +from datetime import datetime, timedelta from http import HTTPStatus from typing import List, Optional @@ -41,12 +41,14 @@ GET_REGIONS_ENDPOINT = f"{config.cp_backend_url}/deployments/regions" # Websocket endpoint to stream logs of a deployment DEPLOYMENT_LOGS_ENDPOINT = f'{config.cp_backend_url.replace("http", "ws")}/deployments' +# The HTTP endpoint to fetch logs of a deployment +POST_DEPLOYMENT_LOGS_ENDPOINT = f"{config.cp_backend_url}/deployments/logs" # Expected server response time to new deployment request. In seconds. DEPLOYMENT_PICKUP_DELAY = 30 # End of deployment workflow message. Used to determine if it is the last message from server. END_OF_DEPLOYMENT_MESSAGES = ["deploy success"] # How many iterations to try and print the deployment event messages from server during deployment. -DEPLOYMENT_EVENT_MESSAGES_RETRIES = 90 +DEPLOYMENT_EVENT_MESSAGES_RETRIES = 120 # Timeout limit for http requests HTTP_REQUEST_TIMEOUT = 60 # seconds @@ -726,21 +728,34 @@ def get_deployment_status(key: str) -> DeploymentStatusResponse: raise Exception("internal errors") from ex -def convert_to_local_time(iso_timestamp: str) -> str: - """Convert the iso timestamp to local time. +def convert_to_local_time_with_tz(iso_timestamp: str) -> datetime | None: + """Helper function to convert the iso timestamp to local time. Args: iso_timestamp: The iso timestamp to convert. Returns: - The converted timestamp string. + The converted timestamp with timezone. """ try: - local_dt = datetime.fromisoformat(iso_timestamp).astimezone() - return local_dt.strftime("%Y-%m-%d %H:%M:%S.%f %Z") - except Exception as ex: + return datetime.fromisoformat(iso_timestamp).astimezone() + except (TypeError, ValueError) as ex: console.error(f"Unable to convert iso timestamp {iso_timestamp} due to {ex}.") + return None + + +def convert_to_local_time_str(iso_timestamp: str) -> str: + """Convert the iso timestamp to local time. + + Args: + iso_timestamp: The iso timestamp to convert. + + Returns: + The converted timestamp string. + """ + if (local_dt := convert_to_local_time_with_tz(iso_timestamp)) is None: return iso_timestamp + return local_dt.strftime("%Y-%m-%d %H:%M:%S.%f %Z") class LogType(str, enum.Enum): @@ -798,7 +813,7 @@ async def get_logs( if v is None: row_to_print[k] = str(v) elif k == "timestamp": - row_to_print[k] = convert_to_local_time(v) + row_to_print[k] = convert_to_local_time_str(v) else: row_to_print[k] = v print(" | ".join(row_to_print.values())) @@ -1006,6 +1021,78 @@ def log_out_on_browser(): ) +def poll_deploy_milestones(key: str, from_iso_timestamp: datetime) -> bool | None: + """Periodically poll the hosting server for deploy milestones. + + Args: + key: The deployment key. + from_iso_timestamp: The timestamp of the deployment request time, this helps with the milestone query. + + Raises: + ValueError: If a non-empty key is not provided. + Exception: If the user is not authenticated. + + Returns: + False if server reports back failure, True otherwise. None if do not receive the end of deployment message. + """ + if not key: + raise ValueError("Non-empty key is required for querying deploy status.") + if not (token := requires_authenticated()): + raise Exception("not authenticated") + + for _ in range(DEPLOYMENT_EVENT_MESSAGES_RETRIES): + try: + response = httpx.post( + POST_DEPLOYMENT_LOGS_ENDPOINT, + json={ + "key": key, + "log_type": LogType.DEPLOY_LOG.value, + "from_iso_timestamp": from_iso_timestamp.astimezone().isoformat(), + }, + headers=authorization_header(token), + ) + response.raise_for_status() + # The return is expected to be a list of dicts + response_json = response.json() + for row in response_json: + console.print( + " | ".join( + [ + convert_to_local_time_str(row["timestamp"]), + row["message"], + ] + ) + ) + # update the from timestamp to the last timestamp of received message + if ( + maybe_timestamp := convert_to_local_time_with_tz(row["timestamp"]) + ) is not None: + console.debug( + f"Updating from {from_iso_timestamp} to {maybe_timestamp}" + ) + # Add a small delta so does not poll the same logs + from_iso_timestamp = maybe_timestamp + timedelta(microseconds=1e5) + else: + console.warn(f"Unable to parse timestamp {row['timestamp']}") + server_message = row["message"].lower() + if "fail" in server_message: + console.debug( + "Received failure message, stop event message streaming" + ) + return False + if any(msg in server_message for msg in END_OF_DEPLOYMENT_MESSAGES): + console.debug( + "Received end of deployment message, stop event message streaming" + ) + return True + time.sleep(1) + except httpx.HTTPError as he: + # This includes HTTP server and client error + console.debug(f"Unable to get more deployment events due to {he}.") + except Exception as ex: + console.warn(f"Unable to parse server response due to {ex}.") + + async def display_deploy_milestones(key: str, from_iso_timestamp: datetime) -> bool: """Display the deploy milestone messages reported back from the hosting server. @@ -1039,7 +1126,7 @@ async def display_deploy_milestones(key: str, from_iso_timestamp: datetime) -> b console.print( " | ".join( [ - convert_to_local_time(row_json["timestamp"]), + convert_to_local_time_str(row_json["timestamp"]), row_json["message"], ] ) diff --git a/tests/test_reflex.py b/tests/test_reflex.py index 44d3980d3a..a0667e3e62 100644 --- a/tests/test_reflex.py +++ b/tests/test_reflex.py @@ -118,7 +118,7 @@ def test_deploy_non_interactive_success( ), ) mocker.patch("reflex.utils.hosting.wait_for_server_to_pick_up_request") - mocker.patch("reflex.utils.hosting.display_deploy_milestones") + mocker.patch("reflex.utils.hosting.poll_deploy_milestones") mocker.patch("reflex.utils.hosting.poll_backend", return_value=True) mocker.patch("reflex.utils.hosting.poll_frontend", return_value=True) # TODO: typer option default not working in test for app name @@ -351,7 +351,7 @@ def test_deploy_interactive( ), ) mocker.patch("reflex.utils.hosting.wait_for_server_to_pick_up_request") - mocker.patch("reflex.utils.hosting.display_deploy_milestones") + mocker.patch("reflex.utils.hosting.poll_deploy_milestones") mocker.patch("reflex.utils.hosting.poll_backend", return_value=True) mocker.patch("reflex.utils.hosting.poll_frontend", return_value=True)