/
scheduler.py
107 lines (94 loc) · 4.38 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA Server Workflow Execution Scheduler."""
import json
import logging
from bravado.exception import HTTPBadGateway
from reana_commons.consumer import BaseConsumer
from reana_commons.tasks import reana_ready
from reana_db.database import Session
from reana_db.models import Workflow
from reana_server.api_client import (current_rwc_api_client,
current_workflow_submission_publisher)
class WorkflowExecutionScheduler(BaseConsumer):
"""Scheduler of workflow execution.
Class responsible for consuming from the workflow-submission queue
and scheduling workflows for execution based on policies and system
availability.
"""
def __init__(self, **kwargs):
"""Initialise the WorkflowExecutionScheduler class."""
super(WorkflowExecutionScheduler, self).__init__(
queue='workflow-submission', **kwargs)
def get_consumers(self, Consumer, channel):
"""Implement providing kombu.Consumers with queues/callbacks."""
return [Consumer(queues=self.queue, callbacks=[self.on_message],
accept=[self.message_default_format])]
def requeue_workflow(self, **kwargs):
"""Send a workflow back to the queue.
We do not use ``message.requeue()`` because it cannot be used after
``message.ack()``, and we cannot wait to validate all the checks
(``reana_ready()`` and calling RWC) without calling ``message.ack()``
and getting a new call to on_message with the same workflow.
"""
try:
current_workflow_submission_publisher.publish_workflow_submission(
kwargs['user'], kwargs['workflow_id_or_name'],
kwargs['parameters']
)
logging.error(
f'Requeueing workflow '
f'{kwargs["workflow_id_or_name"]} ...')
except KeyError:
logging.error(
f'Wrong parameters to requeue workflow:\n'
f'{kwargs}\n'
f'Did reana_commons.publisher.WorkflowSubmissionPublisher\'s '
f'method publish_workflow_submission change its signature?',
exc_info=True)
except Exception:
logging.error('An error has occurred while requeueing worfklow',
exc_info=True)
def on_message(self, workflow_submission, message):
"""On new workflow_submission event handler."""
message.ack()
workflow_submission = json.loads(workflow_submission)
if reana_ready():
logging.info('Starting queued workflow: {}'.
format(workflow_submission))
workflow_submission['status'] = 'start'
try:
started = False
response, http_response = current_rwc_api_client.api.\
set_workflow_status(**workflow_submission).result()
http_response_json = http_response.json()
if http_response.status_code == 200:
started = True
logging.info(
f'Workflow '
f'{http_response_json["workflow_id"]} '
f'successfully started.')
else:
logging.error(f'RWC returned an unexpected status code:\n'
f'{http_response_json}')
except HTTPBadGateway as api_e:
logging.error(f'Workflow failed to start because '
f'RWC got an error while calling an external'
f'service (i.e. DB):\n'
f'{api_e}', exc_info=True)
except Exception as e:
logging.error(f'Something went wrong while calling RWC :\n'
f'{e}', exc_info=True)
finally:
if not started:
self.requeue_workflow(**workflow_submission)
else:
logging.info(f'REANA not ready to run workflow '
f'{workflow_submission["workflow_id_or_name"]}, '
f'requeueing ...')
self.requeue_workflow(**workflow_submission)