Skip to content

Commit

Permalink
[Projects] Support being a follower (#839)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hedingber committed May 10, 2021
1 parent a390777 commit 1558004
Show file tree
Hide file tree
Showing 28 changed files with 1,888 additions and 128 deletions.
73 changes: 64 additions & 9 deletions mlrun/api/api/endpoints/projects.py
Expand Up @@ -12,33 +12,78 @@


# curl -d '{"name": "p1", "description": "desc", "users": ["u1", "u2"]}' http://localhost:8080/project
@router.post("/projects", response_model=schemas.Project)
@router.post(
"/projects",
responses={
HTTPStatus.CREATED.value: {"model": schemas.Project},
HTTPStatus.ACCEPTED.value: {},
},
)
def create_project(
project: schemas.Project, db_session: Session = Depends(deps.get_db_session),
project: schemas.Project,
response: Response,
projects_role: typing.Optional[schemas.ProjectsRole] = Header(
None, alias=schemas.HeaderNames.projects_role
),
db_session: Session = Depends(deps.get_db_session),
):
return get_project_member().create_project(db_session, project)
project, is_running_in_background = get_project_member().create_project(
db_session, project, projects_role, wait_for_completion=False
)
if is_running_in_background:
return Response(status_code=HTTPStatus.ACCEPTED.value)
response.status_code = HTTPStatus.CREATED.value
return project


# curl -d '{"name": "p1", "description": "desc", "users": ["u1", "u2"]}' -X UPDATE http://localhost:8080/project
@router.put("/projects/{name}", response_model=schemas.Project)
@router.put(
"/projects/{name}",
responses={
HTTPStatus.OK.value: {"model": schemas.Project},
HTTPStatus.ACCEPTED.value: {},
},
)
def store_project(
project: schemas.Project,
name: str,
projects_role: typing.Optional[schemas.ProjectsRole] = Header(
None, alias=schemas.HeaderNames.projects_role
),
db_session: Session = Depends(deps.get_db_session),
):
return get_project_member().store_project(db_session, name, project)
project, is_running_in_background = get_project_member().store_project(
db_session, name, project, projects_role, wait_for_completion=False
)
if is_running_in_background:
return Response(status_code=HTTPStatus.ACCEPTED.value)
return project


@router.patch("/projects/{name}", response_model=schemas.Project)
@router.patch(
"/projects/{name}",
responses={
HTTPStatus.OK.value: {"model": schemas.Project},
HTTPStatus.ACCEPTED.value: {},
},
)
def patch_project(
project: dict,
name: str,
patch_mode: schemas.PatchMode = Header(
schemas.PatchMode.replace, alias=schemas.HeaderNames.patch_mode
),
projects_role: typing.Optional[schemas.ProjectsRole] = Header(
None, alias=schemas.HeaderNames.projects_role
),
db_session: Session = Depends(deps.get_db_session),
):
return get_project_member().patch_project(db_session, name, project, patch_mode)
project, is_running_in_background = get_project_member().patch_project(
db_session, name, project, patch_mode, projects_role, wait_for_completion=False
)
if is_running_in_background:
return Response(status_code=HTTPStatus.ACCEPTED.value)
return project


# curl http://localhost:8080/project/<name>
Expand All @@ -47,15 +92,25 @@ def get_project(name: str, db_session: Session = Depends(deps.get_db_session)):
return get_project_member().get_project(db_session, name)


@router.delete("/projects/{name}", status_code=HTTPStatus.NO_CONTENT.value)
@router.delete(
"/projects/{name}",
responses={HTTPStatus.NO_CONTENT.value: {}, HTTPStatus.ACCEPTED.value: {}},
)
def delete_project(
name: str,
deletion_strategy: schemas.DeletionStrategy = Header(
schemas.DeletionStrategy.default(), alias=schemas.HeaderNames.deletion_strategy
),
projects_role: typing.Optional[schemas.ProjectsRole] = Header(
None, alias=schemas.HeaderNames.projects_role
),
db_session: Session = Depends(deps.get_db_session),
):
get_project_member().delete_project(db_session, name, deletion_strategy)
is_running_in_background = get_project_member().delete_project(
db_session, name, deletion_strategy, projects_role, wait_for_completion=False
)
if is_running_in_background:
return Response(status_code=HTTPStatus.ACCEPTED.value)
return Response(status_code=HTTPStatus.NO_CONTENT.value)


Expand Down
6 changes: 3 additions & 3 deletions mlrun/api/crud/projects.py
Expand Up @@ -4,15 +4,15 @@

import mlrun.api.crud
import mlrun.api.schemas
import mlrun.api.utils.projects.remotes.member
import mlrun.api.utils.projects.remotes.follower
import mlrun.api.utils.singletons.db
import mlrun.errors
import mlrun.utils.singleton
from mlrun.utils import logger


class Projects(
mlrun.api.utils.projects.remotes.member.Member,
mlrun.api.utils.projects.remotes.follower.Member,
metaclass=mlrun.utils.singleton.AbstractSingleton,
):
def create_project(
Expand Down Expand Up @@ -51,7 +51,7 @@ def delete_project(
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
logger.debug("Deleting project", name=name, deletion_strategy=deletion_strategy)
if deletion_strategy == mlrun.api.schemas.DeletionStrategy.cascade:
if deletion_strategy.is_cascading():
# delete runtime resources
mlrun.api.crud.Runtimes().delete_runtimes(
session, label_selector=f"mlrun/project={name}", force=True
Expand Down
2 changes: 1 addition & 1 deletion mlrun/api/crud/runs.py
Expand Up @@ -2,7 +2,7 @@

import mlrun.api.api.utils
import mlrun.api.schemas
import mlrun.api.utils.projects.remotes.member
import mlrun.api.utils.projects.remotes.follower
import mlrun.api.utils.singletons.db
import mlrun.config
import mlrun.errors
Expand Down
2 changes: 1 addition & 1 deletion mlrun/api/crud/runtimes.py
Expand Up @@ -6,7 +6,7 @@

import mlrun.api.api.utils
import mlrun.api.schemas
import mlrun.api.utils.projects.remotes.member
import mlrun.api.utils.projects.remotes.follower
import mlrun.api.utils.singletons.db
import mlrun.config
import mlrun.errors
Expand Down
6 changes: 6 additions & 0 deletions mlrun/api/db/base.py
Expand Up @@ -183,6 +183,12 @@ def get_schedule(self, session, project: str, name: str) -> schemas.ScheduleReco
def delete_schedule(self, session, project: str, name: str):
pass

@abstractmethod
def generate_projects_summaries(
self, session, projects: List[str]
) -> List[schemas.ProjectSummary]:
pass

@abstractmethod
def list_projects(
self,
Expand Down
5 changes: 5 additions & 0 deletions mlrun/api/db/filedb/db.py
Expand Up @@ -134,6 +134,11 @@ def list_functions(self, session, name=None, project="", tag="", labels=None):
def store_schedule(self, session, data):
return self._transform_run_db_error(self.db.store_schedule, data)

def generate_projects_summaries(
self, session, projects: List[str]
) -> List[schemas.ProjectSummary]:
raise NotImplementedError()

def list_projects(
self,
session,
Expand Down
12 changes: 6 additions & 6 deletions mlrun/api/db/sqldb/db.py
Expand Up @@ -11,7 +11,7 @@
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, aliased

import mlrun.api.utils.projects.remotes.member
import mlrun.api.utils.projects.remotes.follower
import mlrun.errors
from mlrun.api import schemas
from mlrun.api.db.base import DBInterface
Expand Down Expand Up @@ -59,7 +59,7 @@
unversioned_tagged_object_uid_prefix = "unversioned-"


class SQLDB(mlrun.api.utils.projects.remotes.member.Member, DBInterface):
class SQLDB(mlrun.api.utils.projects.remotes.follower.Member, DBInterface):
def __init__(self, dsn):
self.dsn = dsn
self._cache = {
Expand Down Expand Up @@ -825,14 +825,14 @@ def delete_project(
logger.debug(
"Deleting project from DB", name=name, deletion_strategy=deletion_strategy
)
if deletion_strategy == schemas.DeletionStrategy.restrict:
if deletion_strategy.is_restricted():
project_record = self._get_project_record(
session, name, raise_on_not_found=False
)
if not project_record:
return
self._verify_project_has_no_related_resources(session, name)
elif deletion_strategy == schemas.DeletionStrategy.cascade:
elif deletion_strategy.is_cascading():
self._delete_project_related_resources(session, name)
else:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand All @@ -857,7 +857,7 @@ def list_projects(
# calculating the project summary data is done by doing cross project queries (and not per project) so we're
# building it outside of the loop
if format_ == mlrun.api.schemas.Format.summary:
projects = self._generate_projects_summaries(session, project_names)
projects = self.generate_projects_summaries(session, project_names)
else:
for project_record in project_records:
if format_ == mlrun.api.schemas.Format.name_only:
Expand Down Expand Up @@ -972,7 +972,7 @@ def _get_project_resources_counters(self, session: Session):

return self._cache["project_resources_counters"]["result"]

def _generate_projects_summaries(
def generate_projects_summaries(
self, session: Session, projects: List[str]
) -> List[mlrun.api.schemas.ProjectSummary]:
(
Expand Down
2 changes: 2 additions & 0 deletions mlrun/api/schemas/__init__.py
Expand Up @@ -15,6 +15,7 @@
HeaderNames,
OrderType,
PatchMode,
ProjectsRole,
SortField,
)
from .feature_store import (
Expand Down Expand Up @@ -59,6 +60,7 @@
from .pipeline import PipelinesOutput, PipelinesPagination
from .project import (
Project,
ProjectDesiredState,
ProjectMetadata,
ProjectsOutput,
ProjectSpec,
Expand Down
36 changes: 33 additions & 3 deletions mlrun/api/schemas/constants.py
Expand Up @@ -12,6 +12,13 @@ class Format(str, Enum):
summary = "summary"


class ProjectsRole(str, Enum):
iguazio = "iguazio"
mlrun = "mlrun"
nuclio = "nuclio"
nop = "nop"


class PatchMode(str, Enum):
replace = "replace"
additive = "additive"
Expand All @@ -29,16 +36,38 @@ def to_mergedeep_strategy(self) -> mergedeep.Strategy:

class DeletionStrategy(str, Enum):
restrict = "restrict"
restricted = "restricted"
cascade = "cascade"
cascading = "cascading"

@staticmethod
def default():
return DeletionStrategy.restrict
return DeletionStrategy.restricted

def is_restricted(self):
if self.value in [DeletionStrategy.restrict, DeletionStrategy.restricted]:
return True
return False

def is_cascading(self):
if self.value in [DeletionStrategy.cascade, DeletionStrategy.cascading]:
return True
return False

def to_nuclio_deletion_strategy(self) -> str:
if self.value == DeletionStrategy.restrict:
if self.is_restricted():
return "restricted"
elif self.is_cascading():
return "cascading"
else:
raise mlrun.errors.MLRunInvalidArgumentError(
f"Unknown deletion strategy: {self.value}"
)

def to_iguazio_deletion_strategy(self) -> str:
if self.is_restricted():
return "restricted"
elif self.value == DeletionStrategy.cascade:
elif self.is_cascading():
return "cascading"
else:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand All @@ -50,6 +79,7 @@ def to_nuclio_deletion_strategy(self) -> str:


class HeaderNames:
projects_role = "x-projects-role"
patch_mode = f"{headers_prefix}patch-mode"
deletion_strategy = f"{headers_prefix}deletion-strategy"
secret_store_token = f"{headers_prefix}secret-store-token"
Expand Down
20 changes: 19 additions & 1 deletion mlrun/api/schemas/project.py
Expand Up @@ -17,10 +17,28 @@ class Config:
extra = pydantic.Extra.allow


class ProjectDesiredState(str, enum.Enum):
online = "online"
offline = "offline"
archived = "archived"


class ProjectState(str, enum.Enum):
unknown = "unknown"
creating = "creating"
deleting = "deleting"
online = "online"
offline = "offline"
archived = "archived"

@staticmethod
def terminal_states():
return [
ProjectState.online,
ProjectState.offline,
ProjectState.archived,
]


class ProjectStatus(ObjectStatus):
state: typing.Optional[ProjectState]
Expand All @@ -38,7 +56,7 @@ class ProjectSpec(pydantic.BaseModel):
source: typing.Optional[str] = None
subpath: typing.Optional[str] = None
origin_url: typing.Optional[str] = None
desired_state: typing.Optional[ProjectState] = ProjectState.online
desired_state: typing.Optional[ProjectDesiredState] = ProjectDesiredState.online

class Config:
extra = pydantic.Extra.allow
Expand Down

0 comments on commit 1558004

Please sign in to comment.