/
create_eos_posterior_pipeline
executable file
·584 lines (506 loc) · 34.8 KB
/
create_eos_posterior_pipeline
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
#! /usr/bin/env python
#
# GOAL
# Simple pipeline: just CIP + fitting (and puff) iteration jobs. Here CIP acts as ILE would in normal pipeline. Input from individual events needed.
#
#
# - Inputs
# args_marg_event.txt # CIP arguments, arguments to compute the marginalized likelihood of each event
# args_eos_post.txt       # arguments to compute the posterior from marginal likelihood
# - data formats
# Common format for input (with likelihoods) and output
import argparse
import sys
import os
import shutil
import numpy as np
import RIFT.lalsimutils as lalsimutils
import lalsimulation as lalsim
import lal
import functools
import itertools
import json
from glue import pipeline # https://github.com/lscsoft/lalsuite-archive/blob/5a47239a877032e93b1ca34445640360d6c3c990/glue/glue/pipeline.py
import RIFT.misc.dag_utils as dag_utils
from RIFT.misc.dag_utils import mkdir
from RIFT.misc.dag_utils import which
# Command-line interface.  Arguments fall into groups: i/o locations, per-event
# marginalization (MARG) job configuration, puffball (grid-broadening) options,
# EOS-posterior worker options, and condor/OSG workflow controls.
parser = argparse.ArgumentParser()
parser.add_argument("--working-directory",default="./")
parser.add_argument("--input-grid",default=None,help="EOS parameters ")
parser.add_argument("--event-file",default=None,action='append',help="Specify the events here. Will be copied into 'event-X.net' ")
parser.add_argument("--marg-event-args",default=None,help="filename of args_marg_event.txt file which holds CIP arguments. Should NOT conflict with arguments auto-set by this DAG ... in particular, i/o arguments will be modified. ")
parser.add_argument("--marg-event-exe",default=None,help="filename of CIP or equivalent executable. Will default to `which util_ConstructIntrinsicPosterior_GenericCoordinates` in low-level code")
parser.add_argument("--marg-event-exe-list-file",default=None,help="filename of a file containing a LIST of exe ")
parser.add_argument("--marg-event-args-list-file",default=None,help="filename of a file with one line of arguments per event. ")
parser.add_argument("--marg-event-nchunk-list-file",default=None,help="filename of a file with one number per event, indicating the chunk size of its marginalization job (how many points to evaluate per job). Job must accept --using-eos-index and --n-events-to-analyze arguments, like ILE ")
parser.add_argument("--request-memory-marg",default=16384,type=int,help="Memory request for condor (in Mb) for fitting jobs.")
parser.add_argument("--puff-exe",default=None,help="similar to PUFF, but cannot be the same executable since format different")
# BUGFIX: help text was the typo " argumetns"
parser.add_argument("--puff-args",default=None,help="filename of the puff arguments file")
parser.add_argument("--puff-max-it",default=np.inf,type=int,help="Maximum iteration number that puffball is applied. default to infinity ")
parser.add_argument("--eos-post-args",default=None,help="filename of args_ile.txt file which holds ILE arguments. Should NOT conflict with arguments auto-set by this DAG ... in particular, i/o arguments will be modified")
parser.add_argument("--eos-post-exe",default=None,help="filename of ILE or equivalent executable. Will default to `which integrate_likelihood_extrinsic` in low-level code")
parser.add_argument("--eos-post-retries",default=0,type=int,help="Number of retry attempts for ILE jobs. (These can fail)")
parser.add_argument("--eos-post-explode-jobs",default=2,type=int,help="Number of MARG jobs to use in posterior generation.")
parser.add_argument("--eos-post-explode-jobs-last",default=2,type=int,help="Number of MARG jobs to use in posterior generation on the final stage (if desired).")
parser.add_argument("--test-args",default=None,help="filename of args_test.txt, which holds test arguments. Note i/o arguments will be modified, so should NOT specify the samples files or the output file, just the test to be performed and any related arguments")
parser.add_argument("--general-retries",default=0,type=int,help="Number of retry attempts for internal jobs (convert, CIP, ...). (These can fail, albeit more rarely, usually due to filesystem problems)")
parser.add_argument("--general-request-disk",default="10M",type=str,help="Request disk passed to condor. Must be done for all jobs now")
parser.add_argument("--transfer-file-list",default=None,help="File containing list of *input* filenames to transfer, one name per file. Copied into transfer_files for condor directly. If provided, also enables attempts to deduce files that need to be transferred for the pipeline to operate, as needed for OSG, etc")
parser.add_argument("--use-singularity",action='store_true',help="Attempts to use a singularity image in SINGULARITY_RIFT_IMAGE")
parser.add_argument("--use-osg",action='store_true',help="Attempts to set up an OSG workflow. Must submit from an osg allowed submit machine")
parser.add_argument('--n-samples-per-job',default=2000,type=int,help="Number of samples generated each iteration; also, number of marg jobs run each iteration. Should increase with dimension of problem")
# BUGFIX: help text was copy-pasted from --n-samples-per-job
parser.add_argument('--neff-threshold',default=800,type=int,help="Threshold on the effective number of samples (n_eff)")
parser.add_argument("--condor-local-nonworker",action='store_true',help="Uses local universe for non-worker condor jobs. Important to run in non-NFS location, as other jobs don't have file transfer set up.")
parser.add_argument("--condor-nogrid-nonworker",action='store_true',help="Uses local flocking for non-worker condor jobs. Important to run in non-NFS location, as other jobs don't have file transfer set up.")
parser.add_argument('--n-iterations',default=3,type=int,help="Number of iterations to perform")
parser.add_argument("--start-iteration",default=0,type=int,help="starting iteration. If >0, does not copy over --input-grid. DOES rewrite sub files. This allows you to change the arguments provided (e.g., use more iterations or settings at late times). Note this overwrites the .sub files ")
parser.add_argument("--use-full-submit-paths",action='store_true',help="DAG created has full paths to submit files generated. Note this is implemented on a per-file/as-needed basis, mainly to facilitate using this dag as an external subdag")
opts= parser.parse_args()
print(opts)
# Validate required input and derive execution-environment settings from the
# parsed options (condor universe, grid flocking, working directories,
# singularity image).
if not opts.input_grid:
    raise Exception(" --input-grid FNAME required ")

# Non-worker jobs default to the vanilla universe on the local grid.
local_worker_universe = "local" if opts.condor_local_nonworker else "vanilla"
no_worker_grid = opts.condor_nogrid_nonworker

# Paths as seen from inside a running job; identical to the submit-side path
# unless the job runs remotely (OSG/singularity), in which case all transferred
# files land in the job's scratch directory.
working_dir_inside_local = working_dir_inside = opts.working_directory
out_dir_inside_marg = opts.working_directory + "/iteration_$(macroiteration)_marg/event_$(macroid)/"
if opts.use_singularity or opts.use_osg:
    working_dir_inside = "./"  # all files on the remote machine are in the current directory

singularity_image = None
if opts.use_singularity:
    print(" === USING SINGULARITY === ")
    singularity_image = os.environ["SINGULARITY_RIFT_IMAGE"]  # must be present to use singularity
    # SINGULARITY IMAGES ARE ON CVMFS, SO WE CAN AVOID THE SINGULARITY EXEC CALL
    # hardcoding a fiducial copy of lalapps_path2cache; beware about the executable name change
    os.environ['LALAPPS_PATH2CACHE'] = "/cvmfs/oasis.opensciencegrid.org/ligo/sw/conda/envs/igwn-py39/bin/lal_path2cache" #"singularity exec {singularity_image} lalapps_path2cache".format(singularity_image=singularity_image)
    print(singularity_image)
###
### Event files : copy, give standard name (naming convention assumes CIP for now)
###
# BUGFIX/robustness: previously a missing --event-file produced an opaque
# "NoneType is not iterable" TypeError; fail with a clear message instead
# (consistent with the --input-grid check above).
if not opts.event_file:
    raise Exception(" --event-file FNAME required (at least one) ")
for indx, event in enumerate(opts.event_file):
    shutil.copyfile(event, opts.working_directory + "/event-{}.net".format(indx))  # put in working directory !
n_events = len(opts.event_file)
###
### Process args
###
# Parse the MARG (per-event marginalization) job configuration.  Two mutually
# exclusive modes:
#   * --marg-event-args: one args file applied to every event;
#   * --marg-event-args-list-file (+exe list, +optional chunk list): one
#     executable / argument line / chunk size per event.
marg_event_args_list = None
marg_event_exe_list = None
marg_event_nchunk_list = None
if opts.marg_event_args:
    # Load args.txt. Remove first item (the executable name). Store.
    with open(opts.marg_event_args) as f:
        marg_event_args_list = f.readlines()
    marg_event_args = ' '.join( [x.replace('\\','') for x in marg_event_args_list] )
    marg_event_args = ' '.join(marg_event_args.split(' ')[1:])
    # Quote bracketed compound arguments like g0:[a,b] so they survive re-parsing
    marg_event_args = marg_event_args.replace('[', ' \'[')
    marg_event_args = marg_event_args.replace(']', ']\'')
    marg_event_args = marg_event_args.rstrip()
    print("MARG", marg_event_args)
    marg_event_args_list = None
elif opts.marg_event_args_list_file:
    with open(opts.marg_event_args_list_file) as f:
        marg_event_args_list = f.readlines()
    with open(opts.marg_event_exe_list_file) as f:
        marg_event_exe_list = f.readlines()
    marg_event_nchunk_list = np.ones(len(marg_event_exe_list))  # default: one point per evaluation
    if opts.marg_event_nchunk_list_file and os.path.isfile(opts.marg_event_nchunk_list_file):
        marg_event_nchunk_list = np.loadtxt(opts.marg_event_nchunk_list_file, dtype=int)  # load in 1d array
        assert len(marg_event_nchunk_list) == len(marg_event_exe_list)
    assert len(marg_event_exe_list) == len(marg_event_args_list)  # fail if not true: not matching MARG args to exe
    assert len(marg_event_exe_list) == len(opts.event_file)  # fail if not true: not matching MARG to event list
    for indx in np.arange(len(marg_event_args_list)):
        # BUGFIX: str.rstrip returns a new string; the result was previously
        # discarded, so trailing newlines were stored in the executable names.
        exe_here = marg_event_exe_list[indx].rstrip()
        marg_event_exe_list[indx] = exe_here
        line = marg_event_args_list[indx].rstrip()  # remove CR, whitespace
        marg_event_args_list[indx] = line
        print("MARG {} {} ".format(indx, exe_here), line)
marg_exe_master = None
if opts.marg_event_exe:
    # BUGFIX: previously referenced the undefined name 'marg_exe' (NameError
    # whenever --marg-event-exe was supplied).
    marg_exe_master = "{}".format(opts.marg_event_exe)  # force copy, so NOT BY REFERENCE
if opts.marg_event_args:
    # Record a one-line helper script documenting the single-event command used.
    with open('command-single.sh','w') as f:
        f.write('util_ConstructIntrinsicPosterior_GenericCoordinates.py {} '.format(marg_event_args))
# Puffball (grid-broadening) configuration.  Enabled only when --puff-args is
# supplied; cadence 1 means a puffball is generated every iteration.
puff_args = None
puff_cadence = None
puff_max_it = opts.puff_max_it
if opts.puff_args:
    puff_cadence = 1
    # Read the args file, flatten to one line, and drop the leading
    # executable token (same convention as the MARG args file above).
    with open(opts.puff_args) as f:
        raw_lines = f.readlines()
    joined = ' '.join(x.replace('\\', '') for x in raw_lines)
    puff_args = ' '.join(joined.split(' ')[1:])
    # Quote bracketed compound arguments like [a,b] so they survive re-parsing.
    puff_args = puff_args.replace('[', ' \'[').replace(']', ']\'').rstrip()
    print("PUFF", puff_args)
    print("PUFF CADENCE", puff_cadence)
# Parse the EOS-posterior (hyperparameter integration) worker arguments.
# Load args.txt. Remove first item. Store.
# BUGFIX/robustness: this section was guarded by 'if True:'; a missing
# --eos-post-args previously failed with an opaque TypeError from open(None).
if opts.eos_post_args is None:
    raise Exception(" --eos-post-args FNAME required ")
with open(opts.eos_post_args) as f:
    eos_post_args_list = f.readlines()
eos_post_args = ' '.join( [x.replace('\\','') for x in eos_post_args_list] )
eos_post_args = ' '.join(eos_post_args.split(' ')[1:])
# Some argument protection for later. Make sure spaces in place to avoid breaking compound arguments like g0:[a,b] for ranges.
# intended to make sure any spaces in arguments like [a, b] are encoded as '[a, b]'
#eos_post_args = eos_post_args.replace(' [', ' \'[')
#eos_post_args = eos_post_args.replace('] ', ']\' ')
eos_post_args = eos_post_args.rstrip()
eos_post_args += ' --no-plots '   # plotting is never wanted inside worker jobs
print("EOS_POST", eos_post_args)
eospost_exe = which('util_ConstructEOSPosterior.py')
if opts.eos_post_exe:
    eospost_exe = "{}".format(opts.eos_post_exe)  # force copy, so NOT BY REFERENCE
# Copy seed grid into place as grid-0.dat (only on a fresh start; for a
# restart, grid-<start_iteration>.dat is assumed to already exist).
it_start = opts.start_iteration
n_initial = opts.n_samples_per_job
if it_start == 0:
    shutil.copyfile(opts.input_grid, "grid-0.dat")  # put in working directory !
    n_initial = len(np.loadtxt("grid-0.dat"))

# Optional explicit list of input files to transfer to each worker (OSG etc.).
transfer_file_names = []
if opts.transfer_file_list is not None:
    with open(opts.transfer_file_list) as f:
        transfer_file_names = [entry.rstrip() for entry in f.readlines()]
    print(" Input files to transfer to job working directory (note!)", transfer_file_names)
# EOS-post workers additionally need the unified marginalized-likelihood file.
transfer_file_names_post = list(transfer_file_names) + ['../all.marg_net']  # make a copy
###
### DAG generation
###
dag = pipeline.CondorDAG(log=os.getcwd())
# Make directories for all iterations
for it_index in np.arange(it_start, opts.n_iterations+1):
    ile_dir = opts.working_directory+"/iteration_"+str(it_index)+"_marg"
    cip_dir = opts.working_directory+"/iteration_"+str(it_index)+"_post"
    consolidate_dir = opts.working_directory+"/iteration_"+str(it_index)+"_con"
    mkdir(ile_dir); mkdir(ile_dir+"/logs")
    # BUGFIX: the inner per-event loop previously reused (and clobbered) the
    # outer loop variable, so the test directory below was created with the
    # event COUNT in its name instead of the iteration number.
    for event_index in np.arange(len(opts.event_file)):
        mkdir(ile_dir + "/event_{}".format(event_index))
    mkdir(cip_dir); mkdir(cip_dir+"/logs")
    mkdir(consolidate_dir); mkdir(consolidate_dir+"/logs")
    if opts.test_args:
        test_dir = opts.working_directory+"/iteration_"+str(it_index)+"_test"
        mkdir(test_dir); mkdir(test_dir+'/logs')
# Remote jobs must be sent the current iteration's grid explicitly.
if opts.use_singularity or opts.use_osg:
    transfer_file_names_post.append("../grid-$(macroiteration).dat")
# ++++
# Create workflow tasks
# ++++
# Build the condor submit files for the MARG (and MARG_PUFF) worker jobs.
# Single-args mode creates one shared job; list mode creates one job per event.
cip_job_list = []
cip_puff_job_list = []
if opts.marg_event_args:
    ## MARG job (CIP is default)
    #  - issue is that transfer files depend on event
    #  - all output goes into the same iter*_marg directory
    #  - join script will merge all together
    cip_job, cip_job_name = dag_utils.write_CIP_sub(tag='MARG',log_dir=None,arg_str=marg_event_args,using_eos='file:'+working_dir_inside_local+"/grid-$(macroiteration).dat",using_eos_index='$(macroevent)',request_memory=opts.request_memory_marg,input_net=working_dir_inside_local+'/event-$(macroid).net',output='MARG-$(macroid)-$(macroevent)',out_dir=out_dir_inside_marg,exe=marg_exe_master,universe=local_worker_universe,no_grid=not(opts.use_osg),use_osg=opts.use_osg,use_singularity=opts.use_osg and opts.use_singularity,singularity_image=singularity_image,use_simple_osg_requirements=opts.use_osg,transfer_files=['../event-$(macroid).net']+transfer_file_names)
    # Modify: set 'initialdir'
    cip_job.add_condor_cmd("initialdir",opts.working_directory+"/iteration_$(macroiteration)_marg")
    # Modify output argument: change logs and working directory to be subdirectory for the run
    cip_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/marg-$(macroevent)-$(macroid)-$(cluster)-$(process).log")
    cip_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/marg-$(macroevent)-$(macroid)-$(cluster)-$(process).err")
    cip_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/marg-$(macroevent)-$(macroid)-$(cluster)-$(process).out")
    cip_job.add_condor_cmd('request_disk',opts.general_request_disk)
    if opts.use_full_submit_paths:
        fname = opts.working_directory+"/"+cip_job.get_sub_file()
        cip_job.set_sub_file(fname)
    cip_job.write_sub_file()
    # PUFF variant
    cipPuff_job, cipPuff_job_name = dag_utils.write_CIP_sub(tag='MARG_PUFF',log_dir=None,arg_str=marg_event_args,using_eos='file:'+working_dir_inside_local+"/grid_puff-$(macroiteration).dat",using_eos_index='$(macroevent)',request_memory=opts.request_memory_marg,input_net=working_dir_inside_local+'/event-$(macroid).net',output='MARG_puff-$(macroid)-$(macroevent)',out_dir=out_dir_inside_marg,exe=marg_exe_master,universe=local_worker_universe,no_grid=not(opts.use_osg),use_osg=opts.use_osg,use_singularity=opts.use_osg and opts.use_singularity,singularity_image=singularity_image,use_simple_osg_requirements=opts.use_osg,transfer_files=['../event-$(macroid).net']+transfer_file_names)
    # Modify: set 'initialdir'
    cipPuff_job.add_condor_cmd("initialdir",opts.working_directory+"/iteration_$(macroiteration)_marg")
    # BUGFIX/consistency: include $(macroid) in the puff log names, matching the
    # non-puff job above and the per-event branch below; previously log files
    # from different events collided.
    cipPuff_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/margPuff-$(macroevent)-$(macroid)-$(cluster)-$(process).log")
    cipPuff_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/margPuff-$(macroevent)-$(macroid)-$(cluster)-$(process).err")
    cipPuff_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/margPuff-$(macroevent)-$(macroid)-$(cluster)-$(process).out")
    cipPuff_job.add_condor_cmd('request_disk',opts.general_request_disk)
    if opts.use_full_submit_paths:
        fname = opts.working_directory+"/"+cipPuff_job.get_sub_file()
        cipPuff_job.set_sub_file(fname)
    cipPuff_job.write_sub_file()
else:
    # One MARG/MARG_PUFF job per event, each with its own exe, args and chunk size.
    for indx in np.arange(len(marg_event_args_list)):
        n_chunk_here = int(marg_event_nchunk_list[indx])
        cip_job, cip_job_name = dag_utils.write_CIP_sub(tag='MARG_{}'.format(indx),log_dir=None,exe=marg_event_exe_list[indx],arg_str=marg_event_args_list[indx],using_eos='file:'+working_dir_inside_local+"/grid-$(macroiteration).dat",using_eos_index='$(macroevent)',n_events_to_analyze=n_chunk_here,request_memory=opts.request_memory_marg,input_net=working_dir_inside_local+'/event-$(macroid).net',output='MARG-$(macroid)-$(macroevent)',out_dir=out_dir_inside_marg,universe=local_worker_universe,no_grid=not(opts.use_osg),use_osg=opts.use_osg,use_singularity=opts.use_osg and opts.use_singularity,singularity_image=singularity_image,use_simple_osg_requirements=opts.use_osg,transfer_files=['../event-$(macroid).net']+transfer_file_names)
        # Modify: set 'initialdir'
        cip_job.add_condor_cmd("initialdir",opts.working_directory+"/iteration_$(macroiteration)_marg")
        # Modify output argument: change logs and working directory to be subdirectory for the run
        cip_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/marg-$(macroevent)-$(macroid)-$(cluster)-$(process).log")
        cip_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/marg-$(macroevent)-$(macroid)-$(cluster)-$(process).err")
        cip_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/marg-$(macroevent)-$(macroid)-$(cluster)-$(process).out")
        cip_job.add_condor_cmd('request_disk',opts.general_request_disk)
        if opts.use_full_submit_paths:
            fname = opts.working_directory+"/"+cip_job.get_sub_file()
            cip_job.set_sub_file(fname)
        cip_job.write_sub_file()
        cip_job_list.append(cip_job)
        # PUFF variant
        cipPuff_job, cipPuff_job_name = dag_utils.write_CIP_sub(tag='MARG_PUFF_{}'.format(indx),log_dir=None,exe=marg_event_exe_list[indx],arg_str=marg_event_args_list[indx],using_eos='file:'+working_dir_inside_local+"/grid_puff-$(macroiteration).dat",using_eos_index='$(macroevent)',n_events_to_analyze=n_chunk_here,request_memory=opts.request_memory_marg,input_net=working_dir_inside_local+'/event-$(macroid).net',output='MARG_puff-$(macroid)-$(macroevent)',out_dir=out_dir_inside_marg,universe=local_worker_universe,no_grid=not(opts.use_osg),use_osg=opts.use_osg,use_singularity=opts.use_osg and opts.use_singularity,singularity_image=singularity_image,use_simple_osg_requirements=opts.use_osg,transfer_files=['../event-$(macroid).net']+transfer_file_names)
        # Modify: set 'initialdir'
        cipPuff_job.add_condor_cmd("initialdir",opts.working_directory+"/iteration_$(macroiteration)_marg")
        # Modify output argument: change logs and working directory to be subdirectory for the run
        cipPuff_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/margPuff-$(macroevent)-$(macroid)-$(cluster)-$(process).log")
        cipPuff_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/margPuff-$(macroevent)-$(macroid)-$(cluster)-$(process).err")
        cipPuff_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_marg/logs/margPuff-$(macroevent)-$(macroid)-$(cluster)-$(process).out")
        cipPuff_job.add_condor_cmd('request_disk',opts.general_request_disk)
        if opts.use_full_submit_paths:
            fname = opts.working_directory+"/"+cipPuff_job.get_sub_file()
            cipPuff_job.set_sub_file(fname)
        cipPuff_job.write_sub_file()
        cip_puff_job_list.append(cipPuff_job)
## CON job 1 : consolidate single events
#  Shell wrapper that runs util_HyperCombine.py over all per-chunk MARG output
#  files for one (iteration, event) pair, writing consolidated_<it>_<event>.net_marg.
#  - matches duplicates, averaging likelihoods
#  - NOTE(review, from original): with multiple events this AVERAGES rather than
#    ADDS, understating net significance. FIXME: two-stage process.
with open("con_marg.sh",'w') as f:
    f.write("""#! /bin/bash
{} {}/iteration_$1_marg/event_$2/MARG*[0-9]+annotation.dat > {}/iteration_$1_marg/consolidated_$1_$2.net_marg
""".format(dag_utils.which('util_HyperCombine.py'),opts.working_directory,opts.working_directory) )
os.system("chmod a+x con_marg.sh")
# Condor job wrapping con_marg.sh; file_input supplies $1=iteration, $2=event.
con_marg_job, con_marg_job_name = dag_utils.write_convert_sub(exe=opts.working_directory+"/con_marg.sh",tag='CON',log_dir=None,arg_str='',file_input="$(macroiteration) $(macroevent) ", out_dir=opts.working_directory,universe=local_worker_universe,no_grid=no_worker_grid)
con_marg_job.add_condor_cmd("initialdir",opts.working_directory)
con_marg_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).log")
con_marg_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).out")
con_marg_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).err")
con_marg_job.add_condor_cmd('request_disk',opts.general_request_disk)
if opts.use_full_submit_paths:
    fname = opts.working_directory+"/"+con_marg_job.get_sub_file()
    con_marg_job.set_sub_file(fname)
if opts.use_osg:
    # Consolidation reads/writes the shared filesystem: keep it off the grid.
    con_marg_job.add_condor_cmd("+DESIRED_SITES",'"nogrid"')
    con_marg_job.add_condor_cmd("+flock_local",'true')
con_marg_job.write_sub_file()
## CON job 2 : join multiple events together into a single overall result
#  Shell wrapper that combines the per-event consolidated files of one
#  iteration (product combination across events) into consolidated_<it>.net_marg.
#  - matches duplicates, averaging likelihoods
#  - NOTE(review, from original): with multiple events this AVERAGES rather than
#    ADDS, understating net significance. FIXME: two-stage process.
with open("con_prod.sh",'w') as f:
    f.write("""#! /bin/bash
{} --combination product {}/iteration_$1_marg/consolidated_*_*.net_marg > {}/consolidated_$1.net_marg
""".format(dag_utils.which('util_HyperCombine.py'),opts.working_directory,opts.working_directory) )
os.system("chmod a+x con_prod.sh")
# Condor job wrapping con_prod.sh; file_input supplies $1=iteration.
con_prod_job, con_prod_job_name = dag_utils.write_convert_sub(exe=opts.working_directory+"/con_prod.sh",tag='CON_PROD',log_dir=None,arg_str='',file_input="$(macroiteration) ", out_dir=opts.working_directory,universe=local_worker_universe,no_grid=no_worker_grid)
con_prod_job.add_condor_cmd("initialdir",opts.working_directory)
con_prod_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).log")
con_prod_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).out")
con_prod_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).err")
con_prod_job.add_condor_cmd('request_disk',opts.general_request_disk)
if opts.use_full_submit_paths:
    fname = opts.working_directory+"/"+con_prod_job.get_sub_file()
    con_prod_job.set_sub_file(fname)
if opts.use_osg:
    # Keep this filesystem-bound job off the grid.
    con_prod_job.add_condor_cmd("+DESIRED_SITES",'"nogrid"')
    con_prod_job.add_condor_cmd("+flock_local",'true')
con_prod_job.write_sub_file()
## unify job
#  Concatenates every per-iteration consolidated file into all.marg_net, the
#  single input the EOS-posterior workers read.
with open("unify.sh",'w') as f:
    f.write("""#! /bin/bash
cat *.net_marg | sort -r > all.marg_net
""")
os.system("chmod a+x unify.sh")
unify_marg_job, unify_marg_job_name = dag_utils.write_convert_sub(exe=opts.working_directory+"/unify.sh",tag='UNIFY',log_dir=None,arg_str='',file_input='',out_dir=opts.working_directory,universe=local_worker_universe,no_grid=no_worker_grid)
unify_marg_job.add_condor_cmd("initialdir",opts.working_directory)
unify_marg_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).log")
unify_marg_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).out")
unify_marg_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/con-$(macroevent).err")
unify_marg_job.add_condor_cmd('request_disk',opts.general_request_disk)
if opts.use_full_submit_paths:
    fname = opts.working_directory+"/"+unify_marg_job.get_sub_file()
    unify_marg_job.set_sub_file(fname)
if opts.use_osg:
    # Keep this filesystem-bound job off the grid.
    unify_marg_job.add_condor_cmd("+DESIRED_SITES",'"nogrid"')
    unify_marg_job.add_condor_cmd("+flock_local",'true')
unify_marg_job.write_sub_file()
## EOS_POST job (workers)
#  - main hyperparameter integration executable; reads all.marg_net and writes
#    posterior samples into iteration_<it>_post/output-<it+1>-<cluster>.
eospost_marg_job, eospost_marg_job_name = dag_utils.write_hyperpost_sub(exe=eospost_exe,tag='EOS_POST_worker',arg_str=eos_post_args,out_dir=opts.working_directory,output='iteration_$(macroiteration)_post/output-$(macroiterationnext)-$(cluster)',input_net=working_dir_inside + "/all.marg_net",transfer_files=transfer_file_names_post)
eospost_marg_job.add_condor_cmd("initialdir",opts.working_directory+"/iteration_$(macroiteration)_post")
eospost_marg_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_post/logs/post-$(cluster)-$(process).log")
eospost_marg_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_post/logs/post-$(cluster)-$(process).err")
eospost_marg_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_post/logs/post-$(cluster)-$(process).out")
eospost_marg_job.add_condor_cmd('request_disk',opts.general_request_disk)
if opts.use_full_submit_paths:
    fname = opts.working_directory+"/"+eospost_marg_job.get_sub_file()
    eospost_marg_job.set_sub_file(fname)
eospost_marg_job.write_sub_file()
# worker join job
#  Merges the exploded EOS_POST worker outputs of one iteration, de-duplicates,
#  and prepends the grid-0.dat header line to form grid-<it+1>.dat.
with open("join_post.sh",'w') as f:
    f.write("""#! /bin/bash
head -n 1 grid-0.dat > $2
cat $1/output*.dat | sort -r | uniq >> $2
""")
os.system("chmod a+x join_post.sh")
join_post_job, join_post_job_name = dag_utils.write_convert_sub(exe=opts.working_directory+"/join_post.sh",tag='JOIN_POST',log_dir=None,arg_str='',file_input=opts.working_directory+"/iteration_$(macroiteration)_post/ "+opts.working_directory+"/grid-$(macroiterationnext).dat", out_dir=opts.working_directory,universe=local_worker_universe,no_grid=no_worker_grid)
join_post_job.add_condor_cmd("initialdir",opts.working_directory)
join_post_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/unify-$(macroevent).log")
join_post_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/unify-$(macroevent).out")
join_post_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_con/logs/unify-$(macroevent).err")
join_post_job.add_condor_cmd('request_disk',opts.general_request_disk)
if opts.use_full_submit_paths:
    fname = opts.working_directory+"/"+join_post_job.get_sub_file()
    join_post_job.set_sub_file(fname)
# BUGFIX: the OSG-only commands were previously added AFTER write_sub_file(),
# so they never appeared in the written .sub file. Add them first (matching the
# CON/UNIFY jobs above), then write.
if opts.use_osg:
    join_post_job.add_condor_cmd("+DESIRED_SITES",'"nogrid"')
    join_post_job.add_condor_cmd("+flock_local",'true')
join_post_job.write_sub_file()
# PUFF job
#  Broadens the freshly-joined grid-<it+1>.dat into grid_puff-<it+1>.dat; the
#  MARG_PUFF jobs of the next iteration evaluate these extra points.
if puff_args and puff_cadence:
    puff_job, puff_job_name = dag_utils.write_puff_sub(tag='PUFF',log_dir=None,arg_str=puff_args,request_memory=opts.request_memory_marg,input_net=opts.working_directory+'/grid-$(macroiterationnext).dat',output=opts.working_directory+'/grid_puff-$(macroiterationnext).dat',out_dir=opts.working_directory,exe=opts.puff_exe,universe=local_worker_universe,no_grid=no_worker_grid)
    # Modify: set 'initialdir' to CIP WORKING DIR
    puff_job.add_condor_cmd("initialdir",opts.working_directory+"/iteration_$(macroiteration)_post")
    # Modify output argument: change logs and working directory to be subdirectory for the run
    puff_job.set_log_file(opts.working_directory+"/iteration_$(macroiteration)_post/logs/puff-$(cluster)-$(process).log")
    puff_job.set_stderr_file(opts.working_directory+"/iteration_$(macroiteration)_post/logs/puff-$(cluster)-$(process).err")
    puff_job.set_stdout_file(opts.working_directory+"/iteration_$(macroiteration)_post/logs/puff-$(cluster)-$(process).out")
    puff_job.add_condor_cmd('request_disk',opts.general_request_disk)
    if opts.use_full_submit_paths:
        fname = opts.working_directory+"/"+puff_job.get_sub_file()
        puff_job.set_sub_file(fname)
    # BUGFIX: the OSG-only commands were previously added AFTER write_sub_file(),
    # so they never appeared in the written .sub file. Add them first, then write.
    if opts.use_osg:
        puff_job.add_condor_cmd("+DESIRED_SITES",'"nogrid"')
        puff_job.add_condor_cmd("+flock_local",'true')
    puff_job.write_sub_file()
# ++++
# Create workflow
# ++++
# Create workflow
#  - Loop over iterations
#      - if iteration 0, use seed grid (already copied into place)
#      - otherwise, grid-<it>.dat was produced by the previous iteration's join
#      - Loop over events, make one MARG node per grid chunk per event
#      - create per-event consolidate (CON) node depending on its MARG nodes
#      - create product (CON_PROD) and unify nodes, then exploded EOS_POST
#        workers and a join node producing the next iteration's grid
#      - optionally append a PUFF node after the join
#
parent_fit_node = None   # last node of the previous stage; children of the current stage hang off it
last_node=None
unify_node_list = []
for it in np.arange(it_start,opts.n_iterations):
    print(it, opts.n_iterations)
    consolidate_now = None
    fit_node_now = None
    marg_nodes_now =[]
    # Create consolidate job 2 (product across events for this iteration)
    con_prod_node = pipeline.CondorDAGNode(con_prod_job)
    con_prod_node.add_macro("macroiteration",it)
    con_prod_node.set_retry(opts.general_retries)
    # Create unify job (cat all iterations' consolidated files -> all.marg_net)
    unify_node = pipeline.CondorDAGNode(unify_marg_job)
    unify_node.add_macro("macroiteration",it)
    unify_node.set_retry(opts.general_retries)
    if not(it ==0): # and not(opts.first_iteration_jumpstart): # don't require first composite to be nonempty if we are running a jumpstart!
        unify_node_list.append(unify_node)
    # Create one node per job. (Will change on per event basis)
    n_group = 1
    n_jobs_this_time = opts.n_samples_per_job # hardcode for now?
    if it ==it_start:
        # first iteration evaluates the seed grid, whose size may differ
        n_jobs_this_time = n_initial
    indx_max = int((1.0*n_jobs_this_time)/n_group)
    if indx_max*n_jobs_this_time < n_group:
        indx_max+=1
    if True: #not(it==it_start): # and opts.first_iteration_jumpstart): # if on first iteration ,don't do this for jumpstart
        for event in np.arange(n_events):
            # NOTE(review): marg_event_nchunk_list is None on the
            # --marg-event-args code path; this line would then raise.
            # Presumably that path always uses chunk size 1 — confirm.
            n_chunk_here = marg_event_nchunk_list[event]
            # NOTE(review): true division — indx_max_event may be fractional;
            # np.arange below floors it. Likely intended as ceil division.
            indx_max_event = indx_max/n_chunk_here
            # Create consolidate job 1 for that event
            con_node = pipeline.CondorDAGNode(con_marg_job)
            con_node.add_macro("macroiteration",it)
            con_node.add_macro("macroevent",int(event))
            con_node.set_retry(opts.general_retries)
            for row in np.arange(indx_max_event): #np.arange(n_jobs_this_time):
                # Add task per MARG operation; 'macroid' selects the event file,
                # 'macroevent' is the starting grid index of this chunk.
                if opts.marg_event_args:
                    cip_node = pipeline.CondorDAGNode(cip_job)
                else:
                    cip_node = pipeline.CondorDAGNode(cip_job_list[event])
                cip_node.set_retry(opts.general_retries)
                cip_node.add_macro("macroid",event)
                cip_node.add_macro("macroevent", int(row*n_chunk_here))
                cip_node.add_macro("macroiteration", it)
                if not(parent_fit_node is None):
                    cip_node.add_parent(parent_fit_node)
                con_node.add_parent(cip_node) # consolidate depends on all of the individual jobs
                dag.add_node(cip_node)
                # if PUFF is done for this event, also add MARG_PUFF nodes. Note grid size assumed identical, so don't need another loop over 'id'/row
                if puff_args and puff_cadence:
                    if it>it_start and it <= puff_max_it and (it-1)%puff_cadence ==0: # we made a puffball last iteration, so run it through ILE now
                        if opts.marg_event_args:
                            cipPuff_node = pipeline.CondorDAGNode(cipPuff_job)
                        else:
                            cipPuff_node = pipeline.CondorDAGNode(cip_puff_job_list[event])
                        cipPuff_node.set_retry(opts.general_retries)
                        cipPuff_node.add_macro("macroid",int(event))
                        cipPuff_node.add_macro("macroevent", int(row*n_chunk_here))
                        cipPuff_node.add_macro("macroiteration", it)
                        if not(parent_fit_node is None):
                            cipPuff_node.add_parent(parent_fit_node)
                        con_node.add_parent(cipPuff_node) # consolidate depends on all of the individual jobs
                        dag.add_node(cipPuff_node)
            # Add node to combine event into consolidated_*_*.net_marg in subdirectory, at end of for loop
            dag.add_node(con_node)
            # Add node to combine all events. con_prod requires all con nodes done
            con_prod_node.add_parent(con_node)
    unify_node.add_parent(con_prod_node)
    # add con job, then unify
    dag.add_node(con_prod_node)
    dag.add_node(unify_node)
    parent_fit_node=unify_node
    # Create EOS_POST nodes. Explode out workers
    # Create fit node, which depends on consolidate node
    print(" Exploding workers out ")
    # Create job to consolidate worker outputs
    join_node =pipeline.CondorDAGNode(join_post_job)
    join_node.add_macro("macroiteration", it)
    join_node.add_macro("macroiterationnext", it+1)
    join_node.set_category("join_post")
    join_node.set_retry(opts.general_retries)
    # Create exploded worker job nodes
    n_explode = opts.eos_post_explode_jobs
    # if we are on the last iteration and we want to use more exploded jobs now, explode more jobs
    if opts.eos_post_explode_jobs_last and (it == opts.n_iterations-1): # last iteration
        print(" Last iteration explode size ", opts.eos_post_explode_jobs_last)
        n_explode = opts.eos_post_explode_jobs_last
    for indx in np.arange(n_explode):
        worker_node =pipeline.CondorDAGNode(eospost_marg_job)
        worker_node.add_macro("macroiteration", it)
        worker_node.add_macro("macroiterationnext", it+1)
        worker_node.set_category("EOS_post_worker")
        worker_node.add_parent(parent_fit_node) # only fit if we have results from the previous iteration
        worker_node.set_retry(opts.general_retries)
        join_node.add_parent(worker_node) # make sure to add worker node as parent
        dag.add_node(worker_node)
    dag.add_node(join_node)
    parent_fit_node=join_node
    # Check if puffball being created, and if so create it *immediately* after the above join creates the final 'grid-X.dat' file
    if puff_args and puff_cadence:
        if it > -1 and it <= puff_max_it and it%puff_cadence ==0:
            print(" Puffball for iteration ", it)
            puff_node = pipeline.CondorDAGNode(puff_job)
            puff_node.add_macro("macroiteration", it)
            puff_node.add_macro("macroiterationnext", it+1)
            puff_node.set_category("PUFF")
            puff_node.set_retry(opts.general_retries)
            if not (parent_fit_node is None):
                puff_node.add_parent(parent_fit_node) # only fit if we have results from the previous iteration
            dag.add_node(puff_node)
            parent_fit_node = puff_node
# Write the assembled workflow to marginalize_hyperparameters.dag for condor_submit_dag.
dag_name="marginalize_hyperparameters"
dag.set_dag_file(dag_name)
dag.write_concrete_dag()