Skip to content

Commit

Permalink
[Projects] Support different deletion strategies (#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hedingber committed Dec 27, 2020
1 parent a4d25e2 commit 829e3d4
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 38 deletions.
8 changes: 6 additions & 2 deletions mlrun/api/api/endpoints/projects.py
Expand Up @@ -49,9 +49,13 @@ def get_project(name: str, db_session: Session = Depends(deps.get_db_session)):

@router.delete("/projects/{name}", status_code=HTTPStatus.NO_CONTENT.value)
def delete_project(
name: str, db_session: Session = Depends(deps.get_db_session),
name: str,
deletion_strategy: schemas.DeletionStrategy = Header(
schemas.DeletionStrategy.default(), alias=schemas.HeaderNames.deletion_strategy
),
db_session: Session = Depends(deps.get_db_session),
):
get_project_member().delete_project(db_session, name)
get_project_member().delete_project(db_session, name, deletion_strategy)
return Response(status_code=HTTPStatus.NO_CONTENT.value)


Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/db/base.py
Expand Up @@ -216,7 +216,12 @@ def patch_project(
pass

@abstractmethod
def delete_project(self, session, name: str):
def delete_project(
self,
session,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
pass

@abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/db/filedb/db.py
Expand Up @@ -166,7 +166,12 @@ def get_project(
) -> schemas.Project:
raise NotImplementedError()

def delete_project(self, session, name: str):
def delete_project(
self,
session,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
raise NotImplementedError()

def create_feature_set(
Expand Down
95 changes: 79 additions & 16 deletions mlrun/api/db/sqldb/db.py
Expand Up @@ -86,9 +86,12 @@ def delete_log(self, session: Session, project: str, uid: str):

def _delete_logs(self, session: Session, project: str):
logger.debug("Removing logs from db", project=project)
for log in self._query(session, Log, project=project):
for log in self._list_logs(session, project):
self.delete_log(session, project, log.uid)

def _list_logs(self, session: Session, project: str):
return self._query(session, Log, project=project).all()

def store_run(self, session, run_data, uid, project="", iter=0):
project = project or config.default_project
logger.debug(
Expand Down Expand Up @@ -393,9 +396,12 @@ def delete_function(self, session: Session, project: str, name: str):
self._delete(session, Function, project=project, name=name)

def _delete_functions(self, session: Session, project: str):
for function in self._query(session, Function, project=project):
for function in self._list_project_functions(session, project):
self.delete_function(session, project, function.name)

def _list_project_functions(self, session: Session, project: str):
return self._query(session, Function, project=project).all()

def _delete_resources_tags(self, session: Session, project: str):
for tagged_class in _tagged:
self._delete(session, tagged_class, project=project)
Expand Down Expand Up @@ -696,20 +702,28 @@ def get_project(

return self._transform_project_record_to_schema(session, project_record)

def delete_project(self, session: Session, name: str):
logger.debug("Deleting project from DB", name=name)
self.del_artifacts(session, project=name)
self._delete_logs(session, name)
self.del_runs(session, project=name)
self._delete_schedules(session, name)
self._delete_functions(session, name)
self._delete_feature_sets(session, name)
self._delete_feature_vectors(session, name)

# resources deletion should remove their tags and labels as well, but doing another try in case there are
# orphan resources
self._delete_resources_tags(session, name)
self._delete_resources_labels(session, name)
def delete_project(
self,
session: Session,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
logger.debug(
"Deleting project from DB", name=name, deletion_strategy=deletion_strategy
)
if deletion_strategy == schemas.DeletionStrategy.restrict:
project_record = self._get_project_record(
session, name, raise_on_not_found=False
)
if not project_record:
return
self._verify_project_has_no_related_resources(session, name)
elif deletion_strategy == schemas.DeletionStrategy.cascade:
self._delete_project_related_resources(session, name)
else:
raise mlrun.errors.MLRunInvalidArgumentError(
f"Unknown deletion strategy: {deletion_strategy}"
)
self._delete(session, Project, name=name)

def list_projects(
Expand Down Expand Up @@ -796,6 +810,55 @@ def _get_project_record(

return project_record

def _verify_project_has_no_related_resources(self, session: Session, project: str):
artifacts = self._find_artifacts(session, project, "*")
self._verify_empty_list_of_project_related_resources(
project, artifacts, "artifacts"
)
logs = self._list_logs(session, project)
self._verify_empty_list_of_project_related_resources(project, logs, "logs")
runs = self._find_runs(session, None, project, []).all()
self._verify_empty_list_of_project_related_resources(project, runs, "runs")
schedules = self.list_schedules(session, project=project)
self._verify_empty_list_of_project_related_resources(
project, schedules, "schedules"
)
functions = self._list_project_functions(session, project)
self._verify_empty_list_of_project_related_resources(
project, functions, "functions"
)
feature_sets = self.list_feature_sets(session, project).feature_sets
self._verify_empty_list_of_project_related_resources(
project, feature_sets, "feature_sets"
)
feature_vectors = self.list_feature_vectors(session, project).feature_vectors
self._verify_empty_list_of_project_related_resources(
project, feature_vectors, "feature_vectors"
)

def _delete_project_related_resources(self, session: Session, name: str):
self.del_artifacts(session, project=name)
self._delete_logs(session, name)
self.del_runs(session, project=name)
self._delete_schedules(session, name)
self._delete_functions(session, name)
self._delete_feature_sets(session, name)
self._delete_feature_vectors(session, name)

# resources deletion should remove their tags and labels as well, but doing another try in case there are
# orphan resources
self._delete_resources_tags(session, name)
self._delete_resources_labels(session, name)

@staticmethod
def _verify_empty_list_of_project_related_resources(
project: str, resources: List, resource_name: str
):
if resources:
raise mlrun.errors.MLRunPreconditionFailedError(
f"Project {project} can not be deleted since related resources found: {resource_name}"
)

def _get_record_by_name_tag_and_uid(
self, session, cls, project: str, name: str, tag: str = None, uid: str = None,
):
Expand Down
2 changes: 1 addition & 1 deletion mlrun/api/schemas/__init__.py
Expand Up @@ -8,7 +8,7 @@
BackgroundTaskSpec,
BackgroundTaskStatus,
)
from .constants import Format, PatchMode, HeaderNames
from .constants import Format, PatchMode, HeaderNames, DeletionStrategy
from .feature_store import (
Feature,
FeatureRecord,
Expand Down
20 changes: 20 additions & 0 deletions mlrun/api/schemas/constants.py
Expand Up @@ -25,8 +25,28 @@ def to_mergedeep_strategy(self) -> mergedeep.Strategy:
)


class DeletionStrategy(str, Enum):
restrict = "restrict"
cascade = "cascade"

@staticmethod
def default():
return DeletionStrategy.restrict

def to_nuclio_deletion_strategy(self) -> str:
if self.value == DeletionStrategy.restrict:
return "restricted"
elif self.value == DeletionStrategy.cascade:
return "cascading"
else:
raise mlrun.errors.MLRunInvalidArgumentError(
f"Unknown deletion strategy: {self.value}"
)


headers_prefix = "x-mlrun-"


class HeaderNames:
patch_mode = f"{headers_prefix}patch-mode"
deletion_strategy = f"{headers_prefix}deletion-strategy"
28 changes: 24 additions & 4 deletions mlrun/api/utils/clients/nuclio.py
Expand Up @@ -8,6 +8,7 @@

import mlrun.api.schemas
import mlrun.api.utils.projects.remotes.member
import mlrun.errors
import mlrun.utils.singleton
from mlrun.utils import logger

Expand Down Expand Up @@ -72,14 +73,33 @@ def patch_project(
]
self._put_project_to_nuclio(response_body)

def delete_project(self, session: sqlalchemy.orm.Session, name: str):
logger.debug("Deleting project in Nuclio", name=name)
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
logger.debug(
"Deleting project in Nuclio", name=name, deletion_strategy=deletion_strategy
)
body = self._generate_request_body(
mlrun.api.schemas.Project(
metadata=mlrun.api.schemas.ProjectMetadata(name=name)
)
)
self._send_request_to_api("DELETE", "projects", json=body)
headers = {
"x-nuclio-delete-project-strategy": deletion_strategy.to_nuclio_deletion_strategy(),
}
try:
self._send_request_to_api("DELETE", "projects", json=body, headers=headers)
except requests.HTTPError as exc:
if exc.response.status_code != http.HTTPStatus.NOT_FOUND.value:
raise
logger.debug(
"Project not found in Nuclio. Considering deletion as successful",
name=name,
deletion_strategy=deletion_strategy,
)

def get_project(
self, session: sqlalchemy.orm.Session, name: str
Expand Down Expand Up @@ -153,7 +173,7 @@ def _send_request_to_api(self, method, path, **kwargs):
{"error": error, "error_stack_trace": error_stack_trace}
)
logger.warning("Request to nuclio failed", **log_kwargs)
response.raise_for_status()
mlrun.errors.raise_for_status(response)
return response

@staticmethod
Expand Down
9 changes: 7 additions & 2 deletions mlrun/api/utils/projects/leader.py
Expand Up @@ -83,8 +83,13 @@ def patch_project(
self._run_on_all_followers("patch_project", session, name, project, patch_mode)
return self.get_project(session, name)

def delete_project(self, session: sqlalchemy.orm.Session, name: str):
self._run_on_all_followers("delete_project", session, name)
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
self._run_on_all_followers("delete_project", session, name, deletion_strategy)

def get_project(
self, session: sqlalchemy.orm.Session, name: str
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/utils/projects/member.py
Expand Up @@ -47,7 +47,12 @@ def patch_project(
pass

@abc.abstractmethod
def delete_project(self, session: sqlalchemy.orm.Session, name: str):
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy.default(),
):
pass

@abc.abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/utils/projects/remotes/member.py
Expand Up @@ -33,7 +33,12 @@ def patch_project(
pass

@abc.abstractmethod
def delete_project(self, session: sqlalchemy.orm.Session, name: str):
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
pass

@abc.abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/utils/projects/remotes/nop.py
Expand Up @@ -40,7 +40,12 @@ def patch_project(
mergedeep.merge(existing_project_dict, project, strategy=strategy)
self._projects[name] = mlrun.api.schemas.Project(**existing_project_dict)

def delete_project(self, session: sqlalchemy.orm.Session, name: str):
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
if name in self._projects:
del self._projects[name]

Expand Down
6 changes: 5 additions & 1 deletion mlrun/db/base.py
Expand Up @@ -118,7 +118,11 @@ def list_functions(self, name=None, project="", tag="", labels=None):
pass

@abstractmethod
def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
pass

@abstractmethod
Expand Down
6 changes: 5 additions & 1 deletion mlrun/db/filedb.py
Expand Up @@ -432,7 +432,11 @@ def list_projects(
def get_project(self, name: str) -> mlrun.api.schemas.Project:
raise NotImplementedError()

def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
raise NotImplementedError()

def store_project(
Expand Down
13 changes: 11 additions & 2 deletions mlrun/db/httpdb.py
Expand Up @@ -1005,10 +1005,19 @@ def get_project(self, name: str) -> mlrun.projects.MlrunProject:
response = self.api_call("GET", path, error_message)
return mlrun.projects.MlrunProject.from_dict(response.json())

def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: Union[
str, mlrun.api.schemas.DeletionStrategy
] = mlrun.api.schemas.DeletionStrategy.default(),
):
path = f"projects/{name}"
if isinstance(deletion_strategy, schemas.DeletionStrategy):
deletion_strategy = deletion_strategy.value
headers = {schemas.HeaderNames.deletion_strategy: deletion_strategy}
error_message = f"Failed deleting project {name}"
self.api_call("DELETE", path, error_message)
self.api_call("DELETE", path, error_message, headers=headers)

def store_project(
self,
Expand Down
6 changes: 5 additions & 1 deletion mlrun/db/sqldb.py
Expand Up @@ -205,7 +205,11 @@ def create_project(
) -> mlrun.api.schemas.Project:
raise NotImplementedError()

def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
raise NotImplementedError()

def get_project(
Expand Down
5 changes: 5 additions & 0 deletions mlrun/errors.py
Expand Up @@ -84,6 +84,10 @@ class MLRunConflictError(MLRunHTTPStatusError):
error_status_code = HTTPStatus.CONFLICT.value


class MLRunPreconditionFailedError(MLRunHTTPStatusError):
error_status_code = HTTPStatus.PRECONDITION_FAILED.value


class MLRunIncompatibleVersionError(MLRunHTTPStatusError):
error_status_code = HTTPStatus.BAD_REQUEST.value

Expand All @@ -94,4 +98,5 @@ class MLRunIncompatibleVersionError(MLRunHTTPStatusError):
HTTPStatus.FORBIDDEN.value: MLRunAccessDeniedError,
HTTPStatus.NOT_FOUND.value: MLRunNotFoundError,
HTTPStatus.CONFLICT.value: MLRunConflictError,
HTTPStatus.PRECONDITION_FAILED.value: MLRunPreconditionFailedError,
}

0 comments on commit 829e3d4

Please sign in to comment.