Skip to content

Commit

Permalink
[Projects] Add summary format to improve projects screen performance (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hedingber committed Apr 1, 2021
1 parent cb4dd7e commit 266dcd2
Show file tree
Hide file tree
Showing 8 changed files with 363 additions and 43 deletions.
204 changes: 166 additions & 38 deletions mlrun/api/db/sqldb/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import collections
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

import humanfriendly
import mergedeep
import pytz
from sqlalchemy import and_, func, or_
from sqlalchemy import and_, distinct, func, or_
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -58,6 +60,9 @@
class SQLDB(mlrun.api.utils.projects.remotes.member.Member, DBInterface):
def __init__(self, dsn):
    """Initialize the SQL DB layer with the given connection string.

    :param dsn: SQLAlchemy-style data source name used to connect to the DB.
    """
    self.dsn = dsn
    # Simple in-memory TTL cache for the (expensive, cross-table) project
    # resource counters calculation - see _get_project_resources_counters.
    # NOTE: the rest of the class stores/reads the cached value under the
    # "result" key, so initialize that key (the previous "value" key was
    # never read anywhere).
    self._cache = {
        "project_resources_counters": {"result": None, "ttl": datetime.min}
    }

def initialize(self, session):
    """No-op: this DB layer requires no explicit initialization step."""
    pass
Expand Down Expand Up @@ -836,19 +841,147 @@ def list_projects(
query = self._query(session, Project, owner=owner, state=state)
if labels:
query = self._add_labels_filter(session, query, Project, labels)
project_records = query.all()
project_names = [project_record.name for project_record in project_records]
projects = []
for project_record in query:
if format_ == mlrun.api.schemas.Format.name_only:
projects.append(project_record.name)
elif format_ == mlrun.api.schemas.Format.full:
projects.append(
self._transform_project_record_to_schema(session, project_record)
# Calculating the project summary data is done with cross-project queries (rather than one query per
# project), so it is built outside of the loop below.
if format_ == mlrun.api.schemas.Format.summary:
projects = self._generate_projects_summaries(session, project_names)
else:
for project_record in project_records:
if format_ == mlrun.api.schemas.Format.name_only:
projects = project_names
elif format_ == mlrun.api.schemas.Format.full:
projects.append(
self._transform_project_record_to_schema(
session, project_record
)
)
else:
raise NotImplementedError(
f"Provided format is not supported. format={format_}"
)
return schemas.ProjectsOutput(projects=projects)

def _get_project_resources_counters(self, session: Session):
    """Return per-project resource counters, cached in memory with a TTL.

    Counts, for every project: distinct functions, distinct feature sets,
    model artifacts (using the "latest" tag, so each artifact key is counted
    once rather than once per version), runs that failed within the last 24
    hours, and currently running (non-terminal) runs. The calculation needs
    several cross-table queries, so the result is cached and only
    recalculated once the configured TTL
    (config.httpdb.projects.counters_cache_ttl) has passed.

    :param session: SQLAlchemy session used for the queries.
    :returns: 5-tuple of dicts mapping project name to count:
        (functions, feature sets, models, recent failed runs, running runs).
    """
    now = datetime.now()
    if (
        not self._cache["project_resources_counters"]["ttl"]
        or self._cache["project_resources_counters"]["ttl"] < now
    ):
        logger.debug(
            "Project resources counter cache expired. Calculating",
            ttl=self._cache["project_resources_counters"]["ttl"],
        )
        # NOTE(review): local import - presumably to avoid a circular import
        # at module load time; confirm before moving to the top of the file
        import mlrun.artifacts

        functions_count_per_project = (
            session.query(Function.project, func.count(distinct(Function.name)))
            .group_by(Function.project)
            .all()
        )
        project_to_function_count = {
            result[0]: result[1] for result in functions_count_per_project
        }
        feature_sets_count_per_project = (
            session.query(FeatureSet.project, func.count(distinct(FeatureSet.name)))
            .group_by(FeatureSet.project)
            .all()
        )
        project_to_feature_set_count = {
            result[0]: result[1] for result in feature_sets_count_per_project
        }
        # The kind filter is applied after the DB query (manually in python
        # code), so counting must be done the same way - here, in python -
        # and can't be pushed into SQL like the counters above.
        # "latest" gives only one version of each artifact key, which is what
        # we want to count (artifact count, not artifact versions count).
        model_artifacts = self._find_artifacts(
            session, None, "latest", kind=mlrun.artifacts.model.ModelArtifact.kind
        )
        project_to_models_count = collections.defaultdict(int)
        for model_artifact in model_artifacts:
            project_to_models_count[model_artifact.project] += 1
        runs = self._find_runs(session, None, "*", None).all()
        project_to_recent_failed_runs_count = collections.defaultdict(int)
        project_to_running_runs_count = collections.defaultdict(int)
        # We want to count unique run names, not all occurrences of all runs,
        # therefore we keep a set of seen names per project and only count
        # names not seen before.
        project_to_recent_failed_run_names = collections.defaultdict(set)
        project_to_running_run_names = collections.defaultdict(set)
        # Hoisted out of the loop so every run is measured against the same
        # cut-off point (and datetime.now() isn't re-evaluated per run).
        one_day_ago = datetime.now() - timedelta(hours=24)
        for run in runs:
            run_json = run.struct
            run_name = None
            if run_json and isinstance(run_json, dict):
                run_name = run_json.get("metadata", {}).get("name")
            if self._is_run_matching_state(
                run,
                run_json,
                mlrun.runtimes.constants.RunStates.non_terminal_states(),
            ):
                if (
                    run_name
                    and run_name not in project_to_running_run_names[run.project]
                ):
                    project_to_running_run_names[run.project].add(run_name)
                    project_to_running_runs_count[run.project] += 1
            if self._is_run_matching_state(
                run, run_json, mlrun.runtimes.constants.RunStates.error
            ):
                if run.start_time and run.start_time >= one_day_ago:
                    if (
                        run_name
                        and run_name
                        not in project_to_recent_failed_run_names[run.project]
                    ):
                        project_to_recent_failed_run_names[run.project].add(
                            run_name
                        )
                        project_to_recent_failed_runs_count[run.project] += 1

        self._cache["project_resources_counters"]["result"] = (
            project_to_function_count,
            project_to_feature_set_count,
            project_to_models_count,
            project_to_recent_failed_runs_count,
            project_to_running_runs_count,
        )
        ttl_time = datetime.now() + timedelta(
            seconds=humanfriendly.parse_timespan(
                config.httpdb.projects.counters_cache_ttl
            )
        )
        self._cache["project_resources_counters"]["ttl"] = ttl_time

    return self._cache["project_resources_counters"]["result"]

def _generate_projects_summaries(
    self, session: Session, projects: List[str]
) -> List[mlrun.api.schemas.ProjectSummary]:
    """Build a ProjectSummary (resource counters) for each given project.

    :param session: SQLAlchemy session used for the counter queries.
    :param projects: Project names to generate summaries for.
    :returns: List of ProjectSummary objects, in the same order as the
        given project names. Projects without resources get zero counts.
    """
    (
        project_to_function_count,
        project_to_feature_set_count,
        project_to_models_count,
        project_to_recent_failed_runs_count,
        project_to_running_runs_count,
    ) = self._get_project_resources_counters(session)
    return [
        mlrun.api.schemas.ProjectSummary(
            name=project,
            functions_count=project_to_function_count.get(project, 0),
            feature_sets_count=project_to_feature_set_count.get(project, 0),
            models_count=project_to_models_count.get(project, 0),
            runs_failed_recent_count=project_to_recent_failed_runs_count.get(
                project, 0
            ),
            runs_running_count=project_to_running_runs_count.get(project, 0),
        )
        for project in projects
    ]

def _update_project_record_from_project(
self, session: Session, project_record: Project, project: schemas.Project
Expand Down Expand Up @@ -1845,34 +1978,8 @@ def _post_query_runs_filter(
):
continue
if state:
requested_states = as_list(state)
record_state = run.state
json_state = None
if (
run_json
and isinstance(run_json, dict)
and run_json.get("status", {}).get("state")
):
json_state = run_json.get("status", {}).get("state")
if not record_state and not json_state:
if not self._is_run_matching_state(run, run_json, state):
continue
# json_state has precedence over record state
if json_state:
if all(
[
requested_state not in json_state
for requested_state in requested_states
]
):
continue
else:
if all(
[
requested_state not in record_state
for requested_state in requested_states
]
):
continue
if last_update_time_from or last_update_time_to:
if not match_times(
last_update_time_from,
Expand All @@ -1886,6 +1993,27 @@ def _post_query_runs_filter(

return filtered_runs

def _is_run_matching_state(self, run, run_json, state):
    """Check whether the run is in one of the requested states.

    The state found inside the run body (run_json["status"]["state"]) takes
    precedence over the state column of the DB record; a run carrying
    neither is never a match.

    :param run: Run DB record (provides the record-level state column).
    :param run_json: The run body (dict) stored in the record.
    :param state: A single state or a list of states to match against.
    :returns: True when the run's effective state is one of the requested
        states, False otherwise.
    """
    requested_states = as_list(state)
    body_state = None
    if run_json and isinstance(run_json, dict):
        body_state = run_json.get("status", {}).get("state")
    # the state from the run body has precedence over the record column
    effective_state = body_state or run.state
    if not effective_state:
        return False
    return effective_state in requested_states

def _latest_uid_filter(self, session, query):
# Create a sub query of latest uid (by updated) per (project,key)
subq = (
Expand Down
1 change: 1 addition & 0 deletions mlrun/api/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
ProjectSpec,
ProjectState,
ProjectStatus,
ProjectSummary,
)
from .runtime_resource import (
GroupedRuntimeResourcesOutput,
Expand Down
1 change: 1 addition & 0 deletions mlrun/api/schemas/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Format(str, Enum):
full = "full"
name_only = "name_only"
metadata_only = "metadata_only"
summary = "summary"


class PatchMode(str, Enum):
Expand Down
11 changes: 10 additions & 1 deletion mlrun/api/schemas/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ class Project(pydantic.BaseModel):
status: ObjectStatus = ObjectStatus()


class ProjectSummary(pydantic.BaseModel):
    # Lightweight, counters-only view of a project, returned by the projects
    # listing when the "summary" format is requested (built by
    # SQLDB._generate_projects_summaries to speed up the projects screen).
    name: str  # project name
    functions_count: int  # distinct functions in the project
    feature_sets_count: int  # distinct feature sets in the project
    models_count: int  # model artifacts, counted once per key ("latest" tag)
    runs_failed_recent_count: int  # unique runs that failed in the last 24h
    runs_running_count: int  # unique runs currently in a non-terminal state


class ProjectsOutput(pydantic.BaseModel):
# use the format query param to control whether the full object will be returned or only the names
projects: typing.List[typing.Union[Project, str]]
projects: typing.List[typing.Union[Project, str, ProjectSummary]]
1 change: 1 addition & 0 deletions mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"leader": "mlrun",
"followers": "",
"periodic_sync_interval": "1 minute",
"counters_cache_ttl": "10 seconds",
},
# The API needs to know what is its k8s svc url so it could enrich it in the jobs it creates
"api_url": "",
Expand Down

0 comments on commit 266dcd2

Please sign in to comment.