-
Notifications
You must be signed in to change notification settings - Fork 1
/
RunJobEvent.py
2494 lines (1940 loc) · 106 KB
/
RunJobEvent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Class definition:
# RunJobEvent: module for receiving and processing events from the Event Service
# Instances are generated with RunJobFactory via pUtil::getRunJob()
# Implemented as a singleton class
# http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern
# Import relevant python/pilot modules
from RunJob import RunJob # Parent RunJob class
from pUtil import tolog, writeToFileWithStatus # Logging method that sends text to the pilot log
# Standard python modules
import os
import re
import sys
import time
import stat
import atexit
import signal
import commands
import traceback
from optparse import OptionParser
from json import loads
from shutil import copy2
from xml.dom import minidom
# Pilot modules
import Job
import Node
import Site
import pUtil
import RunJobUtilities
import Mover as mover
from JobRecovery import JobRecovery
from FileStateClient import updateFileStates, dumpFileStates
from ErrorDiagnosis import ErrorDiagnosis # import here to avoid issues seen at BU with missing module
from PilotErrors import PilotErrors
from StoppableThread import StoppableThread
from pUtil import debugInfo, tolog, isAnalysisJob, readpar, createLockFile, getDatasetDict, getChecksumCommand,\
tailPilotErrorDiag, getFileAccessInfo, getCmtconfig, getExperiment, getEventService, httpConnect,\
getSiteInformation, getGUID
from FileHandling import getExtension, addToOSTransferDictionary
from EventRanges import downloadEventRanges, updateEventRange
try:
    from PilotYamplServer import PilotYamplServer as MessageServer
except Exception as e:
    # The yampl message server is optional: when it cannot be imported, run
    # without one and report why on stdout
    MessageServer = None
    print("RunJobEvent caught exception: %s" % (e))
class RunJobEvent(RunJob):
# private data members
# NOTE: these are class attributes. The list/dict members below are shared
# class-level objects until a setter rebinds them on the instance; the class is
# used as a singleton, so in practice only one object uses them.
__runjob = "RunJobEvent"                    # Name of this RunJob subclass (returned by getRunJob())
__instance = None                           # The singleton instance, created by __new__ on first use
__error = PilotErrors()                     # PilotErrors object (error code definitions, e.g. ERR_MISSINGGUID)
__errorCode = 0                             # Error code, e.g. set by stage-out method
__experiment = "ATLAS"                      # Current experiment (can be set with pilot option -F <experiment>)
__pilotserver = "localhost"                 # Default pilot TCP server
__pilotport = 88888                         # Default pilot TCP server port; NOTE(review): 88888 exceeds the valid TCP port range (0-65535), presumably always overridden with -p
__failureCode = None                        # Set by signal handler when user/batch system kills the job
__pworkdir = "/tmp"                         # Site work dir used by the parent
__logguid = None                            # GUID for the log file
__pilotlogfilename = "pilotlog.txt"         # Default pilotlog filename
__stageinretry = None                       # Number of stage-in tries
__stageoutretry = None                      # Number of stage-out tries
__pilot_initdir = ""                        # Location of where the pilot is untarred and started
__proxycheckFlag = True                     # True (default): perform proxy validity checks, False: no check
__globalPilotErrorDiag = ""                 # Global pilotErrorDiag used with signal handler (only)
__globalErrorCode = 0                       # Global error code used with signal handler (only)
__inputDir = ""                             # Location of input files (source for mv site mover)
__outputDir = ""                            # Location of output files (destination for mv site mover)
__taskID = ""                               # TaskID (needed for OS transfer file and eventually for job metrics)
__event_loop_running = False                # Is the event loop running?
__output_files = []                         # All files that have been successfully staged-out, used by createFileMetadata()
__guid_list = []                            # Keep track of downloaded GUIDs
__lfn_list = []                             # Keep track of downloaded LFNs
__eventRange_dictionary = {}                # eventRange_dictionary[event_range_id] = [path, cpu, wall]
__eventRangeID_dictionary = {}              # eventRangeID_dictionary[event_range_id] = True (corr. output file has been transferred)
__stageout_queue = []                       # Queue for files to be staged-out; files are added as they arrive and removed after they have been staged-out
__pfc_path = ""                             # The path to the pool file catalog
__message_server = None                     # Message server object (presumably a MessageServer/PilotYamplServer instance - confirm)
__message_thread = None                     # Thread associated with the message server (assumed from the name - confirm)
__status = True                             # Global job status; will be set to False if an event range or stage-out fails
__athenamp_is_ready = False                 # True when an AthenaMP worker is ready to process an event range
__asyncOutputStager_thread = None           # Thread for asynchronous output stage-out (assumed from the name - confirm)
__analysisJob = False                       # True for analysis job
__jobSite = None                            # Site object
__job = None                                # Job object
__cache = ""                                # Cache URL, e.g. used by LSST
__metadata_filename = ""                    # Full path to the metadata file
__yamplChannelName = None                   # Yampl channel name
__useEventIndex = True                      # Should Event Index be used? If not, a TAG file will be created
__tokenextractor_input_list_filenane = ""   # Token extractor input list file name ('filenane' typo is referenced by the accessors, so it is kept)
# Getter and setter methods
def getExperiment(self):
    """ Return the experiment name (default "ATLAS"; pilot option -F). """
    return self.__experiment

def setExperiment(self, experiment):
    """ Store the experiment name. """
    self.__experiment = experiment

def getPilotServer(self):
    """ Return the pilot TCP server host (default "localhost"). """
    return self.__pilotserver

def setPilotServer(self, pilotserver):
    """ Store the pilot TCP server host. """
    self.__pilotserver = pilotserver

def getPilotPort(self):
    """ Return the pilot TCP server port (class default 88888). """
    return self.__pilotport

def setPilotPort(self, pilotport):
    """ Store the pilot TCP server port. """
    self.__pilotport = pilotport

def getFailureCode(self):
    """ Return the failure code recorded when the job is killed (None if not set). """
    return self.__failureCode

def setFailureCode(self, code):
    """ Record the failure code (used by the signal handler). """
    self.__failureCode = code

def getParentWorkDir(self):
    """ Return the site work directory of the parent pilot process (default "/tmp"). """
    return self.__pworkdir

def setParentWorkDir(self, pworkdir):
    """ Store the site work directory of the parent pilot process. """
    self.__pworkdir = pworkdir

def getLogGUID(self):
    """ Return the GUID of the log file (None unless set, e.g. via option -i). """
    return self.__logguid

def setLogGUID(self, logguid):
    """ Store the GUID of the log file. """
    self.__logguid = logguid
def getPilotLogFilename(self):
    """ Return the pilot log file name (default "pilotlog.txt"). """
    return self.__pilotlogfilename

def setPilotLogFilename(self, pilotlogfilename):
    """ Store the pilot log file name. """
    self.__pilotlogfilename = pilotlogfilename

def getStageInRetry(self):
    """ Return the number of stage-in tries (None unless set, e.g. via option -x). """
    return self.__stageinretry

def setStageInRetry(self, stageinretry):
    """ Store the number of stage-in tries. """
    self.__stageinretry = stageinretry

def getStageOutRetry(self):
    """ Return the number of stage-out tries (None unless set, e.g. via option -E). """
    return self.__stageoutretry

def setStageOutRetry(self, stageoutretry):
    """ Store the number of stage-out tries. """
    self.__stageoutretry = stageoutretry

def getPilotInitDir(self):
    """ Return the directory where the pilot was untarred and started. """
    return self.__pilot_initdir

def setPilotInitDir(self, pilot_initdir):
    """ Store the directory where the pilot was untarred and started. """
    self.__pilot_initdir = pilot_initdir

def getProxyCheckFlag(self):
    """ Return True if proxy validity checks should be performed (default True). """
    return self.__proxycheckFlag

def setProxyCheckFlag(self, proxycheckFlag):
    """ Enable (True) or disable (False) proxy validity checks. """
    self.__proxycheckFlag = proxycheckFlag

def getGlobalPilotErrorDiag(self):
    """ Return the global pilotErrorDiag (meant for the signal handler only). """
    return self.__globalPilotErrorDiag

def setGlobalPilotErrorDiag(self, pilotErrorDiag):
    """ Store the global pilotErrorDiag (meant for the signal handler only). """
    self.__globalPilotErrorDiag = pilotErrorDiag

def getGlobalErrorCode(self):
    """ Return the global error code (meant for the signal handler only). """
    return self.__globalErrorCode

def setGlobalErrorCode(self, code):
    """ Store the global error code (meant for the signal handler only). """
    self.__globalErrorCode = code

def getErrorCode(self):
    """ Return the error code, e.g. set by the stage-out method (default 0). """
    return self.__errorCode

def setErrorCode(self, code):
    """ Store the error code. """
    self.__errorCode = code
def getInputDir(self):
    """ Return the input file location (source for the mv site mover). """
    return self.__inputDir

def setInputDir(self, inputDir):
    """ Store the input file location (source for the mv site mover). """
    self.__inputDir = inputDir

def getOutputDir(self):
    """ Return the output file location (destination for the mv site mover). """
    return self.__outputDir

def setOutputDir(self, outputDir):
    """ Store the output file location (destination for the mv site mover). """
    self.__outputDir = outputDir

def getEventLoopRunning(self):
    """ Return True while the event loop is running. """
    return self.__event_loop_running

def setEventLoopRunning(self, event_loop_running):
    """ Record whether the event loop is running. """
    self.__event_loop_running = event_loop_running

def getOutputFiles(self):
    """ Return the list of successfully staged-out files. """
    return self.__output_files

def setOutputFiles(self, output_files):
    """ Replace the list of successfully staged-out files. """
    self.__output_files = output_files

def getGUIDList(self):
    """ Return the list of downloaded GUIDs. """
    return self.__guid_list

def setGUIDList(self, guid_list):
    """ Replace the list of downloaded GUIDs. """
    self.__guid_list = guid_list

def getLFNList(self):
    """ Return the list of downloaded LFNs. """
    return self.__lfn_list

def setLFNList(self, lfn_list):
    """ Replace the list of downloaded LFNs. """
    self.__lfn_list = lfn_list

def getEventRangeDictionary(self):
    """ Return the event range dictionary ({event_range_id: [path, cpu, wall]}). """
    return self.__eventRange_dictionary

def setEventRangeDictionary(self, eventRange_dictionary):
    """ Replace the event range dictionary. """
    self.__eventRange_dictionary = eventRange_dictionary

def getEventRangeIDDictionary(self):
    """ Return {event_range_id: True} for event ranges whose output file was transferred. """
    return self.__eventRangeID_dictionary

def setEventRangeIDDictionary(self, eventRangeID_dictionary):
    """ Replace the event range ID dictionary. """
    self.__eventRangeID_dictionary = eventRangeID_dictionary

def getStageOutQueue(self):
    """ Return the queue (list) of files waiting to be staged-out. """
    return self.__stageout_queue

def setStageOutQueue(self, stageout_queue):
    """ Replace the queue of files waiting to be staged-out. """
    self.__stageout_queue = stageout_queue
def getPoolFileCatalogPath(self):
    """ Return the path to the pool file catalog. """
    return self.__pfc_path

def setPoolFileCatalogPath(self, pfc_path):
    """ Store the path to the pool file catalog. """
    self.__pfc_path = pfc_path

def getMessageServer(self):
    """ Return the message server object (None until set). """
    return self.__message_server

def setMessageServer(self, message_server):
    """ Store the message server object. """
    self.__message_server = message_server

def getMessageThread(self):
    """ Return the message thread object (None until set). """
    return self.__message_thread

def setMessageThread(self, message_thread):
    """ Store the message thread object. """
    self.__message_thread = message_thread

def isAthenaMPReady(self):
    """ Return True when an AthenaMP worker is ready to process an event range. """
    return self.__athenamp_is_ready

def setAthenaMPIsReady(self, athenamp_is_ready):
    """ Record whether an AthenaMP worker is ready to process an event range. """
    self.__athenamp_is_ready = athenamp_is_ready

def getAsyncOutputStagerThread(self):
    """ Return the asynchronous output stager thread object (None until set). """
    return self.__asyncOutputStager_thread

def setAsyncOutputStagerThread(self, asyncOutputStager_thread):
    """ Store the asynchronous output stager thread object. """
    self.__asyncOutputStager_thread = asyncOutputStager_thread

def getAnalysisJob(self):
    """ Return True for an analysis job. """
    return self.__analysisJob

def setAnalysisJob(self, analysisJob):
    """ Record whether this is an analysis job. """
    self.__analysisJob = analysisJob

def getCache(self):
    """ Return the cache URL (e.g. used by LSST). """
    return self.__cache

def setCache(self, cache):
    """ Store the cache URL. """
    self.__cache = cache
def getMetadataFilename(self):
    """ Return the full path of the current metadata file ("" until set). """
    return self.__metadata_filename

def setMetadataFilename(self, event_range_id):
    """ Set the metadata file path for the given event range.

    The path is built as <job workdir>/metadata-<event_range_id>.xml, so the job
    object must already be set (see setJob); otherwise an AttributeError is
    raised because self.__job is None.
    """
    self.__metadata_filename = os.path.join(self.__job.workdir, "metadata-%s.xml" % (event_range_id))
def getJobSite(self):
    """ Return the Site object (None until set). """
    return self.__jobSite

def setJobSite(self, jobSite):
    """ Store the Site object. """
    self.__jobSite = jobSite

def getYamplChannelName(self):
    """ Return the yampl channel name. """
    return self.__yamplChannelName

def setYamplChannelName(self, yamplChannelName):
    """ Store the yampl channel name. """
    self.__yamplChannelName = yamplChannelName

def getStatus(self):
    """ Return the global job status (False once an event range or stage-out has failed). """
    return self.__status

def setStatus(self, status):
    """ Store the global job status. """
    self.__status = status
# Get/setters for the job object
def getJob(self):
    """ Return the Job object handled by this payload wrapper (None until set). """
    return self.__job

def setJob(self, job):
    """ Store the Job object and clear its outFilesGuids list.

    The output file GUIDs are generated by this module, so any GUID list carried
    by the job is discarded; the passed-in job object is modified in place.
    """
    self.__job = job
    job.outFilesGuids = []
def getJobWorkDir(self):
    """ Return the work directory of the current job. """
    return self.__job.workdir

def setJobWorkDir(self, workdir):
    """ Set the work directory of the current job. """
    self.__job.workdir = workdir

def getJobID(self):
    """ Return the id of the current job. """
    return self.__job.jobId

def setJobID(self, jobId):
    """ Set the id of the current job. """
    self.__job.jobId = jobId

def getJobDataDir(self):
    """ Return the data directory of the current job. """
    return self.__job.datadir

def setJobDataDir(self, datadir):
    """ Set the data directory of the current job. """
    self.__job.datadir = datadir

def getJobTrf(self):
    """ Return the trf of the current job. """
    return self.__job.trf

def setJobTrf(self, trf):
    """ Set the trf of the current job. """
    self.__job.trf = trf

def getJobResult(self):
    """ Return job.result (same value as getJobStates()). """
    return self.__job.result

def setJobResult(self, result):
    """ Set job.result without touching job.currentState. """
    self.__job.result = result

def getJobState(self):
    """ Return the jobState attribute of the current job. """
    return self.__job.jobState

def setJobState(self, jobState):
    """ Set the jobState attribute of the current job. """
    self.__job.jobState = jobState

def getJobStates(self):
    """ Return the job state list (job.result): [state, trf exit code, pilot exit code]. """
    return self.__job.result

def setJobStates(self, states):
    """ Store the job state list and make its first entry the current state. """
    job = self.__job
    job.result = states
    job.currentState = states[0]
def getTaskID(self):
    """ Return the task id (needed for the OS transfer file and eventually job metrics). """
    return self.__taskID

def setTaskID(self, taskID):
    """ Store the task id. """
    self.__taskID = taskID

def getJobOutFiles(self):
    """ Return the output file list of the current job. """
    return self.__job.outFiles

def setJobOutFiles(self, outFiles):
    """ Set the output file list of the current job. """
    self.__job.outFiles = outFiles

def getTokenExtractorInputListFilename(self):
    """ Return the token extractor input list file name. """
    return self.__tokenextractor_input_list_filenane

def setTokenExtractorInputListFilename(self, tokenextractor_input_list_filenane):
    """ Store the token extractor input list file name. """
    self.__tokenextractor_input_list_filenane = tokenextractor_input_list_filenane

def useEventIndex(self):
    """ Return True if the Event Index should be used (otherwise a TAG file is created). """
    return self.__useEventIndex

def setUseEventIndex(self, value):
    """ Choose whether the Event Index should be used (boolean). """
    self.__useEventIndex = value
# Required methods
def __init__(self):
    """ Set the default yampl channel name.

    __new__ always hands back the same shared instance, and Python runs __init__
    on every RunJobEvent() call, so each call resets the channel name to this
    default. RunJob.__init__ is not invoked here.
    """
    # Alternative (disabled): a unique channel name per job, e.g.
    #   "EventService_EventRanges-%s" % (commands.getoutput('uuidgen'))
    self.__yamplChannelName = "EventService_EventRanges"
# Open question (original note): is this necessary? doesn't exist in RunJob
def __new__(cls, *args, **kwargs):
    """ Return the single shared RunJobEvent instance, creating it on first use. """
    instance = cls.__instance
    if not instance:
        # First construction: let the parent class allocate the object, then cache it
        instance = super(RunJobEvent, cls).__new__(cls, *args, **kwargs)
        cls.__instance = instance
    return instance
def getRunJob(self):
    """ Return the name of this RunJob subclass ("RunJobEvent"). """
    return self.__runjob
def getRunJobFileName(self):
    """ Return the module file name, as reported by the parent RunJob class. """
    parent = super(RunJobEvent, self)
    return parent.getRunJobFileName()
def allowLoopingJobKiller(self):
    """ Return True: the pilot should watch this payload for looping.

    The pilot monitors the payload work directory; if no file there is updated
    within a time limit, it considers the payload stuck (looping) and kills it.
    The limits are set in environment.py (see e.g. loopingLimitDefaultProd).
    """
    return True
def argumentParser(self):
    """ Parse the command line options of the RunJobEvent payload wrapper.

    Options that map onto class members (experiment, log guid, input/output dirs,
    pilot init dir, pilot log file name, pilot server/port, proxy check flag,
    parent work dir, stage-in/out retries, cache) are stored on the instance.
    Integer options that cannot be converted are logged and the previous value
    is kept. Proxy checks stay enabled unless -t is given as "false" (any case).

    Returns:
        (sitename, appdir, workdir, "", queuename). The empty string is a
        placeholder for the retired dq2url entry. When no queue name is given,
        the site name is returned in its place.
    """
    # Return variables
    appdir = None
    queuename = None
    sitename = None
    workdir = None

    parser = OptionParser()
    parser.add_option("-a", "--appdir", dest="appdir",
                      help="The local path to the applications directory", metavar="APPDIR")
    parser.add_option("-b", "--queuename", dest="queuename",
                      help="Queue name", metavar="QUEUENAME")
    parser.add_option("-d", "--workdir", dest="workdir",
                      help="The local path to the working directory of the payload", metavar="WORKDIR")
    parser.add_option("-g", "--inputdir", dest="inputDir",
                      help="Location of input files to be transferred by the mv site mover", metavar="INPUTDIR")
    parser.add_option("-i", "--logfileguid", dest="logguid",
                      help="Log file guid", metavar="GUID")
    parser.add_option("-k", "--pilotlogfilename", dest="pilotlogfilename",
                      help="The name of the pilot log file", metavar="PILOTLOGFILENAME")
    parser.add_option("-l", "--pilotinitdir", dest="pilot_initdir",
                      help="The local path to the directory where the pilot was launched", metavar="PILOT_INITDIR")
    parser.add_option("-m", "--outputdir", dest="outputDir",
                      help="Destination of output files to be transferred by the mv site mover", metavar="OUTPUTDIR")
    parser.add_option("-o", "--parentworkdir", dest="pworkdir",
                      help="Path to the work directory of the parent process (i.e. the pilot)", metavar="PWORKDIR")
    parser.add_option("-s", "--sitename", dest="sitename",
                      help="The name of the site where the job is to be run", metavar="SITENAME")
    parser.add_option("-w", "--pilotserver", dest="pilotserver",
                      help="The URL of the pilot TCP server (localhost) WILL BE RETIRED", metavar="PILOTSERVER")
    parser.add_option("-p", "--pilotport", dest="pilotport",
                      help="Pilot TCP server port (default: 88888)", metavar="PORT")
    parser.add_option("-t", "--proxycheckflag", dest="proxycheckFlag",
                      help="True (default): perform proxy validity checks, False: no check", metavar="PROXYCHECKFLAG")
    parser.add_option("-x", "--stageinretries", dest="stageinretry",
                      help="The number of stage-in retries", metavar="STAGEINRETRY")
    # (the -B/--filecatalogregistration option is not registered)
    parser.add_option("-E", "--stageoutretries", dest="stageoutretry",
                      help="The number of stage-out retries", metavar="STAGEOUTRETRY")
    parser.add_option("-F", "--experiment", dest="experiment",
                      help="Current experiment (default: ATLAS)", metavar="EXPERIMENT")
    parser.add_option("-H", "--cache", dest="cache",
                      help="Cache URL", metavar="CACHE")

    try:
        options, _ = parser.parse_args()
    except Exception as e:
        # Invalid options normally make parse_args() exit via SystemExit, which is
        # not caught here. 'options' is unbound in this branch, so only log.
        tolog("!!WARNING!!3333!! Exception caught: %s" % (e))
    else:
        if options.appdir:
            appdir = options.appdir
        if options.experiment:
            self.__experiment = options.experiment
        if options.logguid:
            self.__logguid = options.logguid
        if options.inputDir:
            self.__inputDir = options.inputDir
        if options.pilot_initdir:
            self.__pilot_initdir = options.pilot_initdir
        if options.pilotlogfilename:
            self.__pilotlogfilename = options.pilotlogfilename
        if options.pilotserver:
            self.__pilotserver = options.pilotserver

        # proxy checks are on unless explicitly switched off with "false"
        if options.proxycheckFlag and options.proxycheckFlag.lower() == "false":
            self.__proxycheckFlag = False
        else:
            self.__proxycheckFlag = True

        if options.pworkdir:
            self.__pworkdir = options.pworkdir
        if options.outputDir:
            self.__outputDir = options.outputDir
        if options.pilotport:
            try:
                self.__pilotport = int(options.pilotport)
            except Exception as e:
                tolog("!!WARNING!!3232!! Exception caught: %s" % (e))
        # self.__queuename is not needed
        if options.queuename:
            queuename = options.queuename
        if options.sitename:
            sitename = options.sitename
        if options.stageinretry:
            try:
                self.__stageinretry = int(options.stageinretry)
            except Exception as e:
                tolog("!!WARNING!!3232!! Exception caught: %s" % (e))
        if options.stageoutretry:
            try:
                self.__stageoutretry = int(options.stageoutretry)
            except Exception as e:
                tolog("!!WARNING!!3232!! Exception caught: %s" % (e))
        if options.workdir:
            workdir = options.workdir
        if options.cache:
            self.__cache = options.cache

    # use sitename as queuename if no queue name was given; queuename stays None
    # (never "") in that case, so test for falsiness
    if not queuename:
        queuename = sitename

    return sitename, appdir, workdir, "", queuename # get rid of the dq2url (, "") in this return list
def cleanup(self, rf=None):
""" Clean up the payload work directory at the end of the job.

Removes the pilot wrapper modules, deletes the job's input files, copies
metadata-<jobId>.xml to the parent (site) work dir and, for a 'holding' job
with trf exit code 0, saves the untransferred output files (and
PoolFileCatalog.xml) in the job data directory for later job recovery.
Output files still lingering in the work directory are deleted at the end.

rf: optional list returned by the Mover; rf[1] is parsed with
RunJobUtilities.getFileNamesFromString() to get the names of the files that
were transferred. If parsing fails, all output files are treated as remaining.
"""
# 'rf' is a list that will contain the names of the files that could be transferred
# In case of transfer problems, all remaining files will be found and moved
# to the data directory for later recovery.
tolog("********************************************************")
# result[1] is the trf exit code and result[2] the pilot exit code
tolog(" This job ended with (trf,pilot) exit code of (%d,%d)" % (self.__job.result[1], self.__job.result[2]))
tolog("********************************************************")
# clean up the pilot wrapper modules
pUtil.removePyModules(self.__job.workdir)
if os.path.isdir(self.__job.workdir):
os.chdir(self.__job.workdir)
# remove input files from the job workdir
remFiles = self.__job.inFiles
for inf in remFiles:
if inf and inf != 'NULL' and os.path.isfile("%s/%s" % (self.__job.workdir, inf)): # non-empty string and not NULL
try:
os.remove("%s/%s" % (self.__job.workdir, inf))
except Exception,e:
tolog("!!WARNING!!3000!! Ignore this Exception when deleting file %s: %s" % (inf, str(e)))
pass
# only remove output files if status is not 'holding'
# in which case the files should be saved for the job recovery.
# the job itself must also have finished with a zero trf error code
# (data will be moved to another directory to keep it out of the log file)
# always copy the metadata-<jobId>.xml to the site work dir
# WARNING: this metadata file might contain info about files that were not successfully moved to the SE
# it will be regenerated by the job recovery for the cases where there are output files in the datadir
try:
copy2("%s/metadata-%s.xml" % (self.__job.workdir, self.__job.jobId), "%s/metadata-%s.xml" % (self.__pworkdir, self.__job.jobId))
except Exception, e:
tolog("Warning: Could not copy metadata-%s.xml to site work dir - ddm Adder problems will occure in case of job recovery" % \
(self.__job.jobId))
# keep the output files only for 'holding' jobs whose trf exit code (result[1]) is 0
if self.__job.result[0] == 'holding' and self.__job.result[1] == 0:
try:
# create the data directory
os.makedirs(self.__job.datadir)
except OSError, e:
tolog("!!WARNING!!3000!! Could not create data directory: %s, %s" % (self.__job.datadir, str(e)))
else:
# find all remaining files in case 'rf' is not empty
remaining_files = []
moved_files_list = []
try:
if rf != None:
moved_files_list = RunJobUtilities.getFileNamesFromString(rf[1])
remaining_files = RunJobUtilities.getRemainingFiles(moved_files_list, self.__job.outFiles)
except Exception, e:
tolog("!!WARNING!!3000!! Illegal return value from Mover: %s, %s" % (str(rf), str(e)))
remaining_files = self.__job.outFiles
# move all remaining output files to the data directory
nr_moved = 0
for _file in remaining_files:
try:
# NOTE(review): os.system() reports failure through its return value and does not
# raise OSError, so a failed mv is still counted in nr_moved and the "abort all"
# break below is never reached. File names are also passed to the shell unquoted.
# Consider shutil.move() with explicit error handling.
os.system("mv %s %s" % (_file, self.__job.datadir))
except OSError, e:
tolog("!!WARNING!!3000!! Failed to move file %s (abort all)" % (_file))
break
else:
nr_moved += 1
tolog("Moved %d/%d output file(s) to: %s" % (nr_moved, len(remaining_files), self.__job.datadir))
# remove all successfully copied files from the local directory
nr_removed = 0
for _file in moved_files_list:
try:
# NOTE(review): as above, os.system() does not raise on a failed rm, so every file
# is counted as removed; os.remove() would report real failures.
os.system("rm %s" % (_file))
except OSError, e:
tolog("!!WARNING!!3000!! Failed to remove output file: %s, %s" % (_file, e))
else:
nr_removed += 1
tolog("Removed %d output file(s) from local dir" % (nr_removed))
# copy the PoolFileCatalog.xml for non build jobs
if not pUtil.isBuildJob(remaining_files):
_fname = os.path.join(self.__job.workdir, "PoolFileCatalog.xml")
tolog("Copying %s to %s" % (_fname, self.__job.datadir))
try:
copy2(_fname, self.__job.datadir)
except Exception, e:
tolog("!!WARNING!!3000!! Could not copy PoolFileCatalog.xml to data dir - expect ddm Adder problems during job recovery")
# remove all remaining output files from the work directory
# (a successfully copied file should already have been removed by the Mover)
rem = False
for inf in self.__job.outFiles:
if inf and inf != 'NULL' and os.path.isfile("%s/%s" % (self.__job.workdir, inf)): # non-empty string and not NULL
try:
os.remove("%s/%s" % (self.__job.workdir, inf))
except Exception, e:
tolog("!!WARNING!!3000!! Ignore this Exception when deleting file %s: %s" % (inf, e))
pass
else:
tolog("Lingering output file removed: %s" % (inf))
rem = True
if not rem:
tolog("All output files already removed from local dir")
tolog("Payload cleanup has finished")
def sysExit(self, rf=None):
    """ Clean up and terminate the payload wrapper process (wrapper around exit).

    rf: optional return value from the Mover, passed on unchanged to
        cleanup(rf=rf); it is expected to hold the names of the files that could
        be transferred.

    The process exits through os._exit() with the pilot exit code
    self.__job.result[2]. Do not confuse this with the overall pilot exit code,
    which is not reported back to the PanDA server anyway. os._exit() does not
    run cleanup handlers (e.g. atexit) or flush stdio buffers. This method never
    returns.
    """
    self.cleanup(rf=rf)
    sys.stderr.close()
    tolog("RunJobEvent (payload wrapper) has finished")
    # NOTE(review): switching to sys.exit() has been considered
    os._exit(self.__job.result[2])
def failJob(self, transExitCode, pilotExitCode, job, ins=None, pilotErrorDiag=None, docleanup=True):
    """ Set the fail code on the job, report it, and (by default) exit.

    Args:
        transExitCode: trf exit code to record.
        pilotExitCode: pilot exit code to record.
        job: Job object whose state is set to ["failed", transExitCode, pilotExitCode].
        ins: optional list of file names to remove from job.workdir.
        pilotErrorDiag: optional diagnostic message stored on job.pilotErrorDiag.
        docleanup: if True (default), call self.sysExit(), which cleans up and
            terminates the process based on self.__job, not on the job argument.

    With docleanup=False the method returns None and the caller continues.
    """
    job.setState(["failed", transExitCode, pilotExitCode])
    if pilotErrorDiag:
        job.pilotErrorDiag = pilotErrorDiag

    tolog("Will now update local pilot TCP server")
    # the return values of the server update and the file removal are not used
    RunJobUtilities.updatePilotServer(job, self.__pilotserver, self.__pilotport, final=True)
    if ins:
        pUtil.removeFiles(job.workdir, ins)

    if docleanup:
        self.sysExit()
def getTrfExitInfo(self, exitCode, workdir):
    """ Get the trf exit code and info from the job report if possible.

    Args:
        exitCode: exit code of the trf process. It is kept when the job report is
            missing, has no exit code, or lists 0 while this code is non-zero.
        workdir: directory where the job report is expected.

    Returns:
        (exitCode, exitAcronym, exitMsg). exitAcronym and exitMsg are "" when not
        available. The trf errors 65/TRF_EXEC_FAIL, 68/TRF_EXEC_LOGERROR and
        66/TRF_EXEC_VALIDATION_FAIL are deliberately reset to (0, "", "").

    Exceptions raised while deserializing the report are not caught, but the
    report file is closed first.
    """
    exitAcronym = ""
    exitMsg = ""

    # does the job report exist?
    extension = getExtension(alternative='pickle')
    if extension.lower() == "json":
        _filename = "jobReport.%s" % (extension)
    else:
        _filename = "jobReportExtract.%s" % (extension)
    filename = os.path.join(workdir, _filename)

    # It might take a short while longer until the job report is created (unknown why)
    count = 1
    max_count = 10
    nap = 5
    found = False
    while count <= max_count:
        if os.path.exists(filename):
            tolog("Found job report: %s" % (filename))
            found = True
            break
        tolog("Waiting %d s for job report to arrive (#%d/%d)" % (nap, count, max_count))
        time.sleep(nap)
        count += 1

    if not found:
        tolog("Job report not found: %s" % (filename))
        return exitCode, exitAcronym, exitMsg

    # Back up the job report to the parent of workdir since it will be needed later
    # (the current location will disappear since it will be tarred up in the jobs' log file).
    # This is done only once the report exists, because it can appear late.
    d = os.path.join(workdir, '..')
    try:
        copy2(filename, os.path.join(d, _filename))
    except Exception as e:
        tolog("Warning: Could not backup %s to %s: %s" % (_filename, d, e))
    else:
        tolog("Backed up %s to %s" % (_filename, d))

    # search for the exit code
    try:
        f = open(filename, "r")
    except Exception as e:
        tolog("!!WARNING!!1112!! Failed to open job report: %s" % (e))
        return exitCode, exitAcronym, exitMsg

    try:
        if extension.lower() == "json":
            from json import load
        else:
            # NOTE(review): pickle.load can execute arbitrary code; this is only safe
            # if the report file is trusted (presumably written by the payload) - confirm
            from pickle import load
        data = load(f)
    finally:
        f.close()

    # extract the exit code and info
    _exitCode = self.extractDictionaryObject("exitCode", data)
    if _exitCode == 0 and exitCode != 0:
        # the report claims success although the trf failed: trust the trf
        tolog("!!WARNING!!1111!! Detected inconsistency in %s: exitcode listed as 0 but original trf exit code was %d (using original error code)" %\
              (filename, exitCode))
    elif _exitCode:
        exitCode = _exitCode

    _exitAcronym = self.extractDictionaryObject("exitAcronym", data)
    if _exitAcronym:
        exitAcronym = _exitAcronym

    _exitMsg = self.extractDictionaryObject("exitMsg", data)
    if _exitMsg:
        exitMsg = _exitMsg

    tolog("Trf exited with:")
    tolog("...exitCode=%d" % (exitCode))
    tolog("...exitAcronym=%s" % (exitAcronym))
    tolog("...exitMsg=%s" % (exitMsg))

    # Ignore special trf error for now
    if (exitCode == 65 and exitAcronym == "TRF_EXEC_FAIL") or (exitCode == 68 and exitAcronym == "TRF_EXEC_LOGERROR") or (exitCode == 66 and exitAcronym == "TRF_EXEC_VALIDATION_FAIL"):
        exitCode = 0
        exitAcronym = ""
        exitMsg = ""
        tolog("!!WARNING!!3333!! Reset TRF error codes..")

    return exitCode, exitAcronym, exitMsg
def convertToLFNs(self):
    """ Return the LFNs (base names, paths stripped) of all staged-out output files. """
    return [os.path.basename(path) for path in self.getOutputFiles()]
def createFileMetadata(self, outsDict, dsname, datasetDict, sitename):
""" create the metadata for the output + log files """
ec = 0
# get the file sizes and checksums for the local output files
# WARNING: any errors are lost if occur in getOutputFileInfo()
ec, pilotErrorDiag, fsize, checksum = pUtil.getOutputFileInfo(list(self.getOutputFiles()), getChecksumCommand(), skiplog=True, logFile=self.__job.logFile)
if ec != 0:
tolog("!!FAILED!!2999!! %s" % (pilotErrorDiag))
self.failJob(self.__job.result[1], ec, self.__job, pilotErrorDiag=pilotErrorDiag)
# Get the correct log guid (a new one is generated for the Job() object, but we need to get it from the -i logguid parameter)
if self.__logguid:
guid = self.__logguid
else:
guid = self.__job.tarFileGuid
# Convert the output file list to LFNs
lfns = self.convertToLFNs()
# Create preliminary metadata (no metadata yet about log file - added later in pilot.py)
_fname = "%s/metadata-%s.xml" % (self.__job.workdir, self.__job.jobId)
tolog("fguids=%s"%str(self.__job.outFilesGuids))
lfns = []
self.__job.outFilesGuids = []
tolog("Reset output file LFN and GUID list (pilot will not report these to the server - xml shoould only contain log file info)")
try:
_status = pUtil.PFCxml(self.__experiment, _fname, fnlist=lfns, fguids=self.__job.outFilesGuids, fntag="lfn", alog=self.__job.logFile, alogguid=guid,\
fsize=fsize, checksum=checksum, analJob=self.__analysisJob)
except Exception, e:
pilotErrorDiag = "PFCxml failed due to problematic XML: %s" % (e)
tolog("!!WARNING!!1113!! %s" % (pilotErrorDiag))
self.failJob(self.__job.result[1], self.__error.ERR_MISSINGGUID, self.__job, pilotErrorDiag=pilotErrorDiag)
else:
if not _status:
pilotErrorDiag = "Missing guid(s) for output file(s) in metadata"
tolog("!!FAILED!!2999!! %s" % (pilotErrorDiag))
self.failJob(self.__job.result[1], self.__error.ERR_MISSINGGUID, self.__job, pilotErrorDiag=pilotErrorDiag)
tolog("NOTE: Output file info will not be sent to the server as part of xml metadata")
tolog("..............................................................................................................")
tolog("Created %s with:" % (_fname))
tolog(".. log : %s (to be transferred)" % (self.__job.logFile))
tolog(".. log guid : %s" % (guid))
tolog(".. out files : %s" % str(self.__job.outFiles))
tolog(".. out file guids : %s" % str(self.__job.outFilesGuids))
tolog(".. fsize : %s" % str(fsize))
tolog(".. checksum : %s" % str(checksum))
tolog("..............................................................................................................")
# convert the preliminary metadata-<jobId>.xml file to OutputFiles-<jobId>.xml for NG and for CERNVM
# note: for CERNVM this is only really needed when CoPilot is used
if os.environ.has_key('Nordugrid_pilot') or sitename == 'CERNVM':
if RunJobUtilities.convertMetadata4NG(os.path.join(self.__job.workdir, self.__job.outputFilesXML), _fname, outsDict, dsname, datasetDict):