#!/usr/bin/env python
"""
Pypiper is a python module with two components:
1) the PipelineManager class, and
2) other toolkits (currently just NGSTk) with functions for more specific pipeline use-cases.
The PipelineManager class can be used to create a procedural pipeline in python.
"""
import atexit
import datetime
import errno
import glob
import os
import platform
import psutil
import re
import shlex # for splitting commands like a shell does
import signal
import subprocess
import sys
import time
import pandas as _pd
if sys.version_info < (3, 3):
from collections import Iterable
else:
from collections.abc import Iterable
from attmap import AttMapEcho
from hashlib import md5
from yacman import load_yaml
from .exceptions import PipelineHalt, SubprocessError
from .flags import *
from .utils import \
check_shell, checkpoint_filepath, clear_flags, default_pipeline_config, \
flag_name, get_proc_name, is_multi_target, make_lock_name, parse_cmd, \
pipeline_filepath, CHECKPOINT_SPECIFICATIONS
from .const import PROFILE_COLNAMES
from ._version import __version__
import __main__
__all__ = ["PipelineManager"]
LOCK_PREFIX = "lock."
class Unbuffered(object):
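"""
Stream wrapper that flushes after every write, so that logged output
appears immediately (useful when stdout is redirected to a tee subprocess).
"""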
def __init__(self, stream):
self.stream = stream
def write(self, data):
self.stream.write(data)
self.stream.flush()
def writelines(self, datas):
self.stream.writelines(datas)
self.stream.flush()
def __getattr__(self, attr):
return getattr(self.stream, attr)
class PipelineManager(object):
"""
Base class for instantiating a PipelineManager object,
the main class of Pypiper.
:param str name: Choose a name for your pipeline;
it's used to name the output files, flags, etc.
:param str outfolder: Folder in which to store the results.
:param argparse.Namespace args: Optional args object from ArgumentParser;
Pypiper will simply record these arguments from your script
:param bool multi: Enables running multiple pipelines in one script
or for interactive use. It simply disables the tee of the output,
so you won't get output logged to a file.
:param bool dirty: Overrides the pipeline's clean_add()
manual parameters, to *never* clean up intermediate files automatically.
Useful for debugging; all cleanup files are added to manual cleanup script.
:param bool recover: Specify recover mode, to overwrite lock files.
If pypiper encounters a locked target, it will ignore the lock and
recompute this step. Useful to restart a failed pipeline.
:param bool new_start: start over and run every command even if output exists
:param bool force_follow: Force run all follow functions
even if the preceding command is not run. By default,
following functions are only run if the preceding command is run.
:param int cores: number of processors to use, default 1
:param str mem: amount of memory to use. Default units are megabytes unless
specified using the suffix [K|M|G|T].
:param str config_file: path to pipeline configuration file, optional
:param str output_parent: path to folder in which output folder will live
:param bool overwrite_checkpoints: Whether to override the stage-skipping
logic provided by the checkpointing system. This is useful if the
calls to this manager's run() method will be coming from a class
that implements pypiper.Pipeline, as such a class will handle
checkpointing logic automatically, and will set this to True to
protect from a case in which a restart begins upstream of a stage
for which a checkpoint file already exists, but that depends on the
upstream stage and thus should be rerun if its "parent" is rerun.
:raise TypeError: if start or stop point(s) are provided both directly and
via args namespace, or if both stopping types (exclusive/prospective
and inclusive/retrospective) are provided.
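Example (an illustrative sketch; the pipeline name, output folder, and
command are hypothetical):

    import pypiper
    pm = pypiper.PipelineManager(name="demo", outfolder="pipeline_output/")
    pm.run("cp input.txt output.txt", target="output.txt")
    pm.stop_pipeline()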
"""
def __init__(
self, name, outfolder, version=None, args=None, multi=False,
dirty=False, recover=False, new_start=False, force_follow=False,
cores=1, mem="1000M", config_file=None, output_parent=None,
overwrite_checkpoints=False, **kwargs):
# Params defines the set of options that could be updated via
# command line args to a pipeline run, that can be forwarded
# to Pypiper. If any pypiper arguments are passed
# (via add_pypiper_args()), these will override the constructor
# defaults for these arguments.
# Establish default params
params = {
'dirty': dirty,
'recover': recover,
'new_start': new_start,
'force_follow': force_follow,
'config_file': config_file,
'output_parent': output_parent,
'cores': cores,
'mem': mem}
# Transform the command-line namespace into a Mapping.
args_dict = vars(args) if args else dict()
# Parse and store stage specifications that can determine pipeline
# start and/or stop point.
# First, add such specifications to the command-line namespace,
# favoring the command-line spec if both are present.
for cp_spec in set(CHECKPOINT_SPECIFICATIONS) & set(kwargs.keys()):
args_dict.setdefault(cp_spec, kwargs[cp_spec])
# Then, ensure that we set each such specification on this manager
# so that we're guaranteed safe attribute access. If it's present,
# remove the specification from the namespace that will be used to
# update this manager's parameters Mapping.
for optname in CHECKPOINT_SPECIFICATIONS:
checkpoint = args_dict.pop(optname, None)
setattr(self, optname, checkpoint)
if self.stop_before and self.stop_after:
raise TypeError("Cannot specify both pre-stop and post-stop; "
"got '{}' and '{}'".format(self.stop_before, self.stop_after))
# Update this manager's parameters with non-checkpoint-related
# command-line parameterization.
params.update(args_dict)
# If no starting point was specified, assume that the pipeline's
# execution is to begin right away and set the internal flag so that
# run() is let loose to execute instructions given.
if not self.start_point:
self._active = True
else:
self._active = False
# Pipeline-level variables to track global state and pipeline stats
# Pipeline settings
self.name = name
self.tee = None
self.overwrite_locks = params['recover']
self.new_start = params['new_start']
self.force_follow = params['force_follow']
self.dirty = params['dirty']
self.cores = params['cores']
self.output_parent = params['output_parent']
# Keep track of an ID for the number of processes attempted
self.proc_count = 0
# We use this memory to pass a memory limit to processes like java that
# can take a memory limit, so they don't get killed by a SLURM (or other
# cluster manager) overage. However, with java, the -Xmx argument can only
# limit the *heap* space, not total memory use; so occasionally SLURM will
# still kill these processes because total memory goes over the limit.
# As a kind of hack, we'll set the java process's heap limit to 95% of the
# total memory limit provided.
# This will give a little breathing room for non-heap java memory use.
if not params['mem'].endswith(('K','M','G','T')):
# No unit suffix given; assume the value is in megabytes.
self.mem = params['mem'] + "M"
else:
self.mem = params['mem']
self.javamem = str(int(int(self.mem[:-1]) * 0.95)) + self.mem[-1:]
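# For example (illustrative values): mem="4000" becomes self.mem="4000M",
# and self.javamem="3800M" (95% of 4000, keeping the unit suffix).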
self.container = None
self.clean_initialized = False
# Do some cores math for split processes
# If a pipeline wants to run a process using half the cores, or 1/4 of the cores,
# this can lead to complications if the number of cores is not evenly divisible.
# Here we add a few variables so that pipelines can easily divide the cores evenly.
# 50/50 split
self.cores1of2a = int(self.cores) // 2 + int(self.cores) % 2
self.cores1of2 = int(self.cores) // 2
# 75/25 split
self.cores1of4 = int(self.cores) // 4
self.cores3of4 = int(self.cores) - int(self.cores1of4)
self.cores1of8 = int(self.cores) // 8
self.cores7of8 = int(self.cores) - int(self.cores1of8)
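# For example (illustrative): cores=8 gives cores1of2a=4, cores1of2=4,
# cores1of4=2, cores3of4=6, cores1of8=1, cores7of8=7; cores=5 gives
# cores1of2a=3 and cores1of2=2, so the two halves still sum to 5.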
self.pl_version = version
# Set relative output_parent directory to absolute
# not necessary after all. . .
#if self.output_parent and not os.path.isabs(self.output_parent):
# self.output_parent = os.path.join(os.getcwd(), self.output_parent)
# File paths:
self.outfolder = os.path.join(outfolder, '') # trailing slash
self.pipeline_log_file = pipeline_filepath(self, suffix="_log.md")
self.pipeline_profile_file = \
pipeline_filepath(self, suffix="_profile.tsv")
# Stats and figures are general and so lack the pipeline name.
self.pipeline_stats_file = \
pipeline_filepath(self, filename="stats.tsv")
self.pipeline_figures_file = \
pipeline_filepath(self, filename="figures.tsv")
self.pipeline_objects_file = \
pipeline_filepath(self, filename="objects.tsv")
# Record commands used and provide manual cleanup script.
self.pipeline_commands_file = \
pipeline_filepath(self, suffix="_commands.sh")
self.cleanup_file = pipeline_filepath(self, suffix="_cleanup.sh")
# Pipeline status variables
self.peak_memory = 0 # memory high water mark
self.starttime = time.time()
self.last_timestamp = self.starttime # time of the last call to timestamp()
self.locks = []
self.running_procs = {}
self.completed_procs = {}
self.wait = True # turn off for debugging
# Initialize status and flags
self.status = "initializing"
# as part of the beginning of the pipeline, clear any flags set by
# previous runs of this pipeline
clear_flags(self)
# In-memory holder for report_result
self.stats_dict = {}
# Checkpoint-related parameters
self.overwrite_checkpoints = overwrite_checkpoints
self.halt_on_next = False
self.prev_checkpoint = None
self.curr_checkpoint = None
# Pypiper can keep track of intermediate files to clean up at the end
self.cleanup_list = []
self.cleanup_list_conditional = []
# Register handler functions to deal with interrupt and termination signals;
# If received, we would then clean up properly (set pipeline status to FAIL, etc).
signal.signal(signal.SIGINT, self._signal_int_handler)
signal.signal(signal.SIGTERM, self._signal_term_handler)
self.start_pipeline(args, multi)
# Handle config file if it exists
# Read YAML config file
# TODO: This section should become a function, so toolkits can use it
# to locate a config file.
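# Resolution order used below: (1) the config_file constructor argument;
# (2) a config_file from the command-line namespace (absolute, or relative
# to the pipeline script); (3) a default config next to the pipeline script.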
config_to_load = None # start with nothing
if config_file:
config_to_load = config_file
else:
cmdl_config_file = getattr(args, "config_file", None)
if cmdl_config_file:
if os.path.isabs(cmdl_config_file):
# Absolute custom config file specified
if os.path.isfile(cmdl_config_file):
config_to_load = cmdl_config_file
else:
#print("Can't find custom config file: " + cmdl_config_file)
pass
else:
# Relative custom config file specified
# Set path to be relative to pipeline script
pipedir = os.path.dirname(sys.argv[0])
abs_config = os.path.join(pipedir, cmdl_config_file)
if os.path.isfile(abs_config):
config_to_load = abs_config
else:
print(__file__)
#print("Can't find custom config file: " + abs_config)
pass
if config_to_load is not None:
pass
# TODO: Switch this message to a debug message using _LOGGER
# print("\nUsing custom config file: {}".format(config_to_load))
else:
# No custom config file specified. Check for default
default_config = default_pipeline_config(sys.argv[0])
if os.path.isfile(default_config):
config_to_load = default_config
print("Using default pipeline config file: {}".
format(config_to_load))
# Finally load the config we found.
if config_to_load is not None:
print("\nLoading config file: {}\n".format(config_to_load))
with open(config_to_load, 'r') as conf:
self.config = AttMapEcho(load_yaml(conf))
else:
print("No config file")
self.config = None
@property
def _completed(self):
"""
Is the managed pipeline in a completed state?
:return bool: Whether the managed pipeline is in a completed state.
"""
return self.status == COMPLETE_FLAG
@property
def _failed(self):
"""
Is the managed pipeline in a failed state?
:return bool: Whether the managed pipeline is in a failed state.
"""
return self.status == FAIL_FLAG
@property
def halted(self):
"""
Is the managed pipeline in a paused/halted state?
:return bool: Whether the managed pipeline is in a paused/halted state.
"""
return self.status == PAUSE_FLAG
@property
def _has_exit_status(self):
"""
Has the managed pipeline been safely stopped?
:return bool: Whether the managed pipeline's status indicates that it
has been safely stopped.
"""
return self._completed or self.halted or self._failed
def _ignore_interrupts(self):
"""
Ignore interrupt and termination signals. Used as a pre-execution
function (preexec_fn) for subprocess.Popen calls that pypiper will
take control of (i.e., clean up manually).
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
def start_pipeline(self, args=None, multi=False):
"""
Do some initial setup: tee the output, print some diagnostics, create temp files.
You provide only the output directory (used for pipeline stats, log, and status flag files).
"""
# Perhaps this could all just be put into __init__, but I just kind of like the idea of a start function
self.make_sure_path_exists(self.outfolder)
# By default, Pypiper will mirror every operation so it is displayed both
# on sys.stdout **and** to a log file. Unfortunately, interactive python sessions
# ruin this by interfering with stdout. So, for interactive mode, we do not enable
# the tee subprocess, sending all output to screen only.
# Starting multiple PipelineManagers in the same script has the same problem, and
# must therefore be run in interactive_mode.
interactive_mode = multi or not hasattr(__main__, "__file__")
if interactive_mode:
print("Warning: You're running an interactive python session. "
"This works, but pypiper cannot tee the output, so results "
"are only logged to screen.")
else:
sys.stdout = Unbuffered(sys.stdout)
# sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) # Unbuffer output
# The tee subprocess must be instructed to ignore TERM and INT signals;
# Instead, I will clean up this process in the signal handler functions.
# This is required because otherwise, if pypiper receives a TERM or INT,
# the tee will be automatically terminated by python before I have a chance to
# print some final output (for example, about when the process stopped),
# and so those things don't end up in the log files because the tee
# subprocess is dead. Instead, I will handle the killing of the tee process
# manually (in the exit handler).
# The "-a" flag makes tee append to the file.
tee = subprocess.Popen(
["tee", "-a", self.pipeline_log_file], stdin=subprocess.PIPE,
preexec_fn=self._ignore_interrupts)
# If the pipeline is terminated with SIGTERM/SIGINT,
# make sure we kill this spawned tee subprocess as well.
# atexit.register(self._kill_child_process, tee.pid, proc_name="tee")
os.dup2(tee.stdin.fileno(), sys.stdout.fileno())
os.dup2(tee.stdin.fileno(), sys.stderr.fileno())
self.tee = tee
# For some reason, this exit handler function MUST be registered after
# the one that kills the tee process.
atexit.register(self._exit_handler)
# A future possibility to avoid this tee, is to use a Tee class; this works for anything printed here
# by pypiper, but can't tee the subprocess output. For this, it would require using threading to
# simultaneously capture and display subprocess output. I shelve this for now and stick with the tee option.
# sys.stdout = Tee(self.pipeline_log_file)
# Record the git version of the pipeline and pypiper used. This gets (if it is in a git repo):
# dir: the directory where the code is stored
# hash: the commit id of the last commit in this repo
# date: the date of the last commit in this repo
# diff: a summary of any differences in the current (run) version vs. the committed version
# Wrapped in try blocks so that the code will not fail if the pipeline or pypiper are not git repositories
gitvars = {}
try:
# pypiper dir
ppd = os.path.dirname(os.path.realpath(__file__))
gitvars['pypiper_dir'] = ppd
gitvars['pypiper_hash'] = subprocess.check_output("cd " + ppd + "; git rev-parse --verify HEAD 2>/dev/null", shell=True).decode().strip()
gitvars['pypiper_date'] = subprocess.check_output("cd " + ppd + "; git show -s --format=%ai HEAD 2>/dev/null", shell=True).decode().strip()
gitvars['pypiper_diff'] = subprocess.check_output("cd " + ppd + "; git diff --shortstat HEAD 2>/dev/null", shell=True).decode().strip()
gitvars['pypiper_branch'] = subprocess.check_output("cd " + ppd + "; git branch | grep '*' 2>/dev/null", shell=True).decode().strip()
except Exception:
pass
try:
# pipeline dir
pld = os.path.dirname(os.path.realpath(sys.argv[0]))
gitvars['pipe_dir'] = pld
gitvars['pipe_hash'] = subprocess.check_output("cd " + pld + "; git rev-parse --verify HEAD 2>/dev/null", shell=True).decode().strip()
gitvars['pipe_date'] = subprocess.check_output("cd " + pld + "; git show -s --format=%ai HEAD 2>/dev/null", shell=True).decode().strip()
gitvars['pipe_diff'] = subprocess.check_output("cd " + pld + "; git diff --shortstat HEAD 2>/dev/null", shell=True).decode().strip()
gitvars['pipe_branch'] = subprocess.check_output("cd " + pld + "; git branch | grep '*' 2>/dev/null", shell=True).decode().strip()
except Exception:
pass
# Print out a header section in the pipeline log:
# Wrap things in backticks to prevent markdown from interpreting underscores as emphasis.
# print("----------------------------------------")
print("### [Pipeline run code and environment:]\n")
print("* " + "Command".rjust(20) + ": " + "`" + str(" ".join(sys.argv)) + "`")
print("* " + "Compute host".rjust(20) + ": " + platform.node())
print("* " + "Working dir".rjust(20) + ": " + os.getcwd())
print("* " + "Outfolder".rjust(20) + ": " + self.outfolder)
self.timestamp("* " + "Pipeline started at".rjust(20) + ": ")
print("\n### [Version log:]\n")
print("* " + "Python version".rjust(20) + ": " + platform.python_version())
try:
print("* " + "Pypiper dir".rjust(20) + ": " + "`" + gitvars['pypiper_dir'].strip() + "`")
print("* " + "Pypiper version".rjust(20) + ": " + __version__)
print("* " + "Pypiper hash".rjust(20) + ": " + str(gitvars['pypiper_hash']))
print("* " + "Pypiper branch".rjust(20) + ": " + str(gitvars['pypiper_branch']))
print("* " + "Pypiper date".rjust(20) + ": " + str(gitvars['pypiper_date']))
if gitvars['pypiper_diff']:
print("* " + "Pypiper diff".rjust(20) + ": " + str(gitvars['pypiper_diff']))
except KeyError:
# It is ok if keys aren't set, it means pypiper isn't in a git repo.
pass
try:
print("* " + "Pipeline dir".rjust(20) + ": " + "`" + gitvars['pipe_dir'].strip() + "`")
print("* " + "Pipeline version".rjust(20) + ": " + str(self.pl_version))
print("* " + "Pipeline hash".rjust(20) + ": " + str(gitvars['pipe_hash']).strip())
print("* " + "Pipeline branch".rjust(20) + ": " + str(gitvars['pipe_branch']).strip())
print("* " + "Pipeline date".rjust(20) + ": " + str(gitvars['pipe_date']).strip())
if (gitvars['pipe_diff'] != ""):
print("* " + "Pipeline diff".rjust(20) + ": " + str(gitvars['pipe_diff']).strip())
except KeyError:
# It is ok if keys aren't set, it means the pipeline isn't a git repo.
pass
# Print all arguments (if any)
print("\n### [Arguments passed to pipeline:]\n")
for arg, val in (vars(args) if args else dict()).items():
argtext = "`{}`".format(arg)
valtext = "`{}`".format(val)
print("* {}: {}".format(argtext.rjust(20), valtext))
print("\n----------------------------------------\n")
self._set_status_flag(RUN_FLAG)
# Record the start in PIPE_profile and PIPE_commands output files so we
# can trace which run they belong to
with open(self.pipeline_commands_file, "a") as myfile:
myfile.write("# Pipeline started at " + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime)) + "\n\n")
with open(self.pipeline_profile_file, "a") as myfile:
myfile.write("# Pipeline started at " + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime))
+ "\n\n" + "# " + "\t".join(PROFILE_COLNAMES) + "\n")
def _set_status_flag(self, status):
"""
Configure state and files on disk to match current processing status.
:param str status: Name of new status designation for pipeline.
"""
# Remove previous status flag file.
flag_file_path = self._flag_file_path()
try:
os.remove(flag_file_path)
except OSError:
# Print message only if the failure to remove the status flag
# is unexpected; there's no flag for initialization, so we
# can't remove the file.
if self.status != "initializing":
print("Could not remove flag file: '{}'".format(flag_file_path))
# Set new status.
prev_status = self.status
self.status = status
self._create_file(self._flag_file_path())
print("\nChanged status from {} to {}.".format(
prev_status, self.status))
def _flag_file_path(self, status=None):
"""
Create path to flag file based on indicated or current status.
Internal variables used are the pipeline name and the designated
pipeline output folder path.
:param str status: flag file type to create, default to current status
:return str: path to flag file of indicated or current status.
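For example (assuming flag_name simply appends a ".flag" suffix), a
pipeline named "demo" in the "running" state would yield a path ending
in "demo_running.flag".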
"""
flag_file_name = "{}_{}".format(
self.name, flag_name(status or self.status))
return pipeline_filepath(self, filename=flag_file_name)
###################################
# Process calling functions
###################################
def run(self, cmd, target=None, lock_name=None, shell=None, nofail=False, clean=False, follow=None, container=None):
"""
The primary workhorse function of PipelineManager, this runs a command.
This is the command execution function, which enforces
race-free file-locking, enables restartability, and allows multiple
pipelines to produce/use the same files. The function will wait for the file
lock if it exists, and not produce new output (by default) if the
target output file already exists. If the output is to be created,
it will first create a lock file to prevent other calls to run
(for example, in parallel pipelines) from touching the file while it
is being created. It also records the memory of the process and
provides some logging output.
:param str | list[str] cmd: Shell command(s) to be run.
:param str | Sequence[str] target: Output file(s) to produce, optional.
If all target files exist, the command will not be run. If no target
is given, a lock_name must be provided.
:param str lock_name: Name of lock file. Optional.
:param bool shell: Whether the command should be run in its own shell.
Optional. Default: None -- will try to determine whether the
command requires a shell.
:param bool nofail: Whether the pipeline should proceed past a nonzero return from
a process, default False; nofail can be used to implement
non-essential parts of the pipeline; if a 'nofail' command fails,
the pipeline is free to continue execution.
:param bool clean: True means the target file will be automatically added
to an auto cleanup list. Optional.
:param callable follow: Function to call after executing (each) command.
:param str container: Name for Docker container in which to run commands.
:return int: Return code of process. If a list of commands is passed,
this is the maximum of all return codes for all commands.
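Example (an illustrative sketch; the file names are hypothetical):

    # Runs once; skipped on re-invocation if data.txt.gz already
    # exists and no lock file is present.
    pm.run("gzip -k data.txt", target="data.txt.gz")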
"""
# If the pipeline's not been started, skip ahead.
if not self._active:
cmds = [cmd] if isinstance(cmd, str) else cmd
cmds_text = [c if isinstance(c, str) else " ".join(c) for c in cmds]
print("Pipeline is inactive; skipping {} command(s):\n{}".
format(len(cmds), "\n".join(cmds_text)))
return 0
# Short-circuit if the checkpoint file exists and the manager's not
# been configured to overwrite such files.
if self.curr_checkpoint is not None:
check_fpath = checkpoint_filepath(self.curr_checkpoint, self)
if os.path.isfile(check_fpath) and not self.overwrite_checkpoints:
print("Checkpoint file exists for '{}' ('{}'), and the {} has "
"been configured to not overwrite checkpoints; "
"skipping command '{}'".format(
self.curr_checkpoint, check_fpath,
self.__class__.__name__, cmd))
return 0
# TODO: consider making the logic such that locking isn't implied, or
# TODO (cont.): that we can make it otherwise such that it's not
# TODO (cont.): strictly necessary to provide target or lock_name.
# The default lock name is based on the target name.
# Therefore, a targetless command that you want
# to lock must specify a lock_name manually.
if target is None and lock_name is None:
self.fail_pipeline(Exception(
"You must provide either a target or a lock_name."))
# Downstream code requires target to be a list, so convert if only
# a single item was given
if not is_multi_target(target) and target is not None:
target = [target]
# Downstream code requires a list of locks; convert
if isinstance(lock_name, str):
lock_name = [lock_name]
# Default lock_name (if not provided) is based on the target file name,
# but placed in the parent pipeline outfolder
lock_name = lock_name or make_lock_name(target, self.outfolder)
lock_files = [self._make_lock_path(ln) for ln in lock_name]
process_return_code = 0
local_maxmem = 0
# Decide how to do follow-up.
if not follow:
call_follow = lambda: None
elif not hasattr(follow, "__call__"):
# Warn about non-callable argument to follow-up function.
print("Follow-up function is not callable and won't be used: {}".
format(type(follow)))
call_follow = lambda: None
else:
# Wrap the follow-up function so that the log shows what's going on.
# Additionally, the in_follow attribute is set to enable proper command count handling.
def call_follow():
print("Follow:")
self.in_follow = True
follow()
self.in_follow = False
# The while True loop here rarely iterates more than once, and is just a
# wrapper to prevent race conditions; the lock_file must be created by
# the current loop iteration. If not, we loop again and then re-do the tests.
# The recover and newstart options inform the pipeline to run a command
# in a scenario where it normally would not. We use these "local" flags
# to allow us to report on the state of the pipeline in the first round
# as normal, but then proceed on the next iteration through the outer
# loop. The proceed_through_locks is a flag that is set if any lockfile
# is found that needs to be recovered or overwritten. It instructs us to
# ignore lock files on the next iteration.
local_recover = False
local_newstart = False
proceed_through_locks = False
while True:
##### Tests block
# Base case: all targets exist and we're not set to overwrite targets; break the loop, don't run the process.
# os.path.exists returns True for either a file or directory; .isfile is file-only
if target is not None and all([os.path.exists(t) for t in target]) \
and not any([os.path.isfile(l) for l in lock_files]) \
and not local_newstart:
for tgt in target:
if os.path.exists(tgt): print("Target exists: `" + tgt + "`")
if self.new_start:
print("New start mode; run anyway.")
# Set the local_newstart flag so the command will run anyway.
# Doing this in here instead of outside the loop allows us
# to still report the target existence.
local_newstart = True
continue
# Normally we don't run the follow, but if you want to force. . .
if self.force_follow:
call_follow()
# Increment process count
increment_info_pattern = "Skipped command: `{}`\nCommand ID incremented by: `{}`. Current ID: `{}`\n"
if isinstance(cmd, list):
for c in cmd:
count = len(parse_cmd(c, shell))
self.proc_count += count
print(increment_info_pattern.format(str(c), count, self.proc_count))
else:
count = len(parse_cmd(cmd, shell))
self.proc_count += count
print(increment_info_pattern.format(str(cmd), count, self.proc_count))
break # Do not run command
# Scenario 1: Lock file exists, but we're supposed to overwrite target; Run process.
if not proceed_through_locks:
for lock_file in lock_files:
recover_file = self._recoverfile_from_lockfile(lock_file)
if os.path.isfile(lock_file):
print("Found lock file: {}".format(lock_file))
if self.overwrite_locks:
print("Overwriting target. . .")
proceed_through_locks = True
elif os.path.isfile(recover_file):
print("Found dynamic recovery file ({}); "
"overwriting target. . .".format(recover_file))
# remove the lock file which will then be promptly re-created for the current run.
local_recover = True
proceed_through_locks = True
# the recovery flag is now spent; remove so we don't accidentally re-recover a failed job
os.remove(recover_file)
else: # don't overwrite locks
self._wait_for_lock(lock_file)
# when it's done loop through again to try one more
# time (to see if the target exists now)
continue
# If you get to this point, the target doesn't exist, and the lock_file doesn't exist
# (or we should overwrite). Create the lock (if you can).
# Initialize lock in master lock list
for lock_file in lock_files:
self.locks.append(lock_file)
if self.overwrite_locks or local_recover:
self._create_file(lock_file)
else:
try:
self._create_file_racefree(lock_file) # Create lock
except OSError as e:
if e.errno == errno.EEXIST: # File already exists
print("Lock file created after test! Looping again: {}".format(lock_file))
# Since a lock file was created by a different source,
# we need to reset this flag to re-check the locks.
proceed_through_locks = False
continue # Go back to start
##### End tests block
# If you make it past these tests, we should proceed to run the process.
if target is not None:
print("Target to produce: {}\n".format(",".join(['`'+x+'`' for x in target])))
else:
print("Targetless command, running...\n")
if isinstance(cmd, list): # Handle command lists
for cmd_i in cmd:
list_ret, maxmem = \
self.callprint(cmd_i, shell, lock_file, nofail, container)
maxmem = max(maxmem) if isinstance(maxmem, Iterable) else maxmem
local_maxmem = max(local_maxmem, maxmem)
list_ret = max(list_ret) if isinstance(list_ret, Iterable) else list_ret
process_return_code = max(process_return_code, list_ret)
else: # Single command (most common)
process_return_code, local_maxmem = \
self.callprint(cmd, shell, lock_file, nofail, container) # Run command
if isinstance(process_return_code, list):
process_return_code = max(process_return_code)
# For temporary files, you can specify a clean option to automatically
# add them to the clean list, saving you a manual call to clean_add
if target is not None and clean:
for tgt in target:
self.clean_add(tgt)
call_follow()
for lock_file in lock_files:
os.remove(lock_file) # Remove lock file
self.locks.remove(lock_file)
# If you make it to the end of the while loop, you're done
break
return process_return_code
def checkprint(self, cmd, shell=None, nofail=False):
"""
Just like callprint, but checks output -- so you can get a variable
in python corresponding to the output of the command you call.
This is equivalent to running subprocess.check_output()
instead of subprocess.call().
:param str | Iterable[str] cmd: Bash command(s) to be run.
:param bool shell: Whether the command should be run in its own shell. Optional.
Default: None -- will try to guess whether the command should be run in a
shell (based on the presence of a pipe (|) or redirect (>)). To force a process to
run as a direct subprocess, set `shell` to False; to force a shell, set it to True.
:param bool nofail: Should the pipeline bail on a nonzero return from a process? Default: False
Nofail can be used to implement non-essential parts of the pipeline; if these processes fail,
they will not cause the pipeline to bail out.
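Example (illustrative; the file name is hypothetical):

    # Capture a command's stdout as a python variable.
    num_lines = pm.checkprint("wc -l < data.txt")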
"""
self._report_command(cmd)
likely_shell = check_shell(cmd, shell)
if shell is None:
shell = likely_shell
if not shell:
if likely_shell:
print("Should this command run in a shell instead of directly in a subprocess?")
cmd = shlex.split(cmd)
try:
return subprocess.check_output(cmd, shell=shell).decode().strip()
except Exception as e:
self._triage_error(e, nofail)
def _attend_process(self, proc, sleeptime):
"""
Waits on a process for a given time to see if it finishes, returns True
if it's still running after the given time or False as soon as it
returns.
:param psutil.Popen proc: Process object opened by psutil.Popen()
:param float sleeptime: Time to wait
:return bool: True if process is still running; otherwise false
"""
# print("attend:{}".format(proc.pid))
try:
proc.wait(timeout=sleeptime)
except psutil.TimeoutExpired:
return True
return False
def callprint(self, cmd, shell=None, lock_file=None, nofail=False, container=None):
"""
Prints the command, then executes it, then prints the memory use and
return code of the command.
Uses python's subprocess.Popen() to execute the given command. The shell argument is simply
passed along to Popen(). You should use shell=False (default) where possible, because this enables memory
profiling. You should use shell=True if you require shell functions like redirects (>) or wildcards (*), but this
will prevent the script from monitoring memory use. Pipes (|) are used to split the command into
subprocesses run within python, so memory profiling remains possible.
A piped command is parsed into a series of subprocess specifications (dict objects), which are run in succession.
:param str | Iterable[str] cmd: Bash command(s) to be run.
:param str lock_file: a lock file name
:param bool nofail: Should the pipeline bail on a nonzero return from a process? Default: False
Nofail can be used to implement non-essential parts of the pipeline; if these processes fail,
they will not cause the pipeline to bail out.
:param bool shell: if the command should be run in its own shell, default: None (will try
to determine this based on the command)
:param str container: Named Docker container in which to execute.
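Example (an illustrative sketch; the command and lock file name are
hypothetical). A piped command is split so each stage is profiled as
its own subprocess:

    pm.callprint("samtools view in.bam | wc -l", lock_file="lock.count")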
"""
# The Popen shell argument works like this:
# if shell=False, then we format the command (with split()) to be a list of command and its arguments.
# Split the command to use shell=False;
# leave it together to use shell=True;
def get_mem_child_sum(proc):
try:
# get children processes
children = proc.children(recursive=True)
# get RSS memory of each child proc and sum all
mem_sum = sum([x.memory_info().rss for x in children])
# return in gigs
return mem_sum/1e9
except (psutil.NoSuchProcess, psutil.ZombieProcess) as e:
print(e)
print("Warning: couldn't add memory use for process: {}".format(proc.pid))
return 0
def display_memory(memval):
return None if memval < 0 else "{}GB".format(round(memval, 3))
def make_dict(command):
a, s = (command, True) if check_shell(command, shell) else (shlex.split(command), False)
return dict(args=a, stdout=subprocess.PIPE, shell=s)
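# For example (illustrative): make_dict("wc -l data.txt") yields shell=False
# with args=["wc", "-l", "data.txt"], while a command containing "|" or ">"
# keeps the string form and sets shell=True.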
def make_hash(o):
"""
Convert the object to string and hash it; return None in case of failure
:param o: object of any type, in our case it is a dict
:return str: hashed string representation of the dict
"""
try:
hsh = md5(str(o).encode("utf-8")).hexdigest()[:10]
except:
hsh = None
return hsh
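# For example (illustrative): make_hash({"args": ["wc", "-l"]}) returns the
# first 10 hex digits of the md5 of the dict's string representation.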
if container:
cmd = "docker exec " + container + " " + cmd
param_list = parse_cmd(cmd, shell)
proc_name = get_proc_name(cmd)
processes = []
running_processes = []
completed_processes = []
start_time = time.time()
for i in range(len(param_list)):
running_processes.append(i)
if i == 0:
processes.append(psutil.Popen(preexec_fn=os.setpgrp, **param_list[i]))
else:
param_list[i]["stdin"] = processes[i - 1].stdout
processes.append(psutil.Popen(preexec_fn=os.setpgrp, **param_list[i]))
self.running_procs[processes[-1].pid] = {
"proc_name": get_proc_name(param_list[i]["args"]),
"start_time": start_time,
"container": container,
"p": processes[-1],
"args_hash": make_hash(param_list[i]["args"]),
"local_proc_id": self.process_counter()
}
self._report_command(cmd, [x.pid for x in processes])
# Capture the subprocess output in <pre> tags to make it format nicely
# if the markdown log file is displayed as HTML.
print("<pre>")
local_maxmems = [-1] * len(running_processes)
returncodes = [None] * len(running_processes)
proc_wrapup_text = [None] * len(running_processes)
if not self.wait:
print("</pre>")
ids = [x.pid for x in processes]
print("Not waiting for subprocesses: " + str(ids))
return [0, -1]
def proc_wrapup(i):
"""
:param i: internal ID number of the subprocess
"""
returncode = processes[i].returncode
current_pid = processes[i].pid
info = "PID: {pid};\tCommand: {cmd};\tReturn code: {ret};\tMemory used: {mem}".format(
pid=current_pid,
cmd=self.running_procs[current_pid]["proc_name"],
ret=processes[i].returncode,
mem=display_memory(local_maxmems[i]))
# report process profile
self._report_profile(self.running_procs[current_pid]["proc_name"], lock_file,
time.time() - self.running_procs[current_pid]["start_time"], local_maxmems[i],
current_pid, self.running_procs[current_pid]["args_hash"],
self.running_procs[current_pid]["local_proc_id"])
# Remove this as a running subprocess
self.running_procs[current_pid]["info"] = info
self.running_procs[current_pid]["returncode"] = returncode
self.completed_procs[current_pid] = self.running_procs[current_pid]
del self.running_procs[current_pid]
running_processes.remove(i)
completed_processes.append(i)
proc_wrapup_text[i] = info
returncodes[i] = returncode
return info
sleeptime = .0001
while running_processes:
for i in running_processes:
local_maxmems[i] = max(local_maxmems[i], (get_mem_child_sum(processes[i])))
self.peak_memory = max(self.peak_memory, local_maxmems[i])
if not self._attend_process(processes[i], sleeptime):
proc_wrapup_text[i] = proc_wrapup(i)
# The sleeptime is extremely short at the beginning and grows exponentially
# (the added constant prevents copious checks at the very beginning),
# which yields more precise memory tracing for short processes.
sleeptime = min((sleeptime + 0.25) * 3, 60/len(processes))
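# Illustrative progression for a single process: 0.0001 -> ~0.75 -> ~3.0
# -> ~9.75 -> ~30.0, then capped at 60/len(processes) seconds.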
# All jobs are done, print a final closing and job info
stop_time = time.time()
proc_message = "Command completed. {info}"
info = "Elapsed time: " + str(datetime.timedelta(seconds=self.time_elapsed(start_time))) + "."
info += " Running peak memory: {pipe}.".format(pipe=display_memory(self.peak_memory))
# if len(proc_wrapup_text) == 1:
# info += " {}".format(proc_wrapup_text[0])
for i in completed_processes:
info += "\n {}".format(self.completed_procs[processes[i].pid]["info"])
print("</pre>")
print(proc_message.format(info=info))
for rc in returncodes:
if rc != 0:
msg = "Subprocess returned nonzero result. Check above output for details"
self._triage_error(SubprocessError(msg), nofail)
return [returncodes, local_maxmems]
def process_counter(self):
"""