From 08071fdc14cac05d8c6f373a7190d7156fec8e7a Mon Sep 17 00:00:00 2001 From: roei3000b <40743125+roei3000b@users.noreply.github.com> Date: Wed, 8 May 2024 14:54:33 +0300 Subject: [PATCH] [ProjectSummaries] Add pipelines_completed_recent_count, pipelines_failed_recent_count (#5523) --- mlrun/common/schemas/project.py | 2 + mlrun/run.py | 7 ++++ server/api/crud/projects.py | 67 +++++++++++++++++++++++++++++---- tests/api/api/test_projects.py | 4 +- 4 files changed, 70 insertions(+), 10 deletions(-) diff --git a/mlrun/common/schemas/project.py b/mlrun/common/schemas/project.py index f799ef85c1e..b53cf779240 100644 --- a/mlrun/common/schemas/project.py +++ b/mlrun/common/schemas/project.py @@ -114,6 +114,8 @@ class ProjectSummary(pydantic.BaseModel): runs_failed_recent_count: int runs_running_count: int schedules_count: int + pipelines_completed_recent_count: typing.Optional[int] = None + pipelines_failed_recent_count: typing.Optional[int] = None pipelines_running_count: typing.Optional[int] = None diff --git a/mlrun/run.py b/mlrun/run.py index df36b0ae2b9..6175666001a 100644 --- a/mlrun/run.py +++ b/mlrun/run.py @@ -94,6 +94,13 @@ def stable_statuses(): RunStatuses.error, ] + @staticmethod + def failed_statuses(): + return [ + RunStatuses.failed, + RunStatuses.error, + ] + @staticmethod def transient_statuses(): return [ diff --git a/server/api/crud/projects.py b/server/api/crud/projects.py index 36fdaa16173..5c6433c4f69 100644 --- a/server/api/crud/projects.py +++ b/server/api/crud/projects.py @@ -299,6 +299,8 @@ async def generate_projects_summaries( project_to_recent_completed_runs_count, project_to_recent_failed_runs_count, project_to_running_runs_count, + project_to_recent_completed_pipelines_count, + project_to_recent_failed_pipelines_count, project_to_running_pipelines_count, ) = await self._get_project_resources_counters() project_summaries = [] @@ -317,8 +319,14 @@ async def generate_projects_summaries( project, 0 ), runs_running_count=project_to_running_runs_count.get(project, 0), - # project_to_running_pipelines_count is a defaultdict so it will return None if using dict.get() + # project_.*_pipelines_count is a defaultdict so it will return None if using dict.get() # and the key wasn't set yet, so we need to use the [] operator to get the default value of the dict + pipelines_completed_recent_count=project_to_recent_completed_pipelines_count[ + project + ], + pipelines_failed_recent_count=project_to_recent_failed_pipelines_count[ + project + ], pipelines_running_count=project_to_running_pipelines_count[project], ) ) @@ -335,6 +343,8 @@ async def _get_project_resources_counters( dict[str, int], dict[str, int], dict[str, typing.Union[int, None]], + dict[str, typing.Union[int, None]], + dict[str, typing.Union[int, None]], ]: now = datetime.datetime.now() if ( @@ -359,7 +369,11 @@ async def _get_project_resources_counters( project_to_recent_failed_runs_count, project_to_running_runs_count, ) = results[0] - project_to_running_pipelines_count = results[1] + ( + project_to_recent_completed_pipelines_count, + project_to_recent_failed_pipelines_count, + project_to_running_pipelines_count, + ) = results[1] self._cache["project_resources_counters"]["result"] = ( project_to_files_count, project_to_schedule_count, @@ -368,6 +382,8 @@ async def _get_project_resources_counters( project_to_recent_completed_runs_count, project_to_recent_failed_runs_count, project_to_running_runs_count, + project_to_recent_completed_pipelines_count, + project_to_recent_failed_pipelines_count, project_to_running_pipelines_count, ) ttl_time = datetime.datetime.now() + datetime.timedelta( @@ -390,13 +406,23 @@ def _list_pipelines( async def _calculate_pipelines_counters( self, - ) -> dict[str, typing.Union[int, None]]: + ) -> ( + dict[str, typing.Union[int, None]], + dict[str, typing.Union[int, None]], + dict[str, typing.Union[int, None]], + ): # creating defaultdict instead of a regular dict, because it possible that not all projects have pipelines # and we want to return 0 for those projects, or None if we failed to get the information project_to_running_pipelines_count = collections.defaultdict(lambda: 0) + project_to_recent_completed_pipelines_count = collections.defaultdict(lambda: 0) + project_to_recent_failed_pipelines_count = collections.defaultdict(lambda: 0) if not mlrun.mlconf.resolve_kfp_url(): # If KFP is not configured, return dict with 0 counters (no running pipelines) - return project_to_running_pipelines_count + return ( + project_to_recent_completed_pipelines_count, + project_to_recent_failed_pipelines_count, + project_to_running_pipelines_count, + ) try: next_page_token = "" @@ -417,7 +443,24 @@ async def _calculate_pipelines_counters( not in mlrun.run.RunStatuses.stable_statuses() ): project_to_running_pipelines_count[pipeline["project"]] += 1 - + elif "finished_at" in pipeline: + finished_at = datetime.datetime.strptime( + pipeline["finished_at"], "%Y-%m-%d %H:%M:%S%z" + ) + if finished_at > datetime.datetime.now().astimezone( + tz=datetime.timezone.utc + ) - datetime.timedelta(days=1): + if pipeline["status"] in mlrun.run.RunStatuses.succeeded: + project_to_recent_completed_pipelines_count[ + pipeline["project"] + ] += 1 + elif ( + pipeline["status"] + in mlrun.run.RunStatuses.failed_statuses() + ): + project_to_recent_failed_pipelines_count[ + pipeline["project"] + ] += 1 if not next_page_token: break @@ -427,9 +470,17 @@ async def _calculate_pipelines_counters( "Failed to list pipelines. Pipelines counters will be set to None", exc=mlrun.errors.err_to_str(exc), ) - return collections.defaultdict(lambda: None) - - return project_to_running_pipelines_count + # this function should return project_to_recent_completed_pipelines_count, + # project_to_recent_failed_pipelines_count, project_to_running_pipelines_count, + # in case of exception we want to return 3 * defaultdict of None because this function + # returns 3 values + return [collections.defaultdict(lambda: None)] * 3 + + return ( + project_to_recent_completed_pipelines_count, + project_to_recent_failed_pipelines_count, + project_to_running_pipelines_count, + ) @staticmethod def _wait_for_nuclio_project_deletion( diff --git a/tests/api/api/test_projects.py b/tests/api/api/test_projects.py index 702c9c3f9e8..4037276e6c3 100644 --- a/tests/api/api/test_projects.py +++ b/tests/api/api/test_projects.py @@ -1717,9 +1717,9 @@ def _mock_pipelines(project_name): def list_pipelines_return_value(*args, **kwargs): next_page_token = "some-token" if kwargs["page_token"] == "": - return (None, next_page_token, pipelines[: len(pipelines) // 2]) + return None, next_page_token, pipelines[: len(pipelines) // 2] elif kwargs["page_token"] == next_page_token: - return (None, None, pipelines[len(pipelines) // 2 :]) + return None, None, pipelines[len(pipelines) // 2 :] server.api.crud.Pipelines().list_pipelines = unittest.mock.Mock( side_effect=list_pipelines_return_value