-
Notifications
You must be signed in to change notification settings - Fork 31
/
tracker.py
133 lines (115 loc) · 4.6 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017 CERN.
#
# REANA is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# REANA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# REANA; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
# Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization or
# submit itself to any jurisdiction.
import ast
import datetime
import json
import logging
import adage.dagstate as dagstate
import adage.nodestate as nodestate
import jq
import networkx as nx
from yadage.utils import WithJsonRefEncoder
from .utils import publisher
log = logging.getLogger(__name__)
def analyze_progress(adageobj):
    """Report the current state of every non-publishing node of the DAG.

    Nodes are visited in topological order. A node whose task metadata has
    ``wflow_hints['is_purepub']`` set to a true value is skipped.

    :param adageobj: workflow object exposing the DAG as ``adageobj.dag``.
    :returns: list of ``{'state': ..., 'job_id': ...}`` dicts, one per
        reported node. ``state`` is one of ``'running'``, ``'succeeded'``,
        ``'failed'``, ``'unsubmittable'`` or ``'scheduled'``. ``job_id`` is
        read from the node's result proxy for the first three states and is
        ``None`` for the other two.
    """
    dag = adageobj.dag
    nodestates = []
    for node in nx.topological_sort(dag):
        nodeobj = dag.getNode(node)
        hints = nodeobj.task.metadata['wflow_hints']
        if hints.get('is_purepub', False):
            continue

        # The checks are ordered: the first one that matches decides the
        # reported state, so a running node is never probed any further.
        if nodeobj.state == nodestate.RUNNING:
            state = 'running'
        elif dagstate.node_status(nodeobj):
            state = 'succeeded'
        elif dagstate.node_ran_and_failed(nodeobj):
            state = 'failed'
        elif dagstate.upstream_failure(dag, nodeobj):
            state = 'unsubmittable'
        else:
            state = 'scheduled'

        # The result proxy is only consulted for nodes that are running or
        # have already run; every other node reports no job id.
        if state in ('running', 'succeeded', 'failed'):
            job_id = nodeobj.resultproxy.job_id
        else:
            job_id = None
        nodestates.append({'state': state, 'job_id': job_id})
    return nodestates
class REANATracker(object):
    """Tracker that publishes workflow progress information to REANA.

    ``initialize``, ``track`` and ``finalize`` all do the same thing: build a
    progress summary for the given workflow object and publish it through
    ``publisher.publish_workflow_status``.
    """

    # Maps the node states produced by ``analyze_progress`` to the name of
    # the progress counter that is incremented for them.
    _STATE_TO_PROGRESS_KEY = {
        'running': 'running',
        'succeeded': 'finished',
        'failed': 'failed',
        'unsubmittable': 'planned',
        'scheduled': 'total',
    }

    # Counters that also collect the job ids of the nodes they count.
    _KEYS_WITH_JOB_IDS = ('running', 'finished', 'failed')

    def __init__(self, identifier=None):
        """Remember the workflow identifier used when publishing status.

        :param identifier: id of the tracked workflow (default ``None``).
        """
        self.workflow_id = identifier
        log.info('initializing REANA workflow tracker for id {}'.format(
            self.workflow_id))

    def initialize(self, adageobj):
        """Publish the progress of ``adageobj`` (delegates to ``track``)."""
        self.track(adageobj)

    @staticmethod
    def _summarize_progress(node_states):
        """Fold per-node states into the progress counters sent to REANA.

        :param node_states: iterable of ``{'state': ..., 'job_id': ...}``
            dicts as returned by ``analyze_progress``.
        :returns: dict with an ``engine_specific`` entry (``None``) and one
            ``{'total': int, 'job_ids': list}`` counter for each of
            ``planned``, ``failed``, ``total``, ``running`` and ``finished``.
        :raises KeyError: if a node reports an unknown state.
        """
        progress = {
            "engine_specific": None,
            # 'unsubmittable' nodes are counted here; without this entry the
            # first such node made the update below fail with a KeyError.
            "planned": {"total": 0, "job_ids": []},
            "failed": {"total": 0, "job_ids": []},
            # NOTE(review): this counter is only incremented for nodes in
            # the 'scheduled' state, not for every node - confirm that this
            # is what consumers of the message expect.
            "total": {"total": 0, "job_ids": []},
            "running": {"total": 0, "job_ids": []},
            "finished": {"total": 0, "job_ids": []},
        }
        for node in node_states:
            key = REANATracker._STATE_TO_PROGRESS_KEY[node['state']]
            progress[key]['total'] += 1

            # Resolve the job id afresh for every node so that an id left
            # over from a previous iteration can never be reported (and so
            # that a non-string id on the first node is not a NameError).
            job_id = node['job_id']
            if isinstance(job_id, str):
                # A string id is the repr of a dict holding the actual id
                # under the 'job_id' key.
                job_id = ast.literal_eval(job_id)['job_id']
            if key in REANATracker._KEYS_WITH_JOB_IDS:
                progress[key]['job_ids'].append(job_id)
        return progress

    def track(self, adageobj):
        """Build the progress summary of ``adageobj`` and publish it.

        :param adageobj: workflow object; it must provide ``json()`` and be
            accepted by ``analyze_progress``.
        """
        log.info('sending progress information')
        # Round-trip through JSON text to obtain plain JSON data from the
        # workflow object before handing it to jq.
        serialized = json.dumps(adageobj.json(), cls=WithJsonRefEncoder,
                                sort_keys=True)
        purejson = json.loads(serialized)

        # Reduce the serialized workflow to the DAG edges plus, per node,
        # its name, id and job id.
        dag_program = (
            '{dag: {edges: .dag.edges, nodes: '
            '[.dag.nodes[]|{metadata: {name: .task.metadata.name}, id: .id, '
            'jobid: .proxy.proxydetails.job_id}]}}'
        )
        engine_specific = jq.jq(dag_program).transform(purejson)

        progress = self._summarize_progress(analyze_progress(adageobj))
        progress['engine_specific'] = engine_specific

        log_message = 'this is a tracking log at {}'.format(
            datetime.datetime.now().isoformat()
        )
        report_template = (
            'sending to REANA\n'
            'uuid: {}\n'
            'json:\n'
            '{}\n'
            'message:\n'
            '{}\n'
        )
        log.info(report_template.format(
            self.workflow_id, json.dumps(progress, indent=4), log_message))
        publisher.publish_workflow_status(
            self.workflow_id, status=1, logs=log_message,
            message={"progress": progress})

    def finalize(self, adageobj):
        """Publish the progress of ``adageobj`` (delegates to ``track``)."""
        self.track(adageobj)