-
Notifications
You must be signed in to change notification settings - Fork 17
/
wfprocessor.py
618 lines (453 loc) · 21.9 KB
/
wfprocessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
__copyright__ = "Copyright 2017-2019, http://radical.rutgers.edu"
__author__ = "RADICAL Team <radical@rutgers.edu>"
__license__ = "MIT"
import os
import threading as mt
import radical.utils as ru
from ..states import INITIAL, FINAL
from ..states import SUSPENDED, SCHEDULED, SCHEDULING, DONE, FAILED
from ..task import Task
from ..exceptions import EnTKError
# ------------------------------------------------------------------------------
#
class WFprocessor(object):
    """
    A WFprocessor (workflow processor) dispatches tasks from the various
    pipelines of the workflow, in their relative order, to the TaskManager.
    All state updates are reflected in the AppManager since we operate on a
    reference of the same workflow object.  The WFprocessor also retrieves
    completed tasks from the TaskManager and updates the states of the PST
    (pipeline/stage/task) hierarchy accordingly.

    :Arguments:
        :sid:             (str)  session id used by the profiler and loggers
        :workflow:        (set)  REFERENCE of the AppManager's workflow
        :resubmit_failed: (bool) True if failed tasks should be resubmitted
        :zmq_info:        (dict) zmq queue addresses
    """

    # --------------------------------------------------------------------------
    #
    def __init__(self, sid, workflow, resubmit_failed, zmq_info):

        # mandatory arguments
        self._sid             = sid
        self._workflow        = workflow          # validated by the AppManager
        self._resubmit_failed = resubmit_failed

        # logger and profiler live in a session-specific directory
        self._uid  = ru.generate_id('wfprocessor.%(counter)04d', ru.ID_CUSTOM)
        self._path = os.getcwd() + '/' + self._sid

        name = 'radical.entk.%s' % self._uid
        self._logger = ru.Logger  (name, path=self._path)
        self._prof   = ru.Profiler(name, path=self._path)
        self._report = ru.Reporter(name)

        # thread handles and termination events (created in start_processor)
        self._wfp_process              = None
        self._enqueue_thread           = None
        self._dequeue_thread           = None
        self._enqueue_thread_terminate = None
        self._dequeue_thread_terminate = None

        self._logger.info('Created WFProcessor object: %s', self._uid)
        self._prof.prof('create_wfp', uid=self._uid)

        self._setup_zmq(zmq_info)
# --------------------------------------------------------------------------
#
def _setup_zmq(self, zmq_info):
sid = self._sid
self._zmq_queue = {
'put' : ru.zmq.Putter(sid, url=zmq_info['put'], path=sid),
'get' : ru.zmq.Getter(sid, url=zmq_info['get'], path=sid)}
# --------------------------------------------------------------------------
#
def _advance(self, obj, obj_type, new_state):
'''
transition `obj` of type `obj_type` into state `new_state`
'''
# NOTE: this is a local operation, no queue communication is involved
# (other than the `_advance()` in the TaskManager classes, which
if obj_type == 'Task' : msg = obj.parent_stage['uid']
elif obj_type == 'Stage': msg = obj.parent_pipeline['uid']
else : msg = None
obj.state = new_state
self._prof.prof('advance', uid=obj.uid, state=obj.state, msg=msg)
self._logger.info('Transition %s to state %s', obj.uid, new_state)
self._report.ok('Update: ')
self._report.info('%s state: %s\n' % (obj.luid, obj.state))
if obj_type == 'Task' and obj.state == FAILED:
self._report.error('task %s failed: %s\n%s\n'
% (obj.uid, obj.exception, obj.exception_detail))
self._logger.info('Transition %s to state %s', obj.uid, new_state)
    # --------------------------------------------------------------------------
    # Getter
    #
    @property
    def workflow(self):
        # Read-only access to the AppManager's workflow reference (the shared
        # set of pipelines this processor operates on in place).
        return self._workflow
# --------------------------------------------------------------------------
# Private Methods
#
def _create_workload(self):
# We iterate through all pipelines to collect tasks from
# stages that are pending scheduling. Once collected, these tasks
# will be communicated to the tmgr in bulk.
# Initial empty list to store executable tasks across different
# pipelines
workload = list()
# The executable tasks can belong to different pipelines, and
# hence different stages. Empty list to store the stages so that
# we can update the state of stages accordingly
scheduled_stages = list()
for pipe in self._workflow:
with pipe.lock:
# If Pipeline is in the final state or suspended, we
# skip processing it.
if pipe.state in FINAL or \
pipe.completed or \
pipe.state == SUSPENDED:
continue
if pipe.state == INITIAL:
# Set state of pipeline to SCHEDULING if it is in INITIAL
self._advance(pipe, 'Pipeline', SCHEDULING)
# Get the next stage of this pipeline to process
exec_stage = pipe.stages[pipe.current_stage - 1]
if not exec_stage.uid:
# TODO: Move parent uid, name assignment to assign_uid()
exec_stage.parent_pipeline['uid'] = pipe.uid
exec_stage.parent_pipeline['name'] = pipe.name
# If its a new stage, update its state
if exec_stage.state == INITIAL:
self._advance(exec_stage, 'Stage', SCHEDULING)
# Get all tasks of a stage in SCHEDULED state
exec_tasks = list()
if exec_stage.state == SCHEDULING:
exec_tasks = exec_stage.tasks
for exec_task in exec_tasks:
state = exec_task.state
if state == INITIAL or \
(state == FAILED and self._resubmit_failed):
# Set state of Tasks in current Stage
# to SCHEDULING
self._advance(exec_task, 'Task', SCHEDULING)
# Store the tasks from different pipelines
# into our workload list. All tasks will
# be submitted in bulk and their states
# will be updated accordingly
workload.append(exec_task)
# We store the stages since the stages the
# above tasks belong to also need to be
# updated. If its a task that failed, the
# stage is already in the correct state
if exec_task.state == FAILED:
continue
if exec_stage not in scheduled_stages:
scheduled_stages.append(exec_stage)
return workload, scheduled_stages
# --------------------------------------------------------------------------
#
def reset_workflow(self):
'''
When a component is restarted we reset all the tasks that did not finish
to the first state. Then they are scheduled again for execution.
'''
self._logger.debug('Resetting workflow execution')
for pipe in self._workflow:
with pipe.lock:
if pipe.state in FINAL or \
pipe.completed or \
pipe.state == SUSPENDED:
continue
curr_stage = pipe.stages[pipe.current_stage - 1]
for task in curr_stage.tasks:
if task.state not in FINAL:
self._advance(task, 'Task', INITIAL)
self._advance(curr_stage, 'Stage', SCHEDULING)
# --------------------------------------------------------------------------
#
def _execute_workload(self, workload, scheduled_stages):
msg = {'type': 'workload',
'body': [task.as_dict() for task in workload]}
# Send the workload to the pending queue
self._zmq_queue['put'].put(qname='pending', msgs=[msg])
self._logger.debug('Workload submitted to Task Manager')
# Update the state of the tasks in the workload
for task in workload:
# Set state of Tasks in current Stage to SCHEDULED
self._advance(task, 'Task', SCHEDULED)
# Update the state of the stages from which tasks have
# been scheduled
if scheduled_stages:
for executable_stage in scheduled_stages:
self._advance(executable_stage, 'Stage', SCHEDULED)
# --------------------------------------------------------------------------
#
def _enqueue(self):
"""
**Purpose**: This is the function that is run in the enqueue thread.
This function extracts Tasks from the workflow that exists in
the WFprocessor object and pushes them to the queues in the pending_q
list.
"""
try:
self._prof.prof('enq_start', uid=self._uid)
self._logger.info('enqueue-thread started')
while not self._enqueue_thread_terminate.is_set():
# Raise an exception while running tests
ru.raise_on(tag='enqueue_fail')
workload, scheduled_stages = self._create_workload()
# If there are tasks to be executed
if workload:
self._execute_workload(workload, scheduled_stages)
self._logger.info('Enqueue thread terminated')
self._prof.prof('enq_stop', uid=self._uid)
except KeyboardInterrupt as ex:
self._logger.exception('Execution interrupted by user (you \
probably hit Ctrl+C), trying to cancel \
enqueuer thread gracefully...')
raise KeyboardInterrupt from ex
except Exception as ex:
self._logger.exception('Error in enqueue-thread')
raise EnTKError(ex) from ex
    # --------------------------------------------------------------------------
    #
    def _update_dequeued_task(self, deq_task):
        '''
        Locate the workflow task matching the dequeued (completed) `deq_task`,
        copy the completion information onto it, advance its state, and then
        propagate completion to its stage and pipeline as needed.
        '''

        # Traverse the entire workflow to find out the correct Task
        # TODO: Investigate whether we can change the DS of the
        # workflow so that we don't have this expensive search
        # for each task.
        # First search across all pipelines
        # Note: deq_task is not the same as the task that exists in this
        # process, they are different objects and have different state
        # histories.
        for pipe in self._workflow:

            with pipe.lock:

                # Skip pipelines that have completed or are
                # currently suspended
                if pipe.completed or pipe.state == SUSPENDED:
                    continue

                # Skip pipelines that don't match the UID
                # There will be only one pipeline that matches
                if pipe.uid != deq_task.parent_pipeline['uid']:
                    continue

                self._logger.debug('Found parent pipeline: %s', pipe.uid)

                # Next search across all stages of a matching
                # pipelines
                assert pipe.stages
                # `stage` keeps the loop variable alive after the loop for the
                # completion check below
                stage = None
                for stage in pipe.stages:

                    # Skip stages that don't match the UID
                    # There will be only one stage that matches
                    if stage.uid != deq_task.parent_stage['uid']:
                        continue

                    self._logger.debug('Found parent stage: %s', stage.uid)

                    # Search across all tasks of matching stage
                    for task in stage.tasks:

                        # Skip tasks that don't match the UID
                        # There will be only one task that matches
                        if task.uid != deq_task.uid:
                            continue

                        # due to the possibility of race condition with
                        # AppManager._update_task(), we ensure that task
                        # attributes "path" and "rts_uid" are set.
                        if not task.path and deq_task.path:
                            task.path = str(deq_task.path)

                        if not task.rts_uid and deq_task.rts_uid:
                            task.rts_uid = str(deq_task.rts_uid)

                        if deq_task.metadata:
                            task.metadata.update(deq_task.metadata)

                        # If there is no exit code, we assume success
                        # We are only concerned about state of task and not
                        # deq_task
                        if deq_task.exit_code == 0:
                            task_state = DONE
                        elif deq_task.exit_code == 1:
                            task_state = FAILED
                        else:
                            task_state = deq_task.state

                        task.exception        = deq_task.exception
                        task.exception_detail = deq_task.exception_detail

                        # NOTE(review): this checks the *pre-advance*
                        # `task.state`, which at this point reflects the
                        # submission-side history, not the completion just
                        # computed into `task_state` -- it looks like
                        # `task_state == FAILED` may have been intended;
                        # confirm against the resubmission workflow.
                        if task.state == FAILED and self._resubmit_failed:
                            task_state = INITIAL

                        self._advance(task, 'Task', task_state)

                        # Found the task and processed it -- no more
                        # iterations needed
                        break

                    # Found the stage and processed it -- no more
                    # iterations needed for the current task
                    break

                assert stage

                # Check if current stage has completed
                # If yes, we need to (i) check for post execs to
                # be executed and (ii) check if it is the last
                # stage of the pipeline -- update pipeline
                # state if yes.
                if stage._check_stage_complete():      # pylint: disable=W0212

                    self._advance(stage, 'Stage', DONE)

                    # Check if the current stage has a post-exec
                    # that needs to be executed
                    if stage.post_exec:
                        self._execute_post_exec(pipe, stage)

                    # if pipeline got suspended, advance state accordingly
                    if pipe.state == SUSPENDED:
                        self._advance(pipe, 'Pipeline', SUSPENDED)

                    else:
                        # otherwise perform normal stage progression
                        pipe._increment_stage()        # pylint: disable=W0212

                        # If pipeline has completed, advance state to DONE
                        if pipe.completed:
                            self._advance(pipe, 'Pipeline', DONE)

                # Found the pipeline and processed it -- no more
                # iterations needed for the current task
                break
# --------------------------------------------------------------------------
#
def _execute_post_exec(self, pipe, stage):
"""
**Purpose**: This method executes the post_exec step of a stage for a
pipeline.
"""
try:
self._logger.info('Executing post-exec for stage %s', stage.uid)
self._prof.prof('post_exec_start', uid=self._uid)
resumed_pipe_uids = stage.post_exec()
self._logger.info('Post-exec executed for stage %s', stage.uid)
self._prof.prof('post_exec_stop', uid=self._uid)
except Exception as ex:
self._logger.exception('post_exec of stage %s failed', stage.uid)
self._prof.prof('post_exec_fail', uid=self._uid)
raise EnTKError(ex) from ex
if resumed_pipe_uids:
for r_pipe in self._workflow:
if r_pipe == pipe:
continue
with r_pipe.lock:
if r_pipe.uid in resumed_pipe_uids:
# Resumed pipelines already have the correct state,
# they just need to be synced with the AppMgr.
r_pipe._increment_stage() # pylint: disable=W0212
if r_pipe.completed:
self._advance(r_pipe, 'Pipeline', DONE)
else:
self._advance(r_pipe, 'Pipeline', r_pipe.state)
# --------------------------------------------------------------------------
#
def _dequeue(self):
"""
**Purpose**: This is the function that is run in the dequeue thread.
This function extracts Tasks from the completed queues and updates the
workflow.
"""
try:
self._prof.prof('deq_start', uid=self._uid)
self._logger.info('Dequeue thread started')
while not self._dequeue_thread_terminate.is_set():
msgs = self._zmq_queue['get'].get_nowait(qname='completed',
timeout=100)
if msgs:
for msg in msgs:
deq_task = Task(from_dict=msg)
self._logger.info('Got finished task %s from queue',
deq_task.uid)
self._update_dequeued_task(deq_task)
self._logger.info('Terminated dequeue thread')
self._prof.prof('deq_stop', uid=self._uid)
except KeyboardInterrupt as ex:
self._logger.exception('Execution interrupted by user (you \
probably hit Ctrl+C), trying to exit \
gracefully...')
raise KeyboardInterrupt from ex
except Exception as ex:
self._logger.exception('Error in dequeue-thread')
raise EnTKError(ex) from ex
finally:
self.terminate_processor()
# --------------------------------------------------------------------------
#
# Public Methods
#
def start_processor(self):
"""
**Purpose**: Method to start the wfp process. The wfp function
is not to be accessed directly. The function is started in a separate
process using this method.
"""
try:
self._logger.info('Starting WFprocessor')
self._prof.prof('wfp_start', uid=self._uid)
self._enqueue_thread_terminate = mt.Event()
self._dequeue_thread_terminate = mt.Event()
# Start dequeue thread
self._dequeue_thread = mt.Thread(target=self._dequeue,
name='dequeue-thread')
self._logger.info('Starting dequeue-thread')
self._prof.prof('starting dequeue-thread', uid=self._uid)
self._dequeue_thread.start()
# Start enqueue thread
self._enqueue_thread = mt.Thread(target=self._enqueue,
name='enqueue-thread')
self._logger.info('Starting enqueue-thread')
self._prof.prof('starting enqueue-thread', uid=self._uid)
self._enqueue_thread.start()
self._logger.info('WFprocessor started')
self._prof.prof('wfp_started', uid=self._uid)
except Exception as ex:
self._logger.exception('WFprocessor failed')
self.terminate_processor()
raise EnTKError(ex) from ex
# --------------------------------------------------------------------------
#
def terminate_processor(self):
"""
**Purpose**: Method to terminate the wfp process. This method is
blocking as it waits for the wfp process to terminate (aka join).
"""
try:
tid = mt.current_thread().ident
if self._enqueue_thread:
if not self._enqueue_thread_terminate.is_set():
self._logger.info('Terminating enqueue-thread')
self._enqueue_thread_terminate.set()
if tid != self._enqueue_thread.ident:
self._enqueue_thread.join()
self._enqueue_thread = None
if self._dequeue_thread:
if not self._dequeue_thread_terminate.is_set():
self._logger.info('Terminating dequeue-thread')
self._dequeue_thread_terminate.set()
if tid != self._dequeue_thread.ident:
self._dequeue_thread.join()
self._dequeue_thread = None
self._logger.info('WFprocessor terminated')
self._prof.prof('wfp_stop', uid=self._uid)
self._prof.close()
except Exception as ex:
self._logger.exception('Could not terminate wfprocessor process')
raise EnTKError(ex) from ex
# --------------------------------------------------------------------------
#
def workflow_incomplete(self):
"""
**Purpose**: Method to check if the workflow execution is incomplete.
"""
try:
for pipe in self._workflow:
with pipe.lock:
if pipe.completed:
pass
else:
return True
return False
except Exception as ex:
self._logger.exception(
'Could not check if workflow is incomplete, error:%s' % ex)
raise EnTKError(ex) from ex
# --------------------------------------------------------------------------
#
def check_processor(self):
if self._enqueue_thread and \
self._dequeue_thread and \
self._enqueue_thread.is_alive() and \
self._dequeue_thread.is_alive():
return True
return False
# ------------------------------------------------------------------------------