Skip to content

Commit

Permalink
Merge e05adab into d5105e7
Browse files Browse the repository at this point in the history
  • Loading branch information
Dinos Kousidis committed Jan 16, 2019
2 parents d5105e7 + e05adab commit cd05825
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 40 deletions.
22 changes: 17 additions & 5 deletions reana_commons/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@
MQ_DEFAULT_EXCHANGE = ''
"""Message queue (RabbitMQ) exchange."""

MQ_DEFAULT_QUEUE = 'jobs-status'
"""Name of the queue where to publish/consume from."""

MQ_DEFAULT_ROUTING_KEY = 'jobs-status'
"""Message queue (RabbitMQ) routing key."""
MQ_DEFAULT_QUEUES = {'jobs-status':
{'routing_key': 'jobs-status',
'exchange': MQ_DEFAULT_EXCHANGE,
'durable': False},
'workflow-submission':
{'routing_key': 'workflow-submission',
'exchange': MQ_DEFAULT_EXCHANGE,
'durable': True}
}
"""Default message queues."""

MQ_PRODUCER_MAX_RETRIES = 3
"""Max retries to send a message."""
Expand All @@ -58,3 +63,10 @@
'reana_job_controller.json')
}
"""REANA Workflow Controller address."""

K8S_MAXIMUM_CONCURRENT_JOBS = 10
"""Upper limit on concurrent jobs running in the cluster."""

REANA_READY_CONDITIONS = {'reana_commons.tasks':
['check_predefined_conditions',
'check_running_job_count']}
23 changes: 8 additions & 15 deletions reana_commons/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,28 @@
from kombu.mixins import ConsumerMixin

from reana_commons.config import (MQ_CONNECTION_STRING, MQ_DEFAULT_EXCHANGE,
MQ_DEFAULT_FORMAT, MQ_DEFAULT_QUEUE,
MQ_DEFAULT_ROUTING_KEY)
MQ_DEFAULT_FORMAT, MQ_DEFAULT_QUEUES)


class BaseConsumer(ConsumerMixin):
"""Base RabbitMQ consumer."""

def __init__(self, connection=None, queues=None,
def __init__(self, queue=None, connection=None,
message_default_format=None):
"""Construct a BaseConsumer.
:param connection: A :class:`kombu.Connection`, if not provided a
:class:`kombu.Connection` with the default configuration will
be instantiated.
:param queues: List of :class:`kombu.Queue` where the messages will
be consumed from, if not provided, it will be instantiated with
the default configuration.
:param queue: Name or :class:`kombu.Queue` where the messages will
be consumed from.
:param message_default_format: Defines the format the consuemer is
configured to deserialize the messages to.
"""
self.queues = queues or self._build_default_queues()
self.exchange = self._build_default_exchange()
if not isinstance(queue, Queue):
queue = Queue(queue, **MQ_DEFAULT_QUEUES[queue])
self.queue = queue
self.connection = connection or Connection(MQ_CONNECTION_STRING)
self.message_default_format = message_default_format or \
MQ_DEFAULT_FORMAT
Expand All @@ -40,14 +41,6 @@ def _build_default_exchange(self):
"""Build :class:`kombu.Exchange` with default values."""
return Exchange(MQ_DEFAULT_EXCHANGE, type='direct')

def _build_default_queues(self):
"""Build :class:`kombu.Queue` with default values."""
default_queue = Queue(MQ_DEFAULT_QUEUE,
durable=False,
exchange=self._build_default_exchange(),
routing_key=MQ_DEFAULT_ROUTING_KEY)
return [default_queue]

def get_consumers(self, Consumer, channel):
"""Map consumers to specific queues.
Expand Down
115 changes: 115 additions & 0 deletions reana_commons/openapi_specifications/reana_server.json
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,121 @@
"summary": "Get parameters of a workflow."
}
},
"/api/workflows/{workflow_id_or_name}/start": {
"post": {
"consumes": [
"application/json"
],
"description": "This resource starts the workflow execution process. Resource is expecting a workflow UUID.",
"operationId": "start_workflow",
"parameters": [
{
"description": "Required. Analysis UUID or name.",
"in": "path",
"name": "workflow_id_or_name",
"required": true,
"type": "string"
},
{
"description": "Required. The API access_token of workflow owner.",
"in": "query",
"name": "access_token",
"required": true,
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options.",
"in": "body",
"name": "parameters",
"required": false,
"schema": {
"type": "object"
}
}
],
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "Request succeeded. Info about a workflow, including the execution status is returned.",
"examples": {
"application/json": {
"id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
"message": "Workflow submitted",
"status": "queued",
"user": "00000000-0000-0000-0000-000000000000",
"workflow_name": "mytest.1"
}
},
"schema": {
"properties": {
"message": {
"type": "string"
},
"status": {
"type": "string"
},
"user": {
"type": "string"
},
"workflow_id": {
"type": "string"
},
"workflow_name": {
"type": "string"
}
},
"type": "object"
}
},
"400": {
"description": "Request failed. The incoming payload seems malformed.",
"examples": {
"application/json": {
"message": "Malformed request."
}
}
},
"403": {
"description": "Request failed. User is not allowed to access workflow.",
"examples": {
"application/json": {
"message": "User 00000000-0000-0000-0000-000000000000 is not allowed to access workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1"
}
}
},
"404": {
"description": "Request failed. Either User or Workflow does not exist.",
"examples": {
"application/json": {
"message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1 does not exist"
}
}
},
"409": {
"description": "Request failed. The workflow could not be started due to a conflict.",
"examples": {
"application/json": {
"message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1 could not be started because it is already running."
}
}
},
"500": {
"description": "Request failed. Internal controller error."
},
"501": {
"description": "Request failed. The specified status change is not implemented.",
"examples": {
"application/json": {
"message": "Status resume is not supported yet."
}
}
}
},
"summary": "Start workflow."
}
},
"/api/workflows/{workflow_id_or_name}/status": {
"get": {
"description": "This resource reports the status of a workflow. Resource is expecting a workflow UUID.",
Expand Down
68 changes: 54 additions & 14 deletions reana_commons/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@
from kombu import Connection, Exchange, Queue

from .config import (MQ_CONNECTION_STRING, MQ_DEFAULT_EXCHANGE,
MQ_DEFAULT_FORMAT, MQ_DEFAULT_QUEUE,
MQ_DEFAULT_ROUTING_KEY, MQ_PASS, MQ_PORT,
MQ_DEFAULT_FORMAT, MQ_DEFAULT_QUEUES, MQ_PASS, MQ_PORT,
MQ_PRODUCER_MAX_RETRIES, MQ_URL, MQ_USER)


class WorkflowStatusPublisher():
"""Progress publisher to MQ."""
class BasePublisher():
"""Base publisher to MQ."""

def __init__(self, connection=None, queue=None, routing_key=None,
exchange=None):
"""Initialise the Publisher class.
def __init__(self, queue, routing_key, connection=None,
exchange=None, durable=False):
"""Initialise the BasePublisher class.
:param connection: A :class:`kombu.Connection`, if not provided a
:class:`kombu.Connection` with the default configuration will
Expand All @@ -37,10 +36,11 @@ def __init__(self, connection=None, queue=None, routing_key=None,
be delivered to, if not provided, it will be instantiated with
the default configuration.
"""
self._routing_key = routing_key or MQ_DEFAULT_ROUTING_KEY
self._routing_key = routing_key
self._exchange = Exchange(name=exchange or MQ_DEFAULT_EXCHANGE,
type='direct')
self._queue = Queue(queue or MQ_DEFAULT_QUEUE, durable=False,
self._queue = Queue(queue,
durable=durable,
exchange=self._exchange,
routing_key=self._routing_key)
self._connection = connection or Connection(MQ_CONNECTION_STRING)
Expand Down Expand Up @@ -75,6 +75,27 @@ def _publish(self, msg):
max_retries=MQ_PRODUCER_MAX_RETRIES)
publish(json.dumps(msg), exchange=self._exchange,
routing_key=self._routing_key, declare=[self._queue])
logging.debug('Publisher: message sent: %s', msg)

def close(self):
"""Close connection."""
logging.debug('Publisher: closing queue connection')
self._connection.release()


class WorkflowStatusPublisher(BasePublisher):
"""Progress publisher to MQ."""

def __init__(self, **kwargs):
"""Constructor."""
queue = 'jobs-status'
if 'queue' not in kwargs:
kwargs['queue'] = 'jobs-status'
if 'routing_key' not in kwargs:
kwargs['routing_key'] = MQ_DEFAULT_QUEUES[queue]['routing_key']
if 'durable' not in kwargs:
kwargs['durable'] = MQ_DEFAULT_QUEUES[queue]['durable']
super(WorkflowStatusPublisher, self).__init__(**kwargs)

def publish_workflow_status(self, workflow_uuid, status,
logs='', message=None):
Expand All @@ -95,9 +116,28 @@ def publish_workflow_status(self, workflow_uuid, status,
"message": message
}
self._publish(msg)
logging.debug('Publisher: message sent: %s', msg)

def close(self):
"""Close connection."""
logging.debug('Publisher: closing queue connection')
self._connection.release()

class WorkflowSubmissionPublisher(BasePublisher):
"""Workflow submission publisher."""

def __init__(self, **kwargs):
"""Constructor."""
queue = 'workflow-submission'
super(WorkflowSubmissionPublisher, self).__init__(
queue,
MQ_DEFAULT_QUEUES[queue]['routing_key'],
durable=MQ_DEFAULT_QUEUES[queue]['durable'],
**kwargs)

def publish_workflow_submission(self,
user_id,
workflow_id_or_name,
parameters):
"""Publish workflow submission parameters."""
msg = {
"user": user_id,
"workflow_id_or_name": workflow_id_or_name,
"parameters": parameters
}
self._publish(msg)
49 changes: 49 additions & 0 deletions reana_commons/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA common Celery tasks."""

import importlib
import logging

from celery import shared_task
from celery.task.control import revoke
from kubernetes.client.rest import ApiException

from reana_commons.api_client import JobControllerAPIClient
from reana_commons.config import K8S_MAXIMUM_CONCURRENT_JOBS
from reana_commons.k8s.api_client import (current_k8s_batchv1_api_client,
current_k8s_corev1_api_client)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,3 +47,47 @@ def stop_workflow(workflow_uuid, job_list):
workflow_uuid
))
log.error(e)


def reana_ready():
"""Check if reana can start new workflows."""
from reana_commons.config import REANA_READY_CONDITIONS
for module_name, condition_list in REANA_READY_CONDITIONS.items():
for condition_name in condition_list:
module = importlib.import_module(module_name)
condition_func = getattr(module, condition_name)
if not condition_func():
return False
return True


def check_predefined_conditions():
"""Check k8s predefined conditions for the nodes."""
try:
node_info = current_k8s_corev1_api_client.list_node()
for node in node_info.items:
# check based on the predefined conditions about the
# node status: MemoryPressure, OutOfDisk, KubeletReady
# DiskPressure, PIDPressure,
for condition in node.status.conditions:
if not condition.status:
return False
except ApiException as e:
log.error('Something went wrong while getting node information.')
log.error(e)
return False
return True


def check_running_job_count():
"""Check upper limit on running jobs."""
try:
job_list = current_k8s_batchv1_api_client.\
list_job_for_all_namespaces()
if len(job_list.items) > K8S_MAXIMUM_CONCURRENT_JOBS:
return False
except ApiException as e:
log.error('Something went wrong while getting running job list.')
log.error(e)
return False
return True
2 changes: 1 addition & 1 deletion reana_commons/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

from __future__ import absolute_import, print_function

__version__ = "0.5.0.dev20181213"
__version__ = "0.5.0.dev20190116"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
'pytest-cache>=1.0',
'pytest-cov>=1.8.0',
'pytest-pep8>=1.0.6',
'pytest-reana>=0.5.0.dev20181203',
'pytest-reana>=0.5.0.dev20190116',
'pytest>=3.8'
]

Expand Down
Loading

0 comments on commit cd05825

Please sign in to comment.