Skip to content

Commit

Permalink
[API] Remove projects summary format & optimize counters calculation …
Browse files Browse the repository at this point in the history
…to happen in parallel (#1347)
  • Loading branch information
Hedingber committed Sep 24, 2021
1 parent 2a6349a commit 5287967
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 240 deletions.
25 changes: 16 additions & 9 deletions mlrun/api/api/endpoints/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import typing

import fastapi
import fastapi.concurrency
import sqlalchemy.orm

import mlrun.api.api.deps
Expand Down Expand Up @@ -218,7 +219,7 @@ def list_projects(
@router.get(
"/project-summaries", response_model=mlrun.api.schemas.ProjectSummariesOutput
)
def list_project_summaries(
async def list_project_summaries(
owner: str = None,
labels: typing.List[str] = fastapi.Query(None, alias="label"),
state: mlrun.api.schemas.ProjectState = None,
Expand All @@ -229,7 +230,8 @@ def list_project_summaries(
mlrun.api.api.deps.get_db_session
),
):
projects_output = get_project_member().list_projects(
projects_output = await fastapi.concurrency.run_in_threadpool(
get_project_member().list_projects,
db_session,
owner,
mlrun.api.schemas.ProjectsFormat.name_only,
Expand All @@ -241,10 +243,12 @@ def list_project_summaries(
allowed_project_names = projects_output.projects
# skip permission check if it's the leader
if not _is_request_from_leader(auth_verifier.auth_info.projects_role):
allowed_project_names = mlrun.api.utils.clients.opa.Client().filter_projects_by_permissions(
projects_output.projects, auth_verifier.auth_info,
allowed_project_names = await fastapi.concurrency.run_in_threadpool(
mlrun.api.utils.clients.opa.Client().filter_projects_by_permissions,
projects_output.projects,
auth_verifier.auth_info,
)
return get_project_member().list_project_summaries(
return await get_project_member().list_project_summaries(
db_session,
owner,
labels,
Expand All @@ -258,7 +262,7 @@ def list_project_summaries(
@router.get(
"/project-summaries/{name}", response_model=mlrun.api.schemas.ProjectSummary
)
def get_project_summary(
async def get_project_summary(
name: str,
db_session: sqlalchemy.orm.Session = fastapi.Depends(
mlrun.api.api.deps.get_db_session
Expand All @@ -267,13 +271,16 @@ def get_project_summary(
mlrun.api.api.deps.AuthVerifierDep
),
):
project_summary = get_project_member().get_project_summary(
project_summary = await get_project_member().get_project_summary(
db_session, name, auth_verifier.auth_info.session
)
# skip permission check if it's the leader
if not _is_request_from_leader(auth_verifier.auth_info.projects_role):
mlrun.api.utils.clients.opa.Client().query_project_permissions(
name, mlrun.api.schemas.AuthorizationAction.read, auth_verifier.auth_info,
await fastapi.concurrency.run_in_threadpool(
mlrun.api.utils.clients.opa.Client().query_project_permissions,
name,
mlrun.api.schemas.AuthorizationAction.read,
auth_verifier.auth_info,
)
return project_summary

Expand Down
152 changes: 119 additions & 33 deletions mlrun/api/crud/projects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import collections
import datetime
import typing

import fastapi.concurrency
import humanfriendly
import sqlalchemy.orm

import mlrun.api.crud
Expand All @@ -17,6 +21,12 @@ class Projects(
mlrun.api.utils.projects.remotes.follower.Member,
metaclass=mlrun.utils.singleton.AbstractSingleton,
):
def __init__(self) -> None:
super().__init__()
self._cache = {
"project_resources_counters": {"value": None, "ttl": datetime.datetime.min}
}

def create_project(
self, session: sqlalchemy.orm.Session, project: mlrun.api.schemas.Project
):
Expand Down Expand Up @@ -121,58 +131,134 @@ def list_projects(
session, owner, format_, labels, state, names
)

def list_project_summaries(
async def list_project_summaries(
self,
session: sqlalchemy.orm.Session,
owner: str = None,
labels: typing.List[str] = None,
state: mlrun.api.schemas.ProjectState = None,
names: typing.Optional[typing.List[str]] = None,
) -> mlrun.api.schemas.ProjectSummariesOutput:
project_summaries = mlrun.api.utils.singletons.db.get_db().list_project_summaries(
session, owner, labels, state, names
projects_output = await fastapi.concurrency.run_in_threadpool(
self.list_projects,
session,
owner,
mlrun.api.schemas.ProjectsFormat.name_only,
labels,
state,
names,
)
self._add_pipeline_running_count_to_project_summaries(
session, project_summaries.project_summaries
project_summaries = await self.generate_projects_summaries(
session, projects_output.projects
)
return mlrun.api.schemas.ProjectSummariesOutput(
project_summaries=project_summaries
)
return project_summaries

def get_project_summary(
async def get_project_summary(
self, session: sqlalchemy.orm.Session, name: str
) -> mlrun.api.schemas.ProjectSummary:
project_summary = mlrun.api.utils.singletons.db.get_db().get_project_summary(
session, name
)
self._add_pipeline_running_count_to_project_summaries(
session, [project_summary]
)
return project_summary
# Call get project so we'll explode if project doesn't exists
await fastapi.concurrency.run_in_threadpool(self.get_project, session, name)
project_summaries = await self.generate_projects_summaries(session, [name])
return project_summaries[0]

def generate_projects_summaries(
async def generate_projects_summaries(
self, session: sqlalchemy.orm.Session, projects: typing.List[str]
) -> typing.List[mlrun.api.schemas.ProjectSummary]:
project_summaries = mlrun.api.utils.singletons.db.get_db().generate_projects_summaries(
session, projects
)
self._add_pipeline_running_count_to_project_summaries(
session, project_summaries
)
(
project_to_function_count,
project_to_schedule_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_failed_runs_count,
project_to_running_runs_count,
project_to_running_pipelines_count,
) = await self._get_project_resources_counters(session)
project_summaries = []
for project in projects:
project_summaries.append(
mlrun.api.schemas.ProjectSummary(
name=project,
functions_count=project_to_function_count.get(project, 0),
schedules_count=project_to_schedule_count.get(project, 0),
feature_sets_count=project_to_feature_set_count.get(project, 0),
models_count=project_to_models_count.get(project, 0),
runs_failed_recent_count=project_to_recent_failed_runs_count.get(
project, 0
),
runs_running_count=project_to_running_runs_count.get(project, 0),
pipelines_running_count=project_to_running_pipelines_count.get(
project, 0
),
)
)
return project_summaries

def _add_pipeline_running_count_to_project_summaries(
self,
session: sqlalchemy.orm.Session,
project_summaries: typing.List[mlrun.api.schemas.ProjectSummary],
):
_, _, pipelines = mlrun.api.crud.Pipelines().list_pipelines(
session, "*", format_=mlrun.api.schemas.PipelinesFormat.metadata_only
async def _get_project_resources_counters(
self, session: sqlalchemy.orm.Session
) -> typing.Tuple[
typing.Dict[str, int],
typing.Dict[str, int],
typing.Dict[str, int],
typing.Dict[str, int],
typing.Dict[str, int],
typing.Dict[str, int],
typing.Dict[str, int],
]:
now = datetime.datetime.now()
if (
not self._cache["project_resources_counters"]["ttl"]
or self._cache["project_resources_counters"]["ttl"] < now
):
logger.debug(
"Project resources counter cache expired. Calculating",
ttl=self._cache["project_resources_counters"]["ttl"],
)

results = await asyncio.gather(
mlrun.api.utils.singletons.db.get_db().get_project_resources_counters(
session
),
self._calculate_pipelines_counters(session),
)
(
project_to_function_count,
project_to_schedule_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_failed_runs_count,
project_to_running_runs_count,
) = results[0]
project_to_running_pipelines_count = results[1]
self._cache["project_resources_counters"]["result"] = (
project_to_function_count,
project_to_schedule_count,
project_to_feature_set_count,
project_to_models_count,
project_to_recent_failed_runs_count,
project_to_running_runs_count,
project_to_running_pipelines_count,
)
ttl_time = datetime.datetime.now() + datetime.timedelta(
seconds=humanfriendly.parse_timespan(
mlrun.mlconf.httpdb.projects.counters_cache_ttl
)
)
self._cache["project_resources_counters"]["ttl"] = ttl_time
return self._cache["project_resources_counters"]["result"]

async def _calculate_pipelines_counters(
self, session: sqlalchemy.orm.Session,
) -> typing.Dict[str, int]:
_, _, pipelines = await fastapi.concurrency.run_in_threadpool(
mlrun.api.crud.Pipelines().list_pipelines,
session,
"*",
format_=mlrun.api.schemas.PipelinesFormat.metadata_only,
)
project_to_running_pipelines_count = collections.defaultdict(int)
for pipeline in pipelines:
if pipeline["status"] not in mlrun.run.RunStatuses.stable_statuses():
project_to_running_pipelines_count[pipeline["project"]] += 1
for project_summary in project_summaries:
project_summary.pipelines_running_count = project_to_running_pipelines_count.get(
project_summary.name, 0
)
return project_summaries
return project_to_running_pipelines_count
22 changes: 10 additions & 12 deletions mlrun/api/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,16 @@ def get_project(
pass

@abstractmethod
def list_project_summaries(
self,
session,
owner: str = None,
labels: List[str] = None,
state: schemas.ProjectState = None,
names: Optional[List[str]] = None,
) -> schemas.ProjectSummariesOutput:
pass

@abstractmethod
def get_project_summary(self, session, name: str) -> schemas.ProjectSummary:
async def get_project_resources_counters(
self, session
) -> Tuple[
Dict[str, int],
Dict[str, int],
Dict[str, int],
Dict[str, int],
Dict[str, int],
Dict[str, int],
]:
pass

@abstractmethod
Expand Down
23 changes: 11 additions & 12 deletions mlrun/api/db/filedb/db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from mlrun.api import schemas
from mlrun.api.db.base import DBError, DBInterface
Expand Down Expand Up @@ -166,19 +166,18 @@ def list_projects(
self.db.list_projects, owner, format_, labels, state
)

def list_project_summaries(
self,
session,
owner: str = None,
labels: List[str] = None,
state: schemas.ProjectState = None,
names: Optional[List[str]] = None,
) -> schemas.ProjectSummariesOutput:
async def get_project_resources_counters(
self, session
) -> Tuple[
Dict[str, int],
Dict[str, int],
Dict[str, int],
Dict[str, int],
Dict[str, int],
Dict[str, int],
]:
raise NotImplementedError()

def get_project_summary(self, session, name: str) -> schemas.ProjectSummary:
raise NotImplementedError("Get project summary is not supported")

def store_project(self, session, name: str, project: schemas.Project):
raise NotImplementedError()

Expand Down

0 comments on commit 5287967

Please sign in to comment.