Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rest: allows to delete queued workflows #296

Merged
merged 1 commit into from Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions reana_workflow_controller/errors.py
Expand Up @@ -31,3 +31,7 @@ class REANAInteractiveSessionError(Exception):

class REANAExternalCallError(Exception):
"""Error when connecting to an external service."""


class REANAWorkflowStatusError(Exception):
"""Error when trying to change workflow status."""
50 changes: 29 additions & 21 deletions reana_workflow_controller/rest/utils.py
Expand Up @@ -32,7 +32,8 @@
WORKFLOW_TIME_FORMAT)
from reana_workflow_controller.errors import (REANAExternalCallError,
REANAWorkflowControllerError,
REANAWorkflowDeletionError)
REANAWorkflowDeletionError,
REANAWorkflowStatusError)
from reana_workflow_controller.workflow_run_manager import \
KubernetesWorkflowRunManager

Expand Down Expand Up @@ -66,6 +67,8 @@ def _start_workflow_db(workflow, parameters):
raise REANAWorkflowControllerError(failure_message)
elif workflow.status not in \
[WorkflowStatus.created, WorkflowStatus.queued]:
if workflow.status == WorkflowStatus.deleted:
raise REANAWorkflowStatusError(failure_message)
raise REANAWorkflowControllerError(failure_message)

try:
Expand Down Expand Up @@ -180,27 +183,32 @@ def delete_workflow(workflow,
WorkflowStatus.finished,
WorkflowStatus.stopped,
WorkflowStatus.deleted,
WorkflowStatus.failed]:
to_be_deleted = [workflow]
if all_runs:
to_be_deleted += Session.query(Workflow).\
filter(Workflow.name == workflow.name,
Workflow.status != WorkflowStatus.running).all()
for workflow in to_be_deleted:
if hard_delete:
remove_workflow_workspace(workflow.workspace_path)
_delete_workflow_row_from_db(workflow)
else:
if workspace:
WorkflowStatus.failed,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you confirm what is happening with the reana-server scheduler container? Because we have this operation to delete from the DB, but the workflow will still be present in the RabbitMQ queue... (which one can check with rabbitmqctl list_queues inside the message-broker component).

I am afraid that reana-server will continuously try to start the workflow, fail, requeue, try to start, fail, requeue... infinitely.

If that is the case, I feel we need to find a nice way of doing this operation atomically (RabbitMQ and DB).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is solving this.

WorkflowStatus.queued]:
try:
to_be_deleted = [workflow]
if all_runs:
to_be_deleted += Session.query(Workflow).\
filter(Workflow.name == workflow.name,
Workflow.status != WorkflowStatus.running).all()
for workflow in to_be_deleted:
if hard_delete:
remove_workflow_workspace(workflow.workspace_path)
_mark_workflow_as_deleted_in_db(workflow)
remove_workflow_jobs_from_cache(workflow)

return jsonify({'message': 'Workflow successfully deleted',
'workflow_id': workflow.id_,
'workflow_name': get_workflow_name(workflow),
'status': workflow.status.name,
'user': str(workflow.owner_id)}), 200
_delete_workflow_row_from_db(workflow)
else:
if workspace:
remove_workflow_workspace(workflow.workspace_path)
_mark_workflow_as_deleted_in_db(workflow)
remove_workflow_jobs_from_cache(workflow)

return jsonify({'message': 'Workflow successfully deleted',
'workflow_id': workflow.id_,
'workflow_name': get_workflow_name(workflow),
'status': workflow.status.name,
'user': str(workflow.owner_id)}), 200
except Exception as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 500
elif workflow.status == WorkflowStatus.running:
raise REANAWorkflowDeletionError(
'Workflow {0}.{1} cannot be deleted as it'
Expand Down
5 changes: 4 additions & 1 deletion reana_workflow_controller/rest/workflows_status.py
Expand Up @@ -16,7 +16,8 @@

from reana_workflow_controller.config import WORKFLOW_TIME_FORMAT
from reana_workflow_controller.errors import (REANAExternalCallError,
REANAWorkflowControllerError)
REANAWorkflowControllerError,
REANAWorkflowStatusError)
from reana_workflow_controller.rest.utils import (build_workflow_logs,
delete_workflow,
get_current_job_progress,
Expand Down Expand Up @@ -483,6 +484,8 @@ def set_workflow_status(workflow_id_or_name): # noqa
format(workflow_id_or_name)}), 404
except REANAWorkflowControllerError as e:
return jsonify({"message": str(e)}), 409
except REANAWorkflowStatusError as e:
return jsonify({"message": str(e)}), 404
except KeyError as e:
return jsonify({"message": str(e)}), 400
except NotImplementedError as e:
Expand Down