Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add endpoint to set status of an analysis #51

Merged
merged 1 commit into from
Nov 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,108 @@
}
},
"summary": "Get workflow status."
},
"put": {
"description": "This resource sets the status of workflow.",
"operationId": "set_workflow_status",
"parameters": [
{
"description": "Required. Organization which the workflow belongs to.",
"in": "query",
"name": "organization",
"required": true,
"type": "string"
},
{
"description": "Required. UUID of workflow owner.",
"in": "query",
"name": "user",
"required": true,
"type": "string"
},
{
"description": "Required. Workflow UUID.",
"in": "path",
"name": "workflow_id",
"required": true,
"type": "string"
},
{
"description": "Required. New status.",
"in": "body",
"name": "status",
"required": true,
"schema": {
"description": "Required. New status.",
"type": "string"
}
}
],
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "Request succeeded. Info about workflow, including the status is returned.",
"examples": {
"application/json": {
"message": "Workflow successfully launched",
"organization": "default_org",
"status": "running",
"user": "00000000-0000-0000-0000-000000000000",
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
},
"schema": {
"properties": {
"message": {
"type": "string"
},
"organization": {
"type": "string"
},
"status": {
"type": "string"
},
"user": {
"type": "string"
},
"workflow_id": {
"type": "string"
}
},
"type": "object"
}
},
"400": {
"description": "Request failed. The incoming data specification seems malformed.",
"examples": {
"application/json": {
"message": "Malformed request."
}
}
},
"403": {
"description": "Request failed. User is not allowed to access workflow.",
"examples": {
"application/json": {
"message": "User 00000000-0000-0000-0000-000000000000 is not allowed to access workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
}
},
"404": {
"description": "Request failed. Either User or Workflow doesn't exist.",
"examples": {
"application/json": {
"message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1 doesn't exist"
}
}
},
"500": {
"description": "Request failed. Internal controller error."
}
},
"summary": "Set workflow status."
}
},
"/api/workflows/{workflow_id}/workspace": {
Expand Down
172 changes: 169 additions & 3 deletions reana_workflow_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@

from .factory import db
from .fsdb import create_workflow_workspace, list_directory_files
from .models import User, Workflow
from .models import User, Workflow, WorkflowStatus
from .tasks import run_yadage_workflow

START = 'start'
STOP = 'stop'
PAUSE = 'pause'
STATUSES = {START, STOP, PAUSE}

organization_to_queue = {
'alice': 'alice-queue',
'atlas': 'atlas-queue',
Expand Down Expand Up @@ -821,7 +826,6 @@ def run_yadage_workflow_from_spec_endpoint(): # noqa
@restapi_blueprint.route('/workflows/<workflow_id>/status', methods=['GET'])
def get_workflow_status(workflow_id): # noqa
r"""Get workflow status.

---
get:
summary: Get workflow status.
Expand Down Expand Up @@ -918,7 +922,6 @@ def get_workflow_status(workflow_id): # noqa
return jsonify(
{'message': 'User {} is not allowed to access workflow {}'
.format(user_uuid, workflow_id)}), 403

return jsonify({'id': workflow.id_,
'status': workflow.status.name,
'organization': organization,
Expand All @@ -927,3 +930,166 @@ def get_workflow_status(workflow_id): # noqa
return jsonify({"message": str(e)}), 400
except Exception as e:
return jsonify({"message": str(e)}), 500


@restapi_blueprint.route('/workflows/<workflow_id>/status', methods=['PUT'])
def set_workflow_status(workflow_id): # noqa
r"""Set workflow status.

---
put:
summary: Set workflow status.
description: >-
This resource sets the status of workflow.
operationId: set_workflow_status
produces:
- application/json
parameters:
- name: organization
in: query
description: Required. Organization which the workflow belongs to.
required: true
type: string
- name: user
in: query
description: Required. UUID of workflow owner.
required: true
type: string
- name: workflow_id
in: path
description: Required. Workflow UUID.
required: true
type: string
- name: status
in: body
description: Required. New status.
required: true
schema:
type: string
description: Required. New status.
responses:
200:
description: >-
Request succeeded. Info about workflow, including the status is
returned.
schema:
type: object
properties:
message:
type: string
workflow_id:
type: string
organization:
type: string
status:
type: string
user:
type: string
examples:
application/json:
{
"message": "Workflow successfully launched",
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
"organization": "default_org",
"status": "running",
"user": "00000000-0000-0000-0000-000000000000"
}
400:
description: >-
Request failed. The incoming data specification seems malformed.
examples:
application/json:
{
"message": "Malformed request."
}
403:
description: >-
Request failed. User is not allowed to access workflow.
examples:
application/json:
{
"message": "User 00000000-0000-0000-0000-000000000000
is not allowed to access workflow
256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
404:
description: >-
Request failed. Either User or Workflow doesn't exist.
examples:
application/json:
{
"message": "User 00000000-0000-0000-0000-000000000000 doesn't
exist"
}
application/json:
{
"message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1
doesn't exist"
}
500:
description: >-
Request failed. Internal controller error.
"""

try:
organization = request.args['organization']
user_uuid = request.args['user']
workflow = Workflow.query.filter(Workflow.id_ == workflow_id).first()
status = request.json
if not (status in STATUSES):
return jsonify({'message': 'Status {0} is not one of: {1}'.
format(status, ", ".join(STATUSES))}), 400
if not workflow:
return jsonify({'message': 'Workflow {} does not exist'.
format(workflow_id)}), 404
if not str(workflow.owner_id) == user_uuid:
return jsonify(
{'message': 'User {} is not allowed to access workflow {}'
.format(user_uuid, workflow_id)}), 403
if status == START:
return start_workflow(organization, user_uuid, workflow)
else:
raise NotImplemented("Status {} is not supported yet"
.format(status))
except KeyError as e:
return jsonify({"message": str(e)}), 400
except Exception as e:
return jsonify({"message": str(e)}), 500


def start_workflow(organization, user_uuid, workflow):
"""Start a workflow."""
workflow.status = WorkflowStatus.running
db.session.commit()
if workflow.type_ == 'yadage':
return run_yadage_workflow_from_spec(organization,
user_uuid,
workflow)
elif workflow.type_ == 'cwl':
pass


def run_yadage_workflow_from_spec(organization, user_uuid, workflow):
"""Run a yadage workflow."""
try:
kwargs = {
"workflow_uuid": str(workflow.id_),
"workflow_workspace": workflow.workspace_path,
"workflow_json": workflow.specification,
"parameters": workflow.parameters
}
queue = organization_to_queue[organization]
if not os.environ.get("TESTS"):
resultobject = run_yadage_workflow.apply_async(
kwargs=kwargs,
queue='yadage-{}'.format(queue)
)
return jsonify({'message': 'Workflow successfully launched',
'workflow_id': workflow.id_,
'status': workflow.status.name,
'organization': organization,
'user': user_uuid}), 200

except(KeyError, ValueError):
traceback.print_exc()
abort(400)
Loading