Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Projects] Support different deletion strategies #618

Merged
merged 7 commits into from
Dec 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions mlrun/api/api/endpoints/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ def get_project(name: str, db_session: Session = Depends(deps.get_db_session)):

@router.delete("/projects/{name}", status_code=HTTPStatus.NO_CONTENT.value)
def delete_project(
name: str, db_session: Session = Depends(deps.get_db_session),
name: str,
deletion_strategy: schemas.DeletionStrategy = Header(
schemas.DeletionStrategy.default(), alias=schemas.HeaderNames.deletion_strategy
),
db_session: Session = Depends(deps.get_db_session),
):
get_project_member().delete_project(db_session, name)
get_project_member().delete_project(db_session, name, deletion_strategy)
return Response(status_code=HTTPStatus.NO_CONTENT.value)


Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,12 @@ def patch_project(
pass

@abstractmethod
def delete_project(self, session, name: str):
def delete_project(
self,
session,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
pass

@abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/db/filedb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ def get_project(
) -> schemas.Project:
raise NotImplementedError()

def delete_project(self, session, name: str):
def delete_project(
self,
session,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
raise NotImplementedError()

def create_feature_set(
Expand Down
95 changes: 79 additions & 16 deletions mlrun/api/db/sqldb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ def delete_log(self, session: Session, project: str, uid: str):

def _delete_logs(self, session: Session, project: str):
logger.debug("Removing logs from db", project=project)
for log in self._query(session, Log, project=project):
for log in self._list_logs(session, project):
self.delete_log(session, project, log.uid)

def _list_logs(self, session: Session, project: str):
return self._query(session, Log, project=project).all()

def store_run(self, session, run_data, uid, project="", iter=0):
project = project or config.default_project
logger.debug(
Expand Down Expand Up @@ -393,9 +396,12 @@ def delete_function(self, session: Session, project: str, name: str):
self._delete(session, Function, project=project, name=name)

def _delete_functions(self, session: Session, project: str):
for function in self._query(session, Function, project=project):
for function in self._list_project_functions(session, project):
self.delete_function(session, project, function.name)

def _list_project_functions(self, session: Session, project: str):
return self._query(session, Function, project=project).all()

def _delete_resources_tags(self, session: Session, project: str):
for tagged_class in _tagged:
self._delete(session, tagged_class, project=project)
Expand Down Expand Up @@ -696,20 +702,28 @@ def get_project(

return self._transform_project_record_to_schema(session, project_record)

def delete_project(self, session: Session, name: str):
logger.debug("Deleting project from DB", name=name)
self.del_artifacts(session, project=name)
self._delete_logs(session, name)
self.del_runs(session, project=name)
self._delete_schedules(session, name)
self._delete_functions(session, name)
self._delete_feature_sets(session, name)
self._delete_feature_vectors(session, name)

# resources deletion should remove their tags and labels as well, but doing another try in case there are
# orphan resources
self._delete_resources_tags(session, name)
self._delete_resources_labels(session, name)
def delete_project(
self,
session: Session,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
logger.debug(
"Deleting project from DB", name=name, deletion_strategy=deletion_strategy
)
if deletion_strategy == schemas.DeletionStrategy.restrict:
project_record = self._get_project_record(
session, name, raise_on_not_found=False
)
if not project_record:
return
self._verify_project_has_no_related_resources(session, name)
elif deletion_strategy == schemas.DeletionStrategy.cascade:
self._delete_project_related_resources(session, name)
else:
raise mlrun.errors.MLRunInvalidArgumentError(
f"Unknown deletion strategy: {deletion_strategy}"
)
self._delete(session, Project, name=name)

def list_projects(
Expand Down Expand Up @@ -796,6 +810,55 @@ def _get_project_record(

return project_record

def _verify_project_has_no_related_resources(self, session: Session, project: str):
artifacts = self._find_artifacts(session, project, "*")
self._verify_empty_list_of_project_related_resources(
project, artifacts, "artifacts"
)
logs = self._list_logs(session, project)
self._verify_empty_list_of_project_related_resources(project, logs, "logs")
runs = self._find_runs(session, None, project, []).all()
self._verify_empty_list_of_project_related_resources(project, runs, "runs")
schedules = self.list_schedules(session, project=project)
self._verify_empty_list_of_project_related_resources(
project, schedules, "schedules"
)
functions = self._list_project_functions(session, project)
self._verify_empty_list_of_project_related_resources(
project, functions, "functions"
)
feature_sets = self.list_feature_sets(session, project).feature_sets
self._verify_empty_list_of_project_related_resources(
project, feature_sets, "feature_sets"
)
feature_vectors = self.list_feature_vectors(session, project).feature_vectors
self._verify_empty_list_of_project_related_resources(
project, feature_vectors, "feature_vectors"
)

def _delete_project_related_resources(self, session: Session, name: str):
self.del_artifacts(session, project=name)
self._delete_logs(session, name)
self.del_runs(session, project=name)
self._delete_schedules(session, name)
self._delete_functions(session, name)
self._delete_feature_sets(session, name)
self._delete_feature_vectors(session, name)

# resources deletion should remove their tags and labels as well, but doing another try in case there are
# orphan resources
self._delete_resources_tags(session, name)
self._delete_resources_labels(session, name)

@staticmethod
def _verify_empty_list_of_project_related_resources(
project: str, resources: List, resource_name: str
):
if resources:
raise mlrun.errors.MLRunPreconditionFailedError(
f"Project {project} can not be deleted since related resources found: {resource_name}"
)

def _get_record_by_name_tag_and_uid(
self, session, cls, project: str, name: str, tag: str = None, uid: str = None,
):
Expand Down
2 changes: 1 addition & 1 deletion mlrun/api/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
BackgroundTaskSpec,
BackgroundTaskStatus,
)
from .constants import Format, PatchMode, HeaderNames
from .constants import Format, PatchMode, HeaderNames, DeletionStrategy
from .feature_store import (
Feature,
FeatureRecord,
Expand Down
20 changes: 20 additions & 0 deletions mlrun/api/schemas/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,28 @@ def to_mergedeep_strategy(self) -> mergedeep.Strategy:
)


class DeletionStrategy(str, Enum):
restrict = "restrict"
cascade = "cascade"

@staticmethod
def default():
return DeletionStrategy.restrict

def to_nuclio_deletion_strategy(self) -> str:
if self.value == DeletionStrategy.restrict:
return "restricted"
elif self.value == DeletionStrategy.cascade:
return "cascading"
else:
raise mlrun.errors.MLRunInvalidArgumentError(
f"Unknown deletion strategy: {self.value}"
)


headers_prefix = "x-mlrun-"


class HeaderNames:
patch_mode = f"{headers_prefix}patch-mode"
deletion_strategy = f"{headers_prefix}deletion-strategy"
28 changes: 24 additions & 4 deletions mlrun/api/utils/clients/nuclio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import mlrun.api.schemas
import mlrun.api.utils.projects.remotes.member
import mlrun.errors
import mlrun.utils.singleton
from mlrun.utils import logger

Expand Down Expand Up @@ -72,14 +73,33 @@ def patch_project(
]
self._put_project_to_nuclio(response_body)

def delete_project(self, session: sqlalchemy.orm.Session, name: str):
logger.debug("Deleting project in Nuclio", name=name)
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
logger.debug(
"Deleting project in Nuclio", name=name, deletion_strategy=deletion_strategy
)
body = self._generate_request_body(
mlrun.api.schemas.Project(
metadata=mlrun.api.schemas.ProjectMetadata(name=name)
)
)
self._send_request_to_api("DELETE", "projects", json=body)
headers = {
"x-nuclio-delete-project-strategy": deletion_strategy.to_nuclio_deletion_strategy(),
}
try:
self._send_request_to_api("DELETE", "projects", json=body, headers=headers)
except requests.HTTPError as exc:
if exc.response.status_code != http.HTTPStatus.NOT_FOUND.value:
raise
logger.debug(
"Project not found in Nuclio. Considering deletion as successful",
name=name,
deletion_strategy=deletion_strategy,
)

def get_project(
self, session: sqlalchemy.orm.Session, name: str
Expand Down Expand Up @@ -153,7 +173,7 @@ def _send_request_to_api(self, method, path, **kwargs):
{"error": error, "error_stack_trace": error_stack_trace}
)
logger.warning("Request to nuclio failed", **log_kwargs)
response.raise_for_status()
mlrun.errors.raise_for_status(response)
return response

@staticmethod
Expand Down
9 changes: 7 additions & 2 deletions mlrun/api/utils/projects/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,13 @@ def patch_project(
self._run_on_all_followers("patch_project", session, name, project, patch_mode)
return self.get_project(session, name)

def delete_project(self, session: sqlalchemy.orm.Session, name: str):
self._run_on_all_followers("delete_project", session, name)
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
self._run_on_all_followers("delete_project", session, name, deletion_strategy)

def get_project(
self, session: sqlalchemy.orm.Session, name: str
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/utils/projects/member.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ def patch_project(
pass

@abc.abstractmethod
def delete_project(self, session: sqlalchemy.orm.Session, name: str):
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy.default(),
):
pass

@abc.abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/utils/projects/remotes/member.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ def patch_project(
pass

@abc.abstractmethod
def delete_project(self, session: sqlalchemy.orm.Session, name: str):
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
pass

@abc.abstractmethod
Expand Down
7 changes: 6 additions & 1 deletion mlrun/api/utils/projects/remotes/nop.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ def patch_project(
mergedeep.merge(existing_project_dict, project, strategy=strategy)
self._projects[name] = mlrun.api.schemas.Project(**existing_project_dict)

def delete_project(self, session: sqlalchemy.orm.Session, name: str):
def delete_project(
self,
session: sqlalchemy.orm.Session,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
if name in self._projects:
del self._projects[name]

Expand Down
6 changes: 5 additions & 1 deletion mlrun/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ def list_functions(self, name=None, project="", tag="", labels=None):
pass

@abstractmethod
def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: schemas.DeletionStrategy = schemas.DeletionStrategy.default(),
):
pass

@abstractmethod
Expand Down
6 changes: 5 additions & 1 deletion mlrun/db/filedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,11 @@ def list_projects(
def get_project(self, name: str) -> mlrun.api.schemas.Project:
raise NotImplementedError()

def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
raise NotImplementedError()

def store_project(
Expand Down
13 changes: 11 additions & 2 deletions mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,10 +1005,19 @@ def get_project(self, name: str) -> mlrun.projects.MlrunProject:
response = self.api_call("GET", path, error_message)
return mlrun.projects.MlrunProject.from_dict(response.json())

def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: Union[
str, mlrun.api.schemas.DeletionStrategy
] = mlrun.api.schemas.DeletionStrategy.default(),
):
path = f"projects/{name}"
if isinstance(deletion_strategy, schemas.DeletionStrategy):
deletion_strategy = deletion_strategy.value
headers = {schemas.HeaderNames.deletion_strategy: deletion_strategy}
error_message = f"Failed deleting project {name}"
self.api_call("DELETE", path, error_message)
self.api_call("DELETE", path, error_message, headers=headers)

def store_project(
self,
Expand Down
6 changes: 5 additions & 1 deletion mlrun/db/sqldb.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ def create_project(
) -> mlrun.api.schemas.Project:
raise NotImplementedError()

def delete_project(self, name: str):
def delete_project(
self,
name: str,
deletion_strategy: mlrun.api.schemas.DeletionStrategy = mlrun.api.schemas.DeletionStrategy.default(),
):
raise NotImplementedError()

def get_project(
Expand Down
5 changes: 5 additions & 0 deletions mlrun/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class MLRunConflictError(MLRunHTTPStatusError):
error_status_code = HTTPStatus.CONFLICT.value


class MLRunPreconditionFailedError(MLRunHTTPStatusError):
error_status_code = HTTPStatus.PRECONDITION_FAILED.value


class MLRunIncompatibleVersionError(MLRunHTTPStatusError):
error_status_code = HTTPStatus.BAD_REQUEST.value

Expand All @@ -94,4 +98,5 @@ class MLRunIncompatibleVersionError(MLRunHTTPStatusError):
HTTPStatus.FORBIDDEN.value: MLRunAccessDeniedError,
HTTPStatus.NOT_FOUND.value: MLRunNotFoundError,
HTTPStatus.CONFLICT.value: MLRunConflictError,
HTTPStatus.PRECONDITION_FAILED.value: MLRunPreconditionFailedError,
}