Skip to content

Commit

Permalink
rest: get rid of workflow hard deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
mvidalgarcia committed Oct 27, 2020
1 parent 78bc116 commit b55785e
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 93 deletions.
5 changes: 1 addition & 4 deletions docs/openapi.json
Expand Up @@ -934,7 +934,7 @@
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options for workflow execution. Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted, `workspace=True/False` which deletes the workspace of a workflow and finally `hard_delete=True` which removes completely the workflow data from the database and the workspace from the shared filesystem.",
"description": "Optional. Additional input parameters and operational options for workflow execution. Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted and `workspace=True/False` which deletes the workspace of a workflow.",
"in": "body",
"name": "parameters",
"required": false,
Expand All @@ -946,9 +946,6 @@
"all_runs": {
"type": "boolean"
},
"hard_delete": {
"type": "boolean"
},
"workspace": {
"type": "boolean"
}
Expand Down
16 changes: 3 additions & 13 deletions reana_workflow_controller/rest/utils.py
Expand Up @@ -184,7 +184,7 @@ def remove_workflow_jobs_from_cache(workflow):
Session.commit()


def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False):
def delete_workflow(workflow, all_runs=False, workspace=False):
"""Delete workflow."""
if workflow.status in [
RunStatus.created,
Expand All @@ -206,13 +206,9 @@ def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False
.all()
)
for workflow in to_be_deleted:
if hard_delete:
if workspace:
remove_workflow_workspace(workflow.workspace_path)
_delete_workflow_row_from_db(workflow)
else:
if workspace:
remove_workflow_workspace(workflow.workspace_path)
_mark_workflow_as_deleted_in_db(workflow)
_mark_workflow_as_deleted_in_db(workflow)
remove_workflow_jobs_from_cache(workflow)

return (
Expand All @@ -237,12 +233,6 @@ def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False
)


def _delete_workflow_row_from_db(workflow):
"""Remove workflow row from database."""
Session.query(Workflow).filter_by(id_=workflow.id_).delete()
Session.commit()


def _mark_workflow_as_deleted_in_db(workflow):
"""Mark workflow as deleted."""
workflow.status = RunStatus.deleted
Expand Down
13 changes: 4 additions & 9 deletions reana_workflow_controller/rest/workflows_status.py
Expand Up @@ -373,10 +373,8 @@ def set_workflow_status(workflow_id_or_name): # noqa
workflow execution. Possible parameters are `CACHE=on/off`, passed
to disable caching of results in serial workflows,
`all_runs=True/False` deletes all runs of a given workflow
if status is set to deleted, `workspace=True/False` which deletes
the workspace of a workflow and finally `hard_delete=True` which
removes completely the workflow data from the database and the
workspace from the shared filesystem.
if status is set to deleted and `workspace=True/False` which deletes
the workspace of a workflow.
required: false
schema:
type: object
Expand All @@ -387,8 +385,6 @@ def set_workflow_status(workflow_id_or_name): # noqa
type: boolean
workspace:
type: boolean
hard_delete:
type: boolean
responses:
200:
description: >-
Expand Down Expand Up @@ -526,9 +522,8 @@ def set_workflow_status(workflow_id_or_name): # noqa
)
elif status == DELETED:
all_runs = True if request.json.get("all_runs") else False
hard_delete = True if request.json.get("hard_delete") else False
workspace = True if hard_delete or request.json.get("workspace") else False
return delete_workflow(workflow, all_runs, hard_delete, workspace)
workspace = True if request.json.get("workspace") else False
return delete_workflow(workflow, all_runs, workspace)
if status == STOP:
stop_workflow(workflow)
return (
Expand Down
56 changes: 17 additions & 39 deletions tests/test_utils.py
Expand Up @@ -33,30 +33,20 @@
pytest.param(RunStatus.running, marks=pytest.mark.xfail),
],
)
@pytest.mark.parametrize("hard_delete", [True, False])
def test_delete_workflow(
app, session, default_user, sample_yadage_workflow_in_db, status, hard_delete
app, session, default_user, sample_yadage_workflow_in_db, status
):
"""Test deletion of a workflow in all possible statuses."""
sample_yadage_workflow_in_db.status = status
session.add(sample_yadage_workflow_in_db)
session.commit()

delete_workflow(sample_yadage_workflow_in_db, hard_delete=hard_delete)
if not hard_delete:
assert sample_yadage_workflow_in_db.status == RunStatus.deleted
else:
assert (
session.query(Workflow)
.filter_by(id_=sample_yadage_workflow_in_db.id_)
.all()
== []
)
delete_workflow(sample_yadage_workflow_in_db)
assert sample_yadage_workflow_in_db.status == RunStatus.deleted


@pytest.mark.parametrize("hard_delete", [True, False])
def test_delete_all_workflow_runs(
app, session, default_user, yadage_workflow_with_name, hard_delete
app, session, default_user, yadage_workflow_with_name
):
"""Test deletion of all runs of a given workflow."""
# add 5 workflows in the database with the same name
Expand All @@ -81,23 +71,14 @@ def test_delete_all_workflow_runs(
.filter_by(name=yadage_workflow_with_name["name"])
.first()
)
delete_workflow(first_workflow, all_runs=True, hard_delete=hard_delete)
if not hard_delete:
for workflow in (
session.query(Workflow).filter_by(name=first_workflow.name).all()
):
if not_deleted_one == workflow.id_:
assert workflow.status == RunStatus.running
else:
assert workflow.status == RunStatus.deleted
else:
# the one running should not be deleted
assert (
len(session.query(Workflow).filter_by(name=first_workflow.name).all()) == 1
)
delete_workflow(first_workflow, all_runs=True)
for workflow in session.query(Workflow).filter_by(name=first_workflow.name).all():
if not_deleted_one == workflow.id_:
assert workflow.status == RunStatus.running
else:
assert workflow.status == RunStatus.deleted


@pytest.mark.parametrize("hard_delete", [True, False])
@pytest.mark.parametrize("workspace", [True, False])
def test_workspace_deletion(
app,
Expand All @@ -106,7 +87,6 @@ def test_workspace_deletion(
sample_yadage_workflow_in_db,
tmp_shared_volume_path,
workspace,
hard_delete,
):
"""Test workspace deletion."""
workflow = sample_yadage_workflow_in_db
Expand Down Expand Up @@ -134,8 +114,8 @@ def test_workspace_deletion(
# check that the workflow workspace exists
assert os.path.exists(absolute_workflow_workspace)
assert os.path.exists(cache_dir_path)
delete_workflow(workflow, hard_delete=hard_delete, workspace=workspace)
if hard_delete or workspace:
delete_workflow(workflow, workspace=workspace)
if workspace:
assert not os.path.exists(absolute_workflow_workspace)

# check that all cache entries for jobs
Expand All @@ -156,14 +136,12 @@ def test_deletion_of_workspace_of_an_already_deleted_workflow(

# check that the workflow workspace exists
assert os.path.exists(absolute_workflow_workspace)
delete_workflow(sample_yadage_workflow_in_db, hard_delete=False, workspace=False)
delete_workflow(sample_yadage_workflow_in_db, workspace=False)
assert os.path.exists(absolute_workflow_workspace)

delete_workflow(sample_yadage_workflow_in_db, hard_delete=False, workspace=True)
delete_workflow(sample_yadage_workflow_in_db, workspace=True)
assert not os.path.exists(absolute_workflow_workspace)

delete_workflow(sample_yadage_workflow_in_db, hard_delete=True, workspace=True)


def test_delete_recursive_wildcard(tmp_shared_volume_path):
"""Test recursive wildcard deletion of files."""
Expand Down Expand Up @@ -195,11 +173,11 @@ def test_workspace_permissions(
):
"""Test workspace dir permissions."""
create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path)
expeted_worspace_permissions = "drwxrwxr-x"
expected_worspace_permissions = "drwxrwxr-x"
absolute_workflow_workspace = os.path.join(
tmp_shared_volume_path, sample_yadage_workflow_in_db.workspace_path
)
workspace_permissions = stat.filemode(os.stat(absolute_workflow_workspace).st_mode)
assert os.path.exists(absolute_workflow_workspace)
assert workspace_permissions == expeted_worspace_permissions
delete_workflow(sample_yadage_workflow_in_db, hard_delete=True, workspace=True)
assert workspace_permissions == expected_worspace_permissions
delete_workflow(sample_yadage_workflow_in_db, workspace=True)
39 changes: 11 additions & 28 deletions tests/test_views.py
Expand Up @@ -997,9 +997,8 @@ def test_start_workflow_kubernetes_failure(
pytest.param(RunStatus.running, marks=pytest.mark.xfail),
],
)
@pytest.mark.parametrize("hard_delete", [True, False])
def test_delete_workflow(
app, session, default_user, sample_yadage_workflow_in_db, status, hard_delete
app, session, default_user, sample_yadage_workflow_in_db, status
):
"""Test deletion of a workflow in all possible statuses."""
sample_yadage_workflow_in_db.status = status
Expand All @@ -1013,22 +1012,13 @@ def test_delete_workflow(
),
query_string={"user": default_user.id_, "status": "deleted"},
content_type="application/json",
data=json.dumps({"hard_delete": hard_delete}),
data=json.dumps({}),
)
if not hard_delete:
assert sample_yadage_workflow_in_db.status == RunStatus.deleted
else:
assert (
session.query(Workflow)
.filter_by(id_=sample_yadage_workflow_in_db.id_)
.all()
== []
)
assert sample_yadage_workflow_in_db.status == RunStatus.deleted


@pytest.mark.parametrize("hard_delete", [True, False])
def test_delete_all_workflow_runs(
app, session, default_user, yadage_workflow_with_name, hard_delete
app, session, default_user, yadage_workflow_with_name
):
"""Test deletion of all runs of a given workflow."""
# add 5 workflows in the database with the same name
Expand Down Expand Up @@ -1057,18 +1047,12 @@ def test_delete_all_workflow_runs(
),
query_string={"user": default_user.id_, "status": "deleted"},
content_type="application/json",
data=json.dumps({"hard_delete": hard_delete, "all_runs": True}),
data=json.dumps({"all_runs": True}),
)
if not hard_delete:
for workflow in (
session.query(Workflow).filter_by(name=first_workflow.name).all()
):
assert workflow.status == RunStatus.deleted
else:
assert session.query(Workflow).filter_by(name=first_workflow.name).all() == []
for workflow in session.query(Workflow).filter_by(name=first_workflow.name).all():
assert workflow.status == RunStatus.deleted


@pytest.mark.parametrize("hard_delete", [True, False])
@pytest.mark.parametrize("workspace", [True, False])
def test_workspace_deletion(
app,
Expand All @@ -1077,7 +1061,6 @@ def test_workspace_deletion(
yadage_workflow_with_name,
tmp_shared_volume_path,
workspace,
hard_delete,
):
"""Test workspace deletion."""
with app.test_client() as client:
Expand Down Expand Up @@ -1116,9 +1099,9 @@ def test_workspace_deletion(
),
query_string={"user": default_user.id_, "status": "deleted"},
content_type="application/json",
data=json.dumps({"hard_delete": hard_delete, "workspace": workspace}),
data=json.dumps({"workspace": workspace}),
)
if hard_delete or workspace:
if workspace:
assert not os.path.exists(absolute_workflow_workspace)

# check that all cache entries for jobs
Expand Down Expand Up @@ -1161,11 +1144,11 @@ def test_deletion_of_workspace_of_an_already_deleted_workflow(
),
query_string={"user": default_user.id_, "status": "deleted"},
content_type="application/json",
data=json.dumps({"hard_delete": False, "workspace": False}),
data=json.dumps({"workspace": False}),
)
assert os.path.exists(absolute_workflow_workspace)

delete_workflow(workflow, hard_delete=False, workspace=True)
delete_workflow(workflow, workspace=True)
assert not os.path.exists(absolute_workflow_workspace)


Expand Down

0 comments on commit b55785e

Please sign in to comment.