Skip to content

Commit

Permalink
api: add create workflow endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Diego Rodriguez <diego.rodriguez@cern.ch>
  • Loading branch information
Diego Rodriguez committed Oct 27, 2017
1 parent 7bb10bb commit 93eaf55
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 21 deletions.
86 changes: 84 additions & 2 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,86 @@
}
},
"summary": "Returns all workflows."
},
"post": {
"description": "This resource expects a POST call to create a new workflow workspace.",
"operationId": "create_workflow",
"parameters": [
{
"description": "Required. Organization which the worklow belongs to.",
"in": "query",
"name": "organization",
"required": true,
"type": "string"
},
{
"description": "Required. UUID of workflow owner.",
"in": "query",
"name": "user",
"required": true,
"type": "string"
},
{
"description": "JSON object including workflow parameters and workflow specification in JSON format (`yadageschemas.load()` output) with necessary data to instantiate a yadage workflow.",
"in": "body",
"name": "workflow",
"required": true,
"schema": {
"properties": {
"parameters": {
"description": "Workflow parameters.",
"type": "object"
},
"specification": {
"description": "Yadage specification in JSON format.",
"type": "object"
},
"type": {
"description": "Workflow type.",
"type": "string"
}
},
"type": "object"
}
}
],
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "Request succeeded. The file has been added to the workspace.",
"examples": {
"application/json": {
"message": "Workflow workspace has been created.",
"workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac"
}
},
"schema": {
"properties": {
"message": {
"type": "string"
},
"workflow_id": {
"type": "string"
}
},
"type": "object"
}
},
"400": {
"description": "Request failed. The incoming data specification seems malformed"
},
"404": {
"description": "Request failed. User doesn't exist.",
"examples": {
"application/json": {
"message": "User 00000000-0000-0000-0000-000000000000 doesn't exist"
}
}
}
},
"summary": "Create workflow and its workspace."
}
},
"/api/workflows/{workflow_id}/workspace": {
Expand Down Expand Up @@ -157,8 +237,7 @@
"description": "Request succeeded. The file has been added to the workspace.",
"examples": {
"application/json": {
"message": "Workflow has been added to the workspace",
"workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac"
"message": "The file input.csv has been successfully transferred."
}
},
"schema": {
Expand All @@ -175,6 +254,9 @@
},
"400": {
"description": "Request failed. The incoming data specification seems malformed"
},
"500": {
"description": "Request failed. Internal controller error."
}
},
"summary": "Adds a file to the workflow workspace."
Expand Down
38 changes: 37 additions & 1 deletion reana_workflow_controller/fsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,45 @@ def __new__(cls):
return REANAFS.__instance


def get_user_analyses_dir(org, user):
"""Build the analyses directory path for certain user and organization.
:param org: Organization which user is part of.
:param user: Working directory owner.
:return: Path to the user's analyses directory.
"""
return path.join(org, user, 'analyses')


def create_user_space(user_id, org):
"""Create analyses directory for `user_id`."""
reana_fs = REANAFS()
user_analyses_dir = path.join(org, user_id, 'analyses')
user_analyses_dir = get_user_analyses_dir(org, user_id)
if not reana_fs.exists(user_analyses_dir):
reana_fs.makedirs(user_analyses_dir)


def create_workflow_workspace(org, user, workflow_uuid):
"""Create analysis and workflow workspaces.
A directory structure will be created where `/:analysis_uuid` represents
the analysis workspace and `/:analysis_uuid/workspace` the workflow
workspace.
:param org: Organization which user is part of.
:param user: Workspaces owner.
:param workflow_uuid: Analysis UUID.
:return: Workflow and analysis workspace path.
"""
reana_fs = REANAFS()
analysis_workspace = path.join(get_user_analyses_dir(org, user),
workflow_uuid)

if not reana_fs.exists(analysis_workspace):
reana_fs.makedirs(analysis_workspace)

workflow_workspace = path.join(analysis_workspace, 'workspace')
if not reana_fs.exists(workflow_workspace):
reana_fs.makedirs(workflow_workspace)

return workflow_workspace, analysis_workspace
9 changes: 8 additions & 1 deletion reana_workflow_controller/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import enum

from sqlalchemy_utils.types import UUIDType
from sqlalchemy_utils.types import JSONType, UUIDType

from .factory import db

Expand Down Expand Up @@ -69,13 +69,20 @@ class Workflow(db.Model):
workspace_path = db.Column(db.String(255))
status = db.Column(db.Enum(WorkflowStatus), default=WorkflowStatus.created)
owner_id = db.Column(UUIDType, db.ForeignKey('user.id_'), nullable=False)
specification = db.Column(JSONType)
parameters = db.Column(JSONType)
type_ = db.Column(db.String(30))

def __init__(self, id_, workspace_path, owner_id,
specification, parameters, type_,
status=WorkflowStatus.created):
"""Initialize workflow model."""
self.id_ = id_
self.workspace_path = workspace_path
self.owner_id = owner_id
self.specification = specification
self.parameters = parameters
self.type_ = type_
self.status = status

def __repr__(self):
Expand Down
150 changes: 138 additions & 12 deletions reana_workflow_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@

"""REANA Workflow Controller REST API."""

import os
import traceback
from uuid import uuid4

from flask import Blueprint, abort, jsonify, request
from werkzeug.utils import secure_filename

from .factory import db
from .models import User
from .fsdb import create_workflow_workspace
from .models import User, Workflow
from .tasks import run_yadage_workflow

organization_to_queue = {
Expand Down Expand Up @@ -169,6 +172,109 @@ def get_workflows(): # noqa
return jsonify({"message": str(e)}), 500


@restapi_blueprint.route('/workflows', methods=['POST'])
def create_workflow(): # noqa
r"""Create workflow and its workspace.
---
post:
summary: Create workflow and its workspace.
description: >-
This resource expects a POST call to create a new workflow workspace.
operationId: create_workflow
produces:
- application/json
parameters:
- name: organization
in: query
description: Required. Organization which the worklow belongs to.
required: true
type: string
- name: user
in: query
description: Required. UUID of workflow owner.
required: true
type: string
- name: workflow
in: body
description: >-
JSON object including workflow parameters and workflow
specification in JSON format (`yadageschemas.load()` output)
with necessary data to instantiate a yadage workflow.
required: true
schema:
type: object
properties:
parameters:
type: object
description: Workflow parameters.
specification:
type: object
description: >-
Yadage specification in JSON format.
type:
type: string
description: Workflow type.
responses:
200:
description: >-
Request succeeded. The file has been added to the workspace.
schema:
type: object
properties:
message:
type: string
workflow_id:
type: string
examples:
application/json:
{
"message": "Workflow workspace has been created.",
"workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac"
}
400:
description: >-
Request failed. The incoming data specification seems malformed
404:
description: >-
Request failed. User doesn't exist.
examples:
application/json:
{
"message": "User 00000000-0000-0000-0000-000000000000 doesn't
exist"
}
"""
try:
organization = request.args['organization']
user_uuid = request.args['user']
user = User.query.filter(User.id_ == user_uuid).first()
if not user:
return jsonify(
{'message': 'User {} does not exist'.format(user)}), 404

workflow_uuid = str(uuid4())
workflow_workspace, _ = create_workflow_workspace(
user_uuid,
organization,
workflow_uuid)
# add spec and params to DB as JSON
workflow = Workflow(id_=workflow_uuid,
workspace_path=workflow_workspace,
owner_id=request.args['user'],
specification=request.json['specification'],
parameters=request.json['parameters'],
type_=request.json['type'])
db.session.add(workflow)
db.session.commit()
return jsonify({'message': 'Workflow workspace created',
'workflow_id': workflow_uuid}), 201
except KeyError as e:
return jsonify({"message": str(e)}), 400
except Exception as e:
return jsonify({"message": str(e)}), 500


@restapi_blueprint.route('/workflows/<workflow_id>/workspace',
methods=['POST'])
def seed_workflow_workspace(workflow_id):
Expand Down Expand Up @@ -225,19 +331,29 @@ def seed_workflow_workspace(workflow_id):
examples:
application/json:
{
"message": "Workflow has been added to the workspace",
"workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac"
"message": "The file input.csv has been successfully
transferred.",
}
400:
description: >-
Request failed. The incoming data specification seems malformed
500:
description: >-
Request failed. Internal controller error.
"""
try:
file_ = request.files['file_content'].stream.read()
file_ = request.files['file_content']
file_name = secure_filename(request.args['file_name'])
if not file_name:
raise ValueError('The file transferred needs to have name.')

workflow = Workflow.query.filter(Workflow.id_ == workflow_id).first()
file_.save(os.path.join(workflow.workspace, file_name))
return jsonify({'message': 'File successfully transferred'}), 200
except KeyError as e:
return jsonify({"message": str(e)}), 400
except ValueError as e:
return jsonify({"message": str(e)}), 400
except Exception as e:
return jsonify({"message": str(e)}), 500

Expand Down Expand Up @@ -316,9 +432,18 @@ def run_yadage_workflow_from_remote_endpoint(): # noqa
"""
try:
if request.json:
# get workflow UUID from client in order to retrieve its workspace
# from DB
workflow_workspace = ''
kwargs = {
"workflow_workspace": workflow_workspace,
"workflow": request.json['workflow'],
"toplevel": request.json['toplevel'],
"parameters": request.json['preset_pars']
}
queue = organization_to_queue[request.args.get('organization')]
resultobject = run_yadage_workflow.apply_async(
args=[request.json],
kwargs=kwargs,
queue='yadage-{}'.format(queue)
)
return jsonify({'message': 'Workflow successfully launched',
Expand Down Expand Up @@ -393,17 +518,18 @@ def run_yadage_workflow_from_spec_endpoint(): # noqa
"""
try:
# hardcoded until confirmation from `yadage`
nparallel = 10
if request.json:
arguments = {
"nparallel": nparallel,
"workflow": request.json['workflow_spec'],
"toplevel": "", # ignored when spec submited
"preset_pars": request.json['parameters']
# get workflow UUID from client in order to retrieve its workspace
# from DB
workflow_workspace = ''
kwargs = {
"workflow_workspace": workflow_workspace,
"workflow_json": request.json['workflow_spec'],
"parameters": request.json['parameters']
}
queue = organization_to_queue[request.args.get('organization')]
resultobject = run_yadage_workflow.apply_async(
args=[arguments],
kwargs=kwargs,
queue='yadage-{}'.format(queue)
)
return jsonify({'message': 'Workflow successfully launched',
Expand Down
Loading

0 comments on commit 93eaf55

Please sign in to comment.