Skip to content

Commit

Permalink
[ProjectSummaries] Add pipelines_completed_recent_count, pipelines_fa…
Browse files Browse the repository at this point in the history
…iled_recent_count (#5523)
  • Loading branch information
roei3000b committed May 8, 2024
1 parent 2bd1a5d commit 08071fd
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 10 deletions.
2 changes: 2 additions & 0 deletions mlrun/common/schemas/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class ProjectSummary(pydantic.BaseModel):
runs_failed_recent_count: int
runs_running_count: int
schedules_count: int
pipelines_completed_recent_count: typing.Optional[int] = None
pipelines_failed_recent_count: typing.Optional[int] = None
pipelines_running_count: typing.Optional[int] = None


Expand Down
7 changes: 7 additions & 0 deletions mlrun/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ def stable_statuses():
RunStatuses.error,
]

@staticmethod
def failed_statuses():
return [
RunStatuses.failed,
RunStatuses.error,
]

@staticmethod
def transient_statuses():
return [
Expand Down
67 changes: 59 additions & 8 deletions server/api/crud/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ async def generate_projects_summaries(
project_to_recent_completed_runs_count,
project_to_recent_failed_runs_count,
project_to_running_runs_count,
project_to_recent_completed_pipelines_count,
project_to_recent_failed_pipelines_count,
project_to_running_pipelines_count,
) = await self._get_project_resources_counters()
project_summaries = []
Expand All @@ -317,8 +319,14 @@ async def generate_projects_summaries(
project, 0
),
runs_running_count=project_to_running_runs_count.get(project, 0),
# project_to_running_pipelines_count is a defaultdict so it will return None if using dict.get()
# project_.*_pipelines_count is a defaultdict so it will return None if using dict.get()
# and the key wasn't set yet, so we need to use the [] operator to get the default value of the dict
pipelines_completed_recent_count=project_to_recent_completed_pipelines_count[
project
],
pipelines_failed_recent_count=project_to_recent_failed_pipelines_count[
project
],
pipelines_running_count=project_to_running_pipelines_count[project],
)
)
Expand All @@ -335,6 +343,8 @@ async def _get_project_resources_counters(
dict[str, int],
dict[str, int],
dict[str, typing.Union[int, None]],
dict[str, typing.Union[int, None]],
dict[str, typing.Union[int, None]],
]:
now = datetime.datetime.now()
if (
Expand All @@ -359,7 +369,11 @@ async def _get_project_resources_counters(
project_to_recent_failed_runs_count,
project_to_running_runs_count,
) = results[0]
project_to_running_pipelines_count = results[1]
(
project_to_recent_completed_pipelines_count,
project_to_recent_failed_pipelines_count,
project_to_running_pipelines_count,
) = results[1]
self._cache["project_resources_counters"]["result"] = (
project_to_files_count,
project_to_schedule_count,
Expand All @@ -368,6 +382,8 @@ async def _get_project_resources_counters(
project_to_recent_completed_runs_count,
project_to_recent_failed_runs_count,
project_to_running_runs_count,
project_to_recent_completed_pipelines_count,
project_to_recent_failed_pipelines_count,
project_to_running_pipelines_count,
)
ttl_time = datetime.datetime.now() + datetime.timedelta(
Expand All @@ -390,13 +406,23 @@ def _list_pipelines(

async def _calculate_pipelines_counters(
self,
) -> dict[str, typing.Union[int, None]]:
) -> (
dict[str, typing.Union[int, None]],
dict[str, typing.Union[int, None]],
dict[str, typing.Union[int, None]],
):
# creating defaultdict instead of a regular dict, because it possible that not all projects have pipelines
# and we want to return 0 for those projects, or None if we failed to get the information
project_to_running_pipelines_count = collections.defaultdict(lambda: 0)
project_to_recent_completed_pipelines_count = collections.defaultdict(lambda: 0)
project_to_recent_failed_pipelines_count = collections.defaultdict(lambda: 0)
if not mlrun.mlconf.resolve_kfp_url():
# If KFP is not configured, return dict with 0 counters (no running pipelines)
return project_to_running_pipelines_count
return (
project_to_recent_completed_pipelines_count,
project_to_recent_failed_pipelines_count,
project_to_running_pipelines_count,
)

try:
next_page_token = ""
Expand All @@ -417,7 +443,24 @@ async def _calculate_pipelines_counters(
not in mlrun.run.RunStatuses.stable_statuses()
):
project_to_running_pipelines_count[pipeline["project"]] += 1

elif "finished_at" in pipeline:
finished_at = datetime.datetime.strptime(
pipeline["finished_at"], "%Y-%m-%d %H:%M:%S%z"
)
if finished_at > datetime.datetime.now().astimezone(
tz=datetime.timezone.utc
) - datetime.timedelta(days=1):
if pipeline["status"] in mlrun.run.RunStatuses.succeeded:
project_to_recent_completed_pipelines_count[
pipeline["project"]
] += 1
elif (
pipeline["status"]
in mlrun.run.RunStatuses.failed_statuses()
):
project_to_recent_failed_pipelines_count[
pipeline["project"]
] += 1
if not next_page_token:
break

Expand All @@ -427,9 +470,17 @@ async def _calculate_pipelines_counters(
"Failed to list pipelines. Pipelines counters will be set to None",
exc=mlrun.errors.err_to_str(exc),
)
return collections.defaultdict(lambda: None)

return project_to_running_pipelines_count
# this function should return project_to_recent_completed_pipelines_count,
# project_to_recent_failed_pipelines_count, project_to_running_pipelines_count,
# in case of exception we want to return 3 * defaultdict of None because this function
# returns 3 values
return [collections.defaultdict(lambda: None)] * 3

return (
project_to_recent_completed_pipelines_count,
project_to_recent_failed_pipelines_count,
project_to_running_pipelines_count,
)

@staticmethod
def _wait_for_nuclio_project_deletion(
Expand Down
4 changes: 2 additions & 2 deletions tests/api/api/test_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1717,9 +1717,9 @@ def _mock_pipelines(project_name):
def list_pipelines_return_value(*args, **kwargs):
next_page_token = "some-token"
if kwargs["page_token"] == "":
return (None, next_page_token, pipelines[: len(pipelines) // 2])
return None, next_page_token, pipelines[: len(pipelines) // 2]
elif kwargs["page_token"] == next_page_token:
return (None, None, pipelines[len(pipelines) // 2 :])
return None, None, pipelines[len(pipelines) // 2 :]

server.api.crud.Pipelines().list_pipelines = unittest.mock.Mock(
side_effect=list_pipelines_return_value
Expand Down

0 comments on commit 08071fd

Please sign in to comment.