-
Notifications
You must be signed in to change notification settings - Fork 6
/
executor.py
4468 lines (3499 loc) · 163 KB
/
executor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import io
import os
import re
import csv
import copy
import json
import yaml
import base64
import random
import string
import logging
import bashlex
import tarfile
import zipfile
import networkx as nx
import importlib
# Prefer DEFLATE compression for generated zip archives; fall back to
# plain (uncompressed) storage when the zlib module is unavailable.
try:
    import zlib
    compression = zipfile.ZIP_DEFLATED
except (ImportError, AttributeError):
    compression = zipfile.ZIP_STORED
import argparse
from collections import defaultdict
from pprint import pprint
from itertools import product
# Module-wide logging configuration: everything at DEBUG level and above.
logging.basicConfig(level=logging.DEBUG)
class OBC_Executor_Exception(Exception):
    '''
    Raised for user-facing errors during workflow parsing or bash-script
    generation (missing files, invalid JSON, broken workflow graphs, ...).
    '''
    pass
def detect_circles(graph, start, end):
    '''
    Yield every cycle-free path that leads from start back to end.
    graph: adjacency mapping node -> iterable of neighbor nodes.
    https://stackoverflow.com/questions/40833612/find-all-cycles-in-a-graph-implementation
    '''
    stack = [(start, [])]
    while stack:
        node, walked = stack.pop()
        # A non-empty path that has returned to `end` is a cycle.
        if walked and node == end:
            yield walked
            continue
        # Extend the path with every unvisited neighbor.
        stack.extend(
            (neighbor, walked + [neighbor])
            for neighbor in graph[node]
            if neighbor not in walked
        )
# Templates of bash snippets injected into the generated scripts.
# Keys (all values are format/replace templates; "{...}" placeholders are
# substituted later, which is why some are built with str.replace):
#   parse_json            -- obc_parse_json(): extract a key's value from a JSON string with sed
#   curl_send_json        -- POST a JSON payload to a URL with curl
#   command_to_variable   -- capture a command's stdout into a variable
#   string_contains       -- branch on whether a variable contains a substring
#   fail                  -- print an error message and exit 1
#   update_server_status  -- report progress to the OBC server and rotate the token
#   base64_decode         -- obc_base64_decode(): echo a base64-decoded argument
#   validate              -- obc_validate(): decode and eval a base64-encoded command
#   init_report           -- create the skeleton HTML report via a heredoc
#   function_REPORT       -- REPORT(): append a variable/file/image entry to the HTML report
#   final_report          -- tar/gzip the report HTML and its directory
#   function_PARALLEL     -- PARALLEL(): run a step over CSV lines (or several steps) in background jobs
bash_patterns = {
    'parse_json' : r'''
function obc_parse_json()
{
echo $1 | \
sed -e "s/.*$2\":[ ]*\"\([^\"]*\)\".*/\1/"
}
''' + '\n', # https://stackoverflow.com/a/26655887/5626738
    'curl_send_json': '''curl -s --header "Content-Type: application/json" --request POST -d '{json}' {url}''',
    'command_to_variable': '''{variable}=$({command})''',
    'string_contains': '''
if [[ ${variable} == *'{string}'* ]]; then
{contains_yes}
else
{contains_no}
fi
''',
    'fail': 'echo "{error_message}"\nexit 1\n',
    'update_server_status': r'''
function update_server_status()
{
if [[ $obc_current_token =~ ^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$ ]]; then
local command="curl {insecure}-s --header \"Content-Type: application/json\" --request POST -d '{\"token\": \"$obc_current_token\", \"status\": \"$1\"}' {server}/report/"
local c=$(eval "$command")
if [[ $c == *'"success": true'* ]]; then
obc_current_token=$(obc_parse_json "$c" "token")
echo obc_current_token=\"${obc_current_token}\" > ${OBC_WORK_PATH}/obc_current_token.sh
else
if [[ $c == *'"success": false'* ]]; then
obc_error_message=$(obc_parse_json "$c" "error_message")
echo "Server Return Error: $obc_error_message"
# exit 1
else
echo "Server does not respond, or unknown error"
# exit 1
fi
fi
else
:
fi
}
''',
    'base64_decode': r'''
function obc_base64_decode() {
echo $1 | base64 --decode
}
''',
    'validate': r'''
function obc_validate() {
local command="$(obc_base64_decode $1)"
eval $command
}
''',
    'init_report': r'''
if [ -n "${OBC_WORK_PATH}" ] ; then
export OBC_REPORT_PATH=${OBC_WORK_PATH}/${OBC_NICE_ID}.html
export OBC_REPORT_DIR=${OBC_WORK_PATH}/${OBC_NICE_ID}
mkdir -p ${OBC_REPORT_DIR}
echo "OBC: Report filename: ${OBC_REPORT_PATH}"
cat > ${OBC_REPORT_PATH} << OBCENDOFFILE
<!DOCTYPE html>
<html lang="en">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
</head>
<body>
<p>
OpenBio Server: <a href="${OBC_SERVER}">${OBC_SERVER}</a> <br>
Workflow: <a href="${OBC_SERVER}/w/${OBC_WORKFLOW_NAME}/${OBC_WORKFLOW_EDIT}">${OBC_WORKFLOW_NAME}/${OBC_WORKFLOW_EDIT}</a> <br>
<p>
<h3>Intermediate Variables:</h3>
<ul>
<!-- {{INTERMEDIATE_VARIABLE}} -->
</ul>
<p>
<h3>Output Variables:</h3>
<ul>
<!-- {{OUTPUT_VARIABLE}} -->
</ul>
</body>
</html>
OBCENDOFFILE
fi
''',
    'function_REPORT': r'''
export OBC_REPORT_PATH=${OBC_WORK_PATH}/${OBC_NICE_ID}.html
export OBC_REPORT_DIR=${OBC_WORK_PATH}/${OBC_NICE_ID}
function REPORT() {
if [ -n "${OBC_WORK_PATH}" ] ; then
local VAR=$1
local TIMENOW=$(date)
local WHOCALLEDME=$(caller 0 | awk '{print $2}')
if [ -z $3 ] ; then
local TAG=INTERMEDIATE_VARIABLE
else
local TAG=$3
fi
if [ ${TAG} == "INTERMEDIATE_VARIABLE" ] ; then
local EXTRA="${TIMENOW}. Called from: ${WHOCALLEDME}"
else
local EXTRA=""
fi
local FILEKIND=$(file "${2}")
# echo "OBC: FILE RESULT ${FILEKIND}"
if [[ $FILEKIND == *"PNG image data"* ]]; then
local NEWFILENAME=${OBC_REPORT_DIR}/$(basename ${2})
local LOCALFILENAME=${OBC_NICE_ID}/$(basename ${2})
cp ${2} ${NEWFILENAME}
local HTML="<li>${EXTRA} ${VAR}: <br><img src=\"${LOCALFILENAME}\"></li>\\\\n <!-- {{${TAG}}} -->\\\\n"
elif [[ $FILEKIND == *"PDF document"* ]]; then
local NEWFILENAME=${OBC_REPORT_DIR}/$(basename ${2})
local LOCALFILENAME=${OBC_NICE_ID}/$(basename ${2})
cp ${2} ${NEWFILENAME}
local HTML="<li>${EXTRA} ${VAR}: <br><a href=\"${LOCALFILENAME}\">${LOCALFILENAME}</a></li>\\\\n <!-- {{${TAG}}} -->\\\\n"
elif [[ -f ${2} ]] ; then
local NEWFILENAME=${OBC_REPORT_DIR}/$(basename ${2})
local LOCALFILENAME=${OBC_NICE_ID}/$(basename ${2})
cp ${2} ${NEWFILENAME}
local HTML="<li>${EXTRA} ${VAR}: <br><a href=\"${LOCALFILENAME}\">${LOCALFILENAME}</a></li>\\\\n <!-- {{${TAG}}} -->\\\\n"
else
local VALUE=$(echo "${2}" | sed 's/&/\\\&/g; s/</\\\</g; s/>/\\\>/g; s/"/\\\"/g; s/'"'"'/\\\'/g')
local HTML="<li>${EXTRA} ${VAR}=${VALUE}</li>\\\\n <!-- {{${TAG}}} -->\\\\n"
fi
sed -i -e "s|<\!-- {{${TAG}}} -->|${HTML}|" ${OBC_REPORT_PATH}
sed 's/\\n/\
/g' ${OBC_REPORT_PATH} > ${OBC_REPORT_PATH}.tmp
mv ${OBC_REPORT_PATH}.tmp ${OBC_REPORT_PATH}
fi
}
''',
    'final_report': r'''
OBC_REPORT_TGZ=${OBC_WORK_PATH}/${OBC_NICE_ID}.tgz
#echo "RUNNING: "
#echo "tar zcf ${OBC_REPORT_TGZ} -C ${OBC_WORK_PATH} ${OBC_NICE_ID}.html ${OBC_NICE_ID}/"
tar zcf ${OBC_REPORT_TGZ} -C ${OBC_WORK_PATH} ${OBC_NICE_ID}.html ${OBC_NICE_ID}/
''',
    'function_PARALLEL': r'''
function PARALLEL() {
local line_counter=0
local PIDS=() #
if [[ $2 == *$'\n'* ]] ; then
while IFS= read -r line; do
if [[ -z "${line// }" ]] ; then
continue # Ignore empty lines
fi
let "line_counter=line_counter+1"
if [ $line_counter -eq 1 ] ; then
IFS=',' read -ra header <<< "$line"
local header_length=${#header[@]}
let "header_length_0=header_length-1"
continue
fi
IFS=',' read -ra line_splitted <<< "$line"
local line_length=${#line_splitted[@]}
if [ $header_length -ne $line_length ] ; then
OBC_ERROR="Line:${line_counter} ${line} contains ${line_length} fields whereas the header has ${header_length} fields."
return
fi
# Set parameters
for i in $(seq 0 ${header_length_0})
do
declare "${header[${i}]}=${line_splitted[${i}]}"
done
#echo "Calling step: $1"
eval ${1} &
P=$!
PIDS=("${PIDS[@]}" ${P})
done <<< "$2"
else
for var in "$@" ; do
#echo ${var}
eval ${var} &
P=$!
IDS=("${PIDS[@]}" ${P})
done
fi
wait "${PIDS[@]}"
OBC_ERROR=""
}
''',
}
# The following module-level raw string is inert (an expression statement
# with no effect); it documents example invocations of the generated
# PARALLEL bash function, both CSV-driven and multi-step forms.
r'''
A="
PARAM_1,PARAM_2
1,2
3,4
5,6
7,8
9,10
"
PARALLEL step_example "$A"
# a = re.findall(r'[\w]+=\"[\w\s,\.]+\"[\s]+PARALLEL[\s]+[\w]+[\s]+\"\$[\w]+\"', text)
PARALLEL step_example_1 step_example_2
'''
# Template: assign the value of a JSON key (via obc_parse_json) to a bash variable.
bash_patterns['get_json_value'] = '{variable}=$(obc_parse_json "${json_variable}" "{json_key}")'
# Global parameters
g = {
    'silent': False,  # when True, log_info() suppresses its output
    # Default client-side paths for data / tools / working directory
    'CLIENT_OBC_DATA_PATH': '/usr/local/airflow/REPORTS/DATA',
    'CLIENT_OBC_TOOL_PATH': '/usr/local/airflow/REPORTS/TOOL',
    'CLIENT_OBC_WORK_PATH': '/usr/local/airflow/REPORTS/WORK',
    # Alphabet used by Workflow.create_nice_id (lowercase letters + digits)
    'possible_letters_nice_id': tuple(string.ascii_lowercase + string.digits),
}
def log_info(message):
    '''
    Emit an informational log line, unless global silent mode is enabled.
    '''
    if g['silent']:
        return
    logging.info(message)
def setup_bash_patterns(args):
    '''
    Adjust the bash patterns according to the parsed command-line arguments.
    str.replace is used instead of str.format because the pattern text
    itself contains literal "{" characters.
    '''
    pattern = bash_patterns['update_server_status']
    pattern = pattern.replace('{server}', args.server)
    pattern = pattern.replace('{insecure}', '-k ' if args.insecure else '')
    bash_patterns['update_server_status'] = pattern
## Helper functions
def base64_encode(s):
    '''
    Return the base64 representation (ascii str) of the string s.
    '''
    raw = s.encode()
    return base64.b64encode(raw).decode('ascii')
class Workflow:
'''
'''
WORKFLOW_TYPE = 'workflow'
INPUT_TYPE = 'input'
OUTPUT_TYPE = 'output'
STEP_TYPE = 'step'
TOOL_TYPE = 'tool'
# Variables that are assumed to be set from the environment
DEFAULT_INPUT_VARIABLES = [
'OBC_TOOL_PATH',
'OBC_DATA_PATH',
'OBC_WORK_PATH',
]
EXIT_ON_ERROR = True # Should the workflow exit on error?
def __init__(self,
workflow_filename=None,
workflow_object=None,
workflow_string=None,
askinput='JSON',
obc_server=None,
workflow_id=None,
):
'''
workflow_filename: the JSON filename of the workflow
workflow_object: The representation of the workflow
workflow_string: A string representation of the JSON workflow object (used in workflow upload)
askinput:
JSON: Ask for input during convertion to BASH
BASH: Ask for input in BASH
One of these should not be None
obc_server the server for which we are generating the script (if any)
workflow_id: The nice_id of the workflow
'''
# One of workflow_filename, workflow_object, workflow_string can be declared
declared = [x[0] for x in (
('workflow_filename', workflow_filename),
('workflow_object', workflow_object),
('workflow_string', workflow_string),
)
if x[1]
]
if len(declared) == 0:
raise OBC_Executor_Exception('None of workflow_filename, workflow_object, workflow_string have been declared')
if len(declared) > 1:
raise OBC_Executor_Exception(
f'Only one of workflow_filename, workflow_object, workflow_string can be declared. You declared: '
', '.join(declared)
)
self.workflow_filename = workflow_filename
self.workflow_object = workflow_object
self.workflow_string = workflow_string
self.askinput = askinput
self.obc_server = obc_server
self.workflow_id = workflow_id
self.parse_workflow_filename()
def __str__(self,):
'''
'''
return json.dumps(self.workflow, indent=4)
@staticmethod
def create_nice_id(length=8):
'''
Create a nice id
'''
return ''.join(random.sample(g['possible_letters_nice_id'], length))
def tool_bash_script_generator(self,):
'''
'''
tool_installation_order = self.get_tool_installation_order()
for tool in tool_installation_order:
yield tool
def get_tool_dependencies(self, tool):
'''
Returns the tools that this tool depends from
'''
def rec(tool):
ret = []
for tool_slash_id in tool['dependencies']:
tool_d = self.tool_slash_id_d[tool_slash_id]
ret.append(Workflow.get_tool_slash_id(tool_d))
ret.extend(rec(tool_d))
return ret
ret = set(rec(tool))
return [self.tool_slash_id_d[x] for x in ret]
def get_tool_dependent_variables(self, tool, include_this_tool=False):
'''
Get the variables for which this tool is dependent
include_this_tool: If True, include also the variables of this tool
'''
def recursion(tool, the_list):
for tool_slash_id in tool['dependencies']:
tool_d = self.tool_slash_id_d[tool_slash_id]
for variable in tool_d['variables']:
the_list.append((variable, tool_d))
recursion(tool_d, the_list)
the_list = []
recursion(tool, the_list)
if include_this_tool:
for variable in tool['variables']:
the_list.append((variable, tool))
return the_list
    def parse_workflow_filename(self, ):
        '''
        Load the workflow JSON (from filename, string or object), then run a
        series of sanity checks: node integrity, circular tool dependencies,
        unset inputs/outputs, single main step per workflow. Finally build
        the lookup dictionaries used by the rest of the class.
        '''
        # Load the workflow from whichever source was declared in __init__.
        if self.workflow_filename:
            try:
                with open(self.workflow_filename) as f:
                    self.workflow = json.load(f)
            except FileNotFoundError:
                raise OBC_Executor_Exception(f'File {self.workflow_filename} does not exist')
            except json.decoder.JSONDecodeError as e:
                raise OBC_Executor_Exception(f'File {self.workflow_filename} does not seem to be in JSON format. Error: {str(e)}')
        elif self.workflow_string:
            try:
                self.workflow = json.loads(self.workflow_string)
            except json.decoder.JSONDecodeError as e:
                raise OBC_Executor_Exception(f'Could not parse workflow_string as JSON. Error: {str(e)}')
        elif self.workflow_object:
            self.workflow = self.workflow_object

        # Cache frequently used parts of the graph.
        self.input_parameters = self.get_input_parameters()
        self.root_workflow = self.get_root_workflow()
        self.root_workflow_id = self.root_workflow['id']
        self.root_step = self.get_root_step()
        self.root_inputs_outputs = self.get_input_output_from_workflow(self.root_workflow)
        self.output_parameters = self.root_inputs_outputs['outputs']
        self.nice_id = self.workflow['nice_id'] # The nice ID from the server
        self.nice_id_local = Workflow.create_nice_id() # A local nice ID
        # Precedence for the globally used id: server > executor > local.
        if self.nice_id: # The id from the JSON
            self.nice_id_global = self.nice_id
        elif self.workflow_id: # The id from the executor
            self.nice_id_global = self.workflow_id
        else:
            self.nice_id_global = self.nice_id_local # The id created in this class
        self.current_token = self.workflow['token']
        log_info('Workflow Name: {} Edit: {} Report: {}'.format(
            self.root_workflow['name'], self.root_workflow['edit'], self.nice_id,
        ))
        # Apply some integrity checks
        for node in self.node_iterator():
            # Every node has a type
            assert 'type' in node
            # Every node has a id
            assert 'id' in node
            # Every node has a belongto
            assert 'belongto' in node
            # Assert proper fields and types in belongto
            if not node['belongto'] is None:
                assert 'name' in node['belongto']
                assert 'edit' in node['belongto']
                assert type(node['belongto']['name']) is str
                assert type(node['belongto']['edit']) is int
        # Check that that there are no circular dependencies
        self.check_tool_dependencies_for_circles();
        # Check that all root input are set
        self.input_parameter_values = {}
        self.input_unset_variables = [] # IDs of variables that have not be set by any step
        log_info('Checking for input values.')
        for root_input_node in self.root_inputs_outputs['inputs']:
            var_set = False
            for arg_input_name, arg_input_value in self.input_parameters.items():
                # NOTE(review): a single None value aborts scanning the
                # REMAINING input parameters (break, not continue) --
                # confirm this is intended.
                if arg_input_value is None:
                    break
                if arg_input_name == root_input_node['id']:
                    log_info(' {}={}'.format(root_input_node['id'], arg_input_value))
                    self.input_parameter_values[root_input_node['id']] = {'value': arg_input_value, 'description': root_input_node['description']}
                    var_set = True
                    break
            if not var_set:
                self.input_unset_variables.append(root_input_node);
                user_message = Workflow.create_input_parameter_message(root_input_node['id'], root_input_node['description'])
                # How to obtain the missing value depends on askinput:
                # JSON = prompt now; BASH = defer to generated script; NO = leave None.
                if self.askinput == 'JSON':
                    local_input_parameter = input(user_message)
                    self.input_parameter_values[root_input_node['id']] = {'value': local_input_parameter, 'description': root_input_node['description']}
                elif self.askinput == 'BASH':
                    pass # Do nothing
                elif self.askinput == 'NO':
                    log_info('Warning: Input Parameter {} ({}) has not been set by any step.'.format(root_input_node['id'], root_input_node['description']))
                    # Set None values
                    self.input_parameter_values[root_input_node['id']] = {'value': None, 'description': root_input_node['description']}
        # Check that all output_parameterss will be eventually set
        for root_output_node in self.root_inputs_outputs['outputs']:
            found_output_filling_step = False
            for step_node in self.node_iterator():
                if not self.is_step(step_node):
                    continue
                if root_output_node['id'] in step_node['outputs']:
                    found_output_filling_step = True
                    continue
            if not found_output_filling_step and (not self.askinput in ['BASH']):
                # If askinput = BASH don't raise exception
                message = 'Output {} ({}) is not set by any step!'.format(root_output_node['id'], root_output_node['description'])
                raise OBC_Executor_Exception(message)
        # Confirm that all main workflows have exactly one main step
        # Confirm that all sub workflows have exactly one sub main step
        for workflow in self.get_all_workflows():
            # Is this the main workflow?
            if self.is_root_workflow(workflow):
                main_counter = sum(step['main'] for step in self.get_steps_from_workflow(workflow))
                main_str = 'MAIN'
            else:
                main_counter = sum(step['sub_main'] for step in self.get_steps_from_workflow(workflow))
                main_str = 'SUB'
            if main_counter == 0:
                message = '{} Workflow {} has 0 main steps'.format(main_str, workflow['id'])
                raise OBC_Executor_Exception(message)
            if main_counter > 1:
                message = '{} Workflow {} has more than one ({}) main steps'.format(main_str, workflow['id'], main_counter)
                raise OBC_Executor_Exception(message)
        # Create a dictionary. Keys are tool slash id. values are tools
        self.tool_slash_id_d = {self.get_tool_slash_id(tool):tool for tool in self.tool_iterator()}
        # Create a dictionary. Keys are tool slash ids. Values are lists of the slash ids of their dependencies
        self.tool_run_afters = {self.get_tool_dash_id(t, no_dots=True): [self.get_tool_dash_id(self.tool_slash_id_d[x], no_dots=True) for x in t['dependencies']] for t in self.tool_iterator()}
        # Create a dictionary. Keys are tool ids. Values are tuples: (variables from which they depend from, dependent tool)
        # This does not contain the variables of the tool that is the key
        self.tool_dependent_variables = {self.get_tool_dash_id(tool, no_dots=True):self.get_tool_dependent_variables(tool) for tool in self.tool_iterator()}
        # Create a dictionary. Keys are tool ids. Values are tuples: (variables from which they depend from, dependent tool)
        # It also contains the variables of the tool that is the key
        self.tool_variables = {self.get_tool_dash_id(tool, no_dots=True):self.get_tool_dependent_variables(tool, include_this_tool=True) for tool in self.tool_iterator()}
        # Create a dictionary. Keys are tool variable ids . Values are the tools in which they belong.
        self.tool_variables_ids = {self.get_tool_bash_variable(tool, tool_variable['name']):tool for tool in self.tool_iterator() for tool_variable in tool['variables']}
        # Create a dictionary. Keys are step ids. Values are tool objects
        self.step_ids = {step['id']:step for step in self.step_iterator()}
        # Create a dictionary. Keys are input ids. Values are input objects
        self.input_ids = {inp['id']:inp for inp in self.inputs_iterator()}
        # Create a dictionary. Keys a output ids. Values are output objects
        self.output_ids = {outp['id']:outp for outp in self.outputs_iterator()}
        # Classify each step's referred variables as read or set.
        self.set_step_reads_sets()
def set_step_reads_sets(self,):
'''
The 'inputs' and 'outputs' fields for every step does NOT contain which variables are set and read respectively.
It contains which variables are **referred**
Here we make an assumption:
If a step refers to an input variable and they belong to the same workflow, then the step READS the variable
If a step refers to an output variable and they belong to the same workflow, then the step SETS the variable
'''
# keys are input ids. Values are the steps who set these values
self.input_setters = defaultdict(list)
# Keys are output ids. Values are the steps who set htese values
self.output_setters = defaultdict(list)
for step in self.step_iterator():
step['inputs_reads'] = []
step['inputs_sets'] = []
step['outputs_sets'] = []
step['outputs_reads'] = []
for step_input in step['inputs']:
input_node = self.input_ids[step_input]
if input_node['belongto'] == step['belongto']:
#print ('Step: {} Reads: {}'.format(step['id'], step_input))
step['inputs_reads'].append(step_input)
else:
step['inputs_sets'].append(step_input)
self.input_setters[step_input].append(step['id'])
for step_output in step['outputs']:
output_node = self.output_ids[step_output]
if output_node['belongto'] == step['belongto']:
#print ('Step: {} Sets: {}'.format(step['id'], step_output))
step['outputs_sets'].append(step_output)
self.output_setters[step_output].append(step['id'])
else:
step['outputs_reads'].append(step_output)
def check_tool_dependencies_for_circles(self,):
'''
'''
# Get all tools
all_tools = [node for node in self.node_iterator() if self.is_tool(node)]
# Contruct the tool graph
graph = {self.get_tool_slash_id(tool): tool['dependencies'] for tool in all_tools}
for tool_id in graph.keys():
circles = detect_circles(graph, tool_id, tool_id)
try:
circle = next(circles)
except StopIteration as e:
pass # This is supposed to happen
else:
message = 'Found circular tool dependency!'
message += '\n' + ' --> '.join(circle)
raise OBC_Executor_Exception(message)
def check_step_calls_for_circles(self,):
'''
'''
#Construct the step graph
graph = {step['id']: step['steps'] for step in self.step_iterator()}
for step_id in graph.keys():
circles = detect_circles(graph, step_id, step_id)
try:
circle = next(circles)
except StopIteration as e:
pass
else:
circle_str = ' --> '.join(circle)
return circle_str
return ''
def get_token_set_bash_commands(self, ):
'''
'''
return 'obc_current_token="{}"'.format(self.current_token)
def show_basic_info(self,):
'''
self.root_workflow['name'], self.root_workflow['edit'], self.nice_id,
'''
ret = '\n'
ret += 'OBC_WORKFLOW_NAME="{}"\n'.format(self.root_workflow['name'])
ret += 'OBC_WORKFLOW_EDIT={}\n'.format(self.root_workflow['edit'])
ret += 'OBC_NICE_ID="{}"\n'.format(self.nice_id_global)
ret += 'OBC_SERVER="{}"\n'.format(self.obc_server)
ret += 'echo "OBC: Workflow name: ${OBC_WORKFLOW_NAME}"\n'
ret += 'echo "OBC: Workflow edit: ${OBC_WORKFLOW_EDIT}"\n'
ret += f'echo "OBC: Workflow report: {self.nice_id}"\n'
ret += f'echo "OBC: Server URL: {self.obc_server}"\n'
ret += '\n'
return ret
def get_input_parameters_read_bash_commands(self, ):
'''
Bash commands for reading input/output
'''
ret = '\n'
if self.input_unset_variables:
# Read unset variables from the command line
# https://github.com/kantale/OpenBioC/issues/154
ret += Workflow.read_arguments_from_commandline([x['id'] for x in self.input_unset_variables])
# Check if the variable has been read from command line. If not halt execution and prompt for a value
for unset_variable in self.input_unset_variables:
ret += 'if [ -z ${{{}+x}} ]; then\n'.format(unset_variable['id']) # https://stackoverflow.com/questions/3601515/how-to-check-if-a-variable-is-set-in-bash
ret += ' echo "{}"\n'.format(Workflow.create_input_parameter_message(unset_variable['id'], unset_variable['description']))
ret += ' read -p "{}=" {}\n'.format(unset_variable['id'], unset_variable['id'])
ret += 'fi\n'
return ret
@staticmethod
def create_input_parameter_message(variable_id, variable_description):
'''
Message to display when input variable has not been set.
'''
return 'OBC: Input parameter: {} ({}) has not been set by any step. Enter value: '.format(variable_id, variable_description)
@staticmethod
def read_arguments_from_commandline(arguments):
'''
Help from: https://stackoverflow.com/questions/192249/how-do-i-parse-command-line-arguments-in-bash
'''
ret = ''
ret += 'for i in "$@"\n'
ret += 'do\n'
ret += 'case $i in\n'
for variable_id in arguments:
ret += ' --{VARIABLE_ID}=*)\n'.format(VARIABLE_ID=variable_id)
ret += ' {VARIABLE_ID}="${{i#*=}}"\n'.format(VARIABLE_ID=variable_id)
ret += ' shift\n'
ret += ' ;;\n'
ret += ' *)\n'
ret += ' ;;\n'
ret += 'esac\n'
ret += 'done\n\n'
return ret
def get_tool_bash_commands(self, tool,
validation=True,
update_server_status=True,
read_variables_from_command_line=False,
variables_json_filename=None,
variables_sh_filename_read=None,
variables_sh_filename_write=None,
):
'''
update_server_status: boolean, should we update the server status?
variables_json_filename: Create a json file that contains the values of the variables (if None it does not create it)
read_variables_from_command_line: Assume that this is running from a cammand line file.sh script. Read the arguments from command line
variables_sh_filename_read: If it is set (list), then read all variables from all files in the list
variables_sh_filename_write: If it is set (string), then create a bash filename with all variables
'''
log_info('Building installation bash commands for: {}'.format(tool['label']))
tool_id = Workflow.get_tool_dash_id(tool, no_dots=True)
# Add Bash commands
ret = '### BASH INSTALLATION COMMANDS FOR TOOL: {}\n'.format(tool['label'])
ret += 'echo "OBC: INSTALLING TOOL: {}"\n'.format(tool['label'])
if update_server_status:
ret += Workflow.bash_tool_installation_started(tool) + '\n'
if read_variables_from_command_line:
arguments = [Workflow.get_tool_bash_variable(dependent_tool, dependent_variable['name'])
for dependent_variable, dependent_tool in self.tool_dependent_variables[tool_id]]
ret += Workflow.read_arguments_from_commandline(arguments)
if variables_sh_filename_read:
for filename in variables_sh_filename_read:
ret += '### READING VARIABLES FROM {}\n'.format(filename)
ret += '. {}\n\n'.format(filename)
# Do we exit on commands?
if Workflow.EXIT_ON_ERROR:
tool['installation_commands'] = f'\nset -e\n{tool["installation_commands"]}\nset +x\n'
# We are adding the installation commands in parenthesis.
# By doing so, we are isolating the raw installation commands with the rest pre- and post- commands
ret += '(\n:\n' + tool['installation_commands'] + '\n)\n' # Add A bash no-op command (:) to avoid empty installation instructions
if Workflow.EXIT_ON_ERROR:
ret += 'EXITCODE=$?\n'
ret += 'if [ $EXITCODE -ne 0 ] ; then\n'
ret += ' echo OBC: INSTALLATION OF TOOL: {} FAILED\n'.format(tool['label'])
ret += ' exit $EXITCODE\n'
ret += 'fi\n'
ret += 'echo "OBC: INSTALLATION OF TOOL: {} . COMPLETED"\n'.format(tool['label'])
ret += '### END OF INSTALLATION COMMANDS FOR TOOL: {}\n\n'.format(tool['label'])
if validation:
# Add Bash validation commands
ret += '### BASH VALIDATION COMMANDS FOR TOOL: {}\n'.format(tool['label'])
ret += 'echo "OBC: VALIDATING THE INSTALLATION OF THE TOOL: {}"\n'.format(tool['label'])
#ret += tool['validation_commands'] + '\n'
#validation_script_filename = tool['label'].replace('/', '__') + '__validation.sh'
#ret += "cat > {} << ENDOFFILE\n".format(validation_script_filename) # Add 'ENDOFFILE' in single quotes to have raw input
if Workflow.EXIT_ON_ERROR:
tool['validation_commands'] = f'\nset -e\n{tool["validation_commands"]}\nset +e\n'
ret += '(\n:\n' + tool['validation_commands'] + '\n)\n' # Run validation commands in a dedicated environment () . Add A bash no-op command (:) to avoid empty installation instructions
#ret += 'ENDOFFILE\n\n'
#ret += 'chmod +x {}\n'.format(validation_script_filename)
#ret += './{}\n'.format(validation_script_filename)
ret += 'EXITCODE=$?\n'
ret += 'if [ $EXITCODE -eq 0 ] ; then\n'
ret += ' echo "OBC: VALIDATION FOR TOOL: {} SUCCEEDED"\n'.format(tool['label'])
ret += 'else\n'
ret += ' echo "OBC: VALIDATION FOR TOOL: {} FAILED"\n'.format(tool['label'])
if Workflow.EXIT_ON_ERROR:
ret += ' exit $EXITCODE\n'
ret += 'fi\n\n'
ret += '### END OF VALIDATION COMMANDS FOR TOOL: {}\n\n'.format(tool['label'])
if update_server_status:
ret += Workflow.bash_tool_installation_finished(tool) + '\n'
ret += '### SETTING TOOL VARIABLES FOR: {}\n'.format(tool['label'])
for tool_variable in tool['variables']:
tool_bash_variable=self.get_tool_bash_variable(tool, tool_variable['name'])
ret += 'export {}="{}" # {} \n'.format(tool_bash_variable, tool_variable['value'], tool_variable['description'])
ret += 'echo "OBC: SET {}=\\"${}\\" <-- {} "\n'.format(tool_bash_variable, tool_bash_variable, tool_variable['description'])
ret += '### END OF SETTING TOOL VARIABLES FOR: {}\n\n'.format(tool['label'])
if variables_json_filename:
ret += '### CREATING JSON FILE WITH TOOL VARIABLES\n'
ret += "cat > {} << ENDOFFILE\n".format(variables_json_filename) # Add 'ENDOFFILE' for raw input
ret += Workflow.get_tool_bash_variables_json(tool) + '\n'
ret += 'ENDOFFILE\n\n'
if variables_sh_filename_write:
ret += '### CREATING BASH WITH TOOL VARIABLES\n'
ret += "cat > {} << ENDOFFILE\n".format(variables_sh_filename_write)
for tool_variable in tool['variables']:
tool_bash_variable=self.get_tool_bash_variable(tool, tool_variable['name'])
ret +='{VAR}="{VALUE}"\n'.format(VAR=tool_bash_variable, VALUE=tool_variable['value'])
ret += 'ENDOFFILE\n'
return ret
@staticmethod
def get_edge_id(source_id, target_id):
return f'{source_id}..{target_id}'
def get_input_bash_commands(self,):
'''
'''
ret = '### SET ROOT WORKFLOW INPUT PARAMETERS\n'
for variable, data in self.input_parameter_values.items():
ret += '{}="{}" # {}\n'.format(variable, data['value'], data['description'])
ret += '### END OF SET ROOT WORKFLOW INPUT PARAMETERS'
return ret
def get_output_bash_commands(self,):
ret = '### PRINT OUTPUT PARAMETERS\n'
ret += 'echo "OBC: Output Variables:"\n'
for output_parameter in self.output_parameters:
ret += 'echo "OBC: {} = ${{{}}}"\n'.format(output_parameter['id'], output_parameter['id'])
ret += 'REPORT {} ${{{}}} OUTPUT_VARIABLE \n'.format(output_parameter['id'], output_parameter['id'])
ret += '### END OF PRINTING OUTPUT PARAMETERS\n'
return ret
def get_step_bash_commands(self, ):
'''
TODO: CREATE AN ORDERING ACCORDING TO WORKFLOWS!
'''
ret = '### SETTING BASH FUNCTIONS FOR STEPS\n\n'
for a_node in self.node_iterator():
if not self.is_step(a_node):
continue
ret += '# STEP: {}\n'.format(a_node['id'])
ret += '{} () {{\n'.format(a_node['id'])
ret += "OBC_WHOCALLEDME=$(caller 0 | awk '{print $2}') \n"
ret += 'if [ ${OBC_WHOCALLEDME} == "PARALLEL" ] ; then \n '
ret += " OBC_WHOCALLEDME=$(caller 1 | awk '{print $2}') \n"
ret += "fi\n"
ret += 'echo "OBC: CALLING STEP: {} CALLER: $OBC_WHOCALLEDME"\n'.format(a_node['id']) #
ret += 'update_server_status "step started {} $OBC_WHOCALLEDME"\n'.format(a_node['id'])
ret += a_node['bash'] + '\n'
ret += 'update_server_status "step finished {}"\n'.format(a_node['id'])
ret += '}\n'
ret += '### END OF SETTING BASH FUNCTIONS FOR STEPS\n'
return ret
def get_main_step_bash_commands(self,):
'''
'''
ret = '### CALLING MAIN STEP\n'
ret += '{}\n'.format(self.root_step['id'])
ret += '### END OF CALLING MAIN STEP\n'
return ret
@staticmethod
def get_tool_slash_id(tool):
'''
'''
return '/'.join([tool['name'], tool['version'], str(tool['edit'])])
@staticmethod
def get_tool_dash_id(tool, no_dots=False):
'''
'''
ret = '__'.join([tool['name'], tool['version'], str(tool['edit'])])
if no_dots:
ret = ret.replace('.', '_')
return ret
@staticmethod
def get_tool_cytoscape_id(tool, no_dots=True):
'''
'''
return f'{Workflow.get_tool_dash_id(tool, no_dots=no_dots)}__2'
@staticmethod
def create_tool_from_cytoscape_id(cytoscape_tool_id):
'''
'''
s = cytoscape_tool_id.split('__')
return {
'name': s[0],
'version': s[1],
'edit': int(s[2]),
}
@staticmethod
def get_tool_vars_filename(tool):
'''
'''
return Workflow.get_tool_vars_filename_tool_id(Workflow.get_tool_dash_id(tool, no_dots=True))
@staticmethod
def get_tool_vars_filename_tool_id(tool_id):
'''
'''
return '{TOOL_ID}_VARS.sh'.format(TOOL_ID=tool_id)
@staticmethod
def get_tool_bash_variable(tool, variable_name):
'''
'''
return '{}__{}'.format(Workflow.get_tool_dash_id(tool, no_dots=True), variable_name)
@staticmethod
def get_tool_bash_variables_json(tool):
'''
'''
d = {Workflow.get_tool_bash_variable(tool, tool_variable['name']): tool_variable['value'] for tool_variable in tool['variables']}
return json.dumps(d, indent=4)
@staticmethod
def tool_label_to_object(tool_label):
'''
The inverse of get_tool_slash_id
'''
name, version, edit = tool_label.split('/')
return {
'name': name,
'version': version,
'edit': int(edit),
}