Skip to content

Commit

Permalink
api: add endpoint to set status of an analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Khodak committed Nov 17, 2017
1 parent ff82e7b commit d071e04
Show file tree
Hide file tree
Showing 3 changed files with 369 additions and 3 deletions.
102 changes: 102 additions & 0 deletions docs/openapi.json
Expand Up @@ -273,6 +273,108 @@
}
},
"summary": "Get workflow status."
},
"put": {
"description": "This resource sets the status of workflow.",
"operationId": "set_workflow_status",
"parameters": [
{
"description": "Required. Organization which the workflow belongs to.",
"in": "query",
"name": "organization",
"required": true,
"type": "string"
},
{
"description": "Required. UUID of workflow owner.",
"in": "query",
"name": "user",
"required": true,
"type": "string"
},
{
"description": "Required. Workflow UUID.",
"in": "path",
"name": "workflow_id",
"required": true,
"type": "string"
},
{
"description": "Required. New status.",
"in": "body",
"name": "status",
"required": true,
"schema": {
"description": "Required. New status.",
"type": "string"
}
}
],
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "Request succeeded. Info about workflow, including the status is returned.",
"examples": {
"application/json": {
"message": "Workflow successfully launched",
"organization": "default_org",
"status": "running",
"user": "00000000-0000-0000-0000-000000000000",
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
},
"schema": {
"properties": {
"message": {
"type": "string"
},
"organization": {
"type": "string"
},
"status": {
"type": "string"
},
"user": {
"type": "string"
},
"workflow_id": {
"type": "string"
}
},
"type": "object"
}
},
"400": {
"description": "Request failed. The incoming data specification seems malformed.",
"examples": {
"application/json": {
"message": "Malformed request."
}
}
},
"403": {
"description": "Request failed. User is not allowed to access workflow.",
"examples": {
"application/json": {
"message": "User 00000000-0000-0000-0000-000000000000 is not allowed to access workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
}
},
"404": {
"description": "Request failed. Either User or Workflow doesn't exist.",
"examples": {
"application/json": {
"message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1 doesn't exist"
}
}
},
"500": {
"description": "Request failed. Internal controller error."
}
},
"summary": "Set workflow status."
}
},
"/api/workflows/{workflow_id}/workspace": {
Expand Down
172 changes: 169 additions & 3 deletions reana_workflow_controller/rest.py
Expand Up @@ -32,9 +32,14 @@

from .factory import db
from .fsdb import create_workflow_workspace, list_directory_files
from .models import User, Workflow
from .models import User, Workflow, WorkflowStatus
from .tasks import run_yadage_workflow

START = 'start'
STOP = 'stop'
PAUSE = 'pause'
STATUSES = {START, STOP, PAUSE}

organization_to_queue = {
'alice': 'alice-queue',
'atlas': 'atlas-queue',
Expand Down Expand Up @@ -821,7 +826,6 @@ def run_yadage_workflow_from_spec_endpoint(): # noqa
@restapi_blueprint.route('/workflows/<workflow_id>/status', methods=['GET'])
def get_workflow_status(workflow_id): # noqa
r"""Get workflow status.
---
get:
summary: Get workflow status.
Expand Down Expand Up @@ -918,7 +922,6 @@ def get_workflow_status(workflow_id): # noqa
return jsonify(
{'message': 'User {} is not allowed to access workflow {}'
.format(user_uuid, workflow_id)}), 403

return jsonify({'id': workflow.id_,
'status': workflow.status.name,
'organization': organization,
Expand All @@ -927,3 +930,166 @@ def get_workflow_status(workflow_id): # noqa
return jsonify({"message": str(e)}), 400
except Exception as e:
return jsonify({"message": str(e)}), 500


@restapi_blueprint.route('/workflows/<workflow_id>/status', methods=['PUT'])
def set_workflow_status(workflow_id): # noqa
r"""Set workflow status.
---
put:
summary: Set workflow status.
description: >-
This resource sets the status of workflow.
operationId: set_workflow_status
produces:
- application/json
parameters:
- name: organization
in: query
description: Required. Organization which the workflow belongs to.
required: true
type: string
- name: user
in: query
description: Required. UUID of workflow owner.
required: true
type: string
- name: workflow_id
in: path
description: Required. Workflow UUID.
required: true
type: string
- name: status
in: body
description: Required. New status.
required: true
schema:
type: string
description: Required. New status.
responses:
200:
description: >-
Request succeeded. Info about workflow, including the status is
returned.
schema:
type: object
properties:
message:
type: string
workflow_id:
type: string
organization:
type: string
status:
type: string
user:
type: string
examples:
application/json:
{
"message": "Workflow successfully launched",
"workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
"organization": "default_org",
"status": "running",
"user": "00000000-0000-0000-0000-000000000000"
}
400:
description: >-
Request failed. The incoming data specification seems malformed.
examples:
application/json:
{
"message": "Malformed request."
}
403:
description: >-
Request failed. User is not allowed to access workflow.
examples:
application/json:
{
"message": "User 00000000-0000-0000-0000-000000000000
is not allowed to access workflow
256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
404:
description: >-
Request failed. Either User or Workflow doesn't exist.
examples:
application/json:
{
"message": "User 00000000-0000-0000-0000-000000000000 doesn't
exist"
}
application/json:
{
"message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1
doesn't exist"
}
500:
description: >-
Request failed. Internal controller error.
"""

try:
organization = request.args['organization']
user_uuid = request.args['user']
workflow = Workflow.query.filter(Workflow.id_ == workflow_id).first()
status = request.json
if not (status in STATUSES):
return jsonify({'message': 'Status {0} is not one of: {1}'.
format(status, ", ".join(STATUSES))}), 400
if not workflow:
return jsonify({'message': 'Workflow {} does not exist'.
format(workflow_id)}), 404
if not str(workflow.owner_id) == user_uuid:
return jsonify(
{'message': 'User {} is not allowed to access workflow {}'
.format(user_uuid, workflow_id)}), 403
if status == START:
return start_workflow(organization, user_uuid, workflow)
else:
raise NotImplemented("Status {} is not supported yet"
.format(status))
except KeyError as e:
return jsonify({"message": str(e)}), 400
except Exception as e:
return jsonify({"message": str(e)}), 500


def start_workflow(organization, user_uuid, workflow):
"""Start a workflow."""
workflow.status = WorkflowStatus.running
db.session.commit()
if workflow.type_ == 'yadage':
return run_yadage_workflow_from_spec(organization,
user_uuid,
workflow)
elif workflow.type_ == 'cwl':
pass


def run_yadage_workflow_from_spec(organization, user_uuid, workflow):
"""Run a yadage workflow."""
try:
kwargs = {
"workflow_uuid": str(workflow.id_),
"workflow_workspace": workflow.workspace_path,
"workflow_json": workflow.specification,
"parameters": workflow.parameters
}
queue = organization_to_queue[organization]
if not os.environ.get("TESTS"):
resultobject = run_yadage_workflow.apply_async(
kwargs=kwargs,
queue='yadage-{}'.format(queue)
)
return jsonify({'message': 'Workflow successfully launched',
'workflow_id': workflow.id_,
'status': workflow.status.name,
'organization': organization,
'user': user_uuid}), 200

except(KeyError, ValueError):
traceback.print_exc()
abort(400)

0 comments on commit d071e04

Please sign in to comment.