Skip to content

Commit

Permalink
consumer: addition of MQ consumer class
Browse files Browse the repository at this point in the history
* ADD consumer class with reconnection strategy based on
  pika usage example:
  https://pika.readthedocs.io/en/latest/examples/blocking_consume_recover_multiple_hosts.html

Signed-off-by: Dinos Kousidis <dinos.kousidis@cern.ch>
  • Loading branch information
dinosk committed Jun 27, 2018
1 parent 3f9312c commit 42d0982
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 42 deletions.
45 changes: 3 additions & 42 deletions reana_workflow_controller/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,11 @@

"""REANA Workflow Controller command line interface."""

import json
import logging

import click
import pika
from reana_commons.database import Session
from reana_commons.models import WorkflowStatus

from reana_workflow_controller.config import (BROKER_PASS, BROKER_PORT,
BROKER_URL, BROKER_USER)
from reana_workflow_controller.tasks import (_update_job_progress,
_update_run_progress,
_update_workflow_status)
from reana_workflow_controller.consumer import Consumer


@click.command('consume-job-queue')
Expand All @@ -44,36 +36,5 @@ def consume_job_queue():
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s: %(message)s'
)

def _callback_job_status(ch, method, properties, body):
body_dict = json.loads(body)
workflow_uuid = body_dict.get('workflow_uuid')
if workflow_uuid:
status = body_dict.get('status')
if status:
status = WorkflowStatus(status)
print(" [x] Received workflow_uuid: {0} status: {1}".
format(workflow_uuid, status))
logs = body_dict.get('logs') or ''
_update_workflow_status(workflow_uuid, status, logs)
if 'message' in body_dict and body_dict.get('message'):
msg = body_dict['message']
if 'progress' in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
Session.commit()

broker_credentials = pika.credentials.PlainCredentials(BROKER_USER,
BROKER_PASS)
connection = pika.BlockingConnection(
pika.ConnectionParameters(BROKER_URL,
BROKER_PORT,
'/',
broker_credentials))
channel = connection.channel()
channel.queue_declare(queue='jobs-status')
channel.basic_consume(_callback_job_status,
queue='jobs-status',
no_ack=True)
logging.info(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
consumer = Consumer()
consumer.consume()
8 changes: 8 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,12 @@
WORKFLOW_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
"""Time format for workflow starting time, created time etc."""

EXCHANGE = ''

EXCHANGE_TYPE = ''

STATUS_QUEUE = 'jobs-status'

ROUTING_KEY = 'jobs-status'

POSSIBLE_JOB_STATUSES = ['submitted', 'succeeded', 'failed', 'planned']
93 changes: 93 additions & 0 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018 CERN.
#
# REANA is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# REANA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# REANA; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
# Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization or
# submit itself to any jurisdiction.

"""REANA Workflow Controller MQ Consumer."""

import json

import pika
from reana_commons.database import Session
from reana_commons.models import WorkflowStatus

from .config import (BROKER_PASS, BROKER_PORT, BROKER_URL, BROKER_USER,
STATUS_QUEUE)
from .tasks import (_update_job_progress, _update_run_progress,
_update_workflow_status)


class Consumer:
"""Consumer of MQ job status updates."""

def __init__(self):
"""Constructor."""
self.broker_credentials = pika.PlainCredentials(BROKER_USER,
BROKER_PASS)
self._params = pika.connection.ConnectionParameters(
BROKER_URL, BROKER_PORT, '/', self.broker_credentials)
self._conn = None
self._channel = None

def connect(self):
"""Connect to MQ channel."""
if not self._conn or self._conn.is_closed:
self._conn = pika.BlockingConnection(self._params)
self._channel = self._conn.channel()
self._channel.basic_qos(prefetch_count=1)
self._channel.queue_declare(queue=STATUS_QUEUE)

def on_message(self, channel, method_frame, header_frame, body):
"""On new message event handler."""
self._channel.basic_ack(delivery_tag=method_frame.delivery_tag)
body_dict = json.loads(body)
workflow_uuid = body_dict.get('workflow_uuid')
if workflow_uuid:
status = body_dict.get('status')
if status:
status = WorkflowStatus(status)
print(" [x] Received workflow_uuid: {0} status: {1}".
format(workflow_uuid, status))
logs = body_dict.get('logs') or ''
_update_workflow_status(workflow_uuid, status, logs)
if 'message' in body_dict and body_dict.get('message'):
msg = body_dict['message']
if 'progress' in msg:
_update_run_progress(workflow_uuid, msg)
_update_job_progress(workflow_uuid, msg)
Session.commit()

def consume(self):
"""Start consuming incoming messages."""
while True:
self.connect()
self._channel.basic_consume(self.on_message, STATUS_QUEUE)
try:
self._channel.start_consuming()
except KeyboardInterrupt:
self._channel.stop_consuming()
self._conn.close()
except pika.exceptions.ConnectionClosedByBroker:
# Uncomment this to make the example not attempt recovery
# from server-initiated connection closure, including
# when the node is stopped cleanly
# except pika.exceptions.ConnectionClosedByBroker:
# pass
continue

0 comments on commit 42d0982

Please sign in to comment.