#!/usr/bin/env python
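"""
spawn_jobs.py

Sets up and runs simulation jobs for the adh executable in bin/. Job
parameters are read from db/jobs.db and meshes from db/meshes.db; for each
job a jobs/<hashid>/ directory is created with the .3dm mesh, .bc boundary
condition, .hot hotstart, and .met files, and the jobs are run either
serially or concurrently with multiprocessing.Pool.
"""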
import changemat
from collections import OrderedDict
import ConfigParser
import datetime
import os
import meshdb
import monitorjobs
from multiprocessing import Pool
import shutil
import sqlite3 as lite
from subprocess import Popen, PIPE
import sys
from time import sleep, asctime, localtime


class JobProperties(object):
    """ Common class for holding job properties, much like a struct. """

    def __init__(self, jobsdb, hashid):
        self.jobsdb = jobsdb
        self.props = OrderedDict()
        self.hashid = hashid
        self.createHashTable()

    def createHashTable(self):
        try:
            con = lite.connect(self.jobsdb)
            cur = con.cursor()
        except:
            print("Unable to find the database. Please be in the sim_dir and "
                  "populate the db folder with the proper databases\n")
            print "Tried to open jobs database:", self.jobsdb
            sys.exit()
        with con:
            # Get the column names of the Jobs table
            cmd = 'PRAGMA table_info(Jobs)'
            pragma_data = cur.execute(cmd).fetchall()
            columns = [p[1] for p in pragma_data]
            # Get the row for this particular job
            cmd = 'SELECT * FROM Jobs WHERE HASHID = "%s"' % self.hashid
            job_info = cur.execute(cmd).fetchall()[0]
            # Build the hash table of column name -> value
            for i, column in enumerate(columns):
                self.props[column] = job_info[i]

    def getProp(self, prop):
        return self.props[prop]

    def setProp(self, prop, newValue):
        self.props[prop] = newValue

    def keys(self):
        return self.props.keys()

    def isIn(self, line):
        isIn = False
        line = line.split()
        for k in self.props.iterkeys():
            kref = '<' + k + '>'  # example: a placeholder in the bc template looks like <soil_pav>
            for w in line:
                if kref.upper() == w.upper():
                    isIn = True
                    break
            if isIn:
                break
        return isIn, kref.lower(), k

    def __str__(self):
        s = ""
        for k, v in self.props.iteritems():
            s += "key: %s\tvalue: %s\n" % (k, v)
        return s
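

# A minimal usage sketch for JobProperties (the hashid below is the one from
# the commented-out line in __main__; any value from the Jobs table works):
#
#     props = JobProperties('db/jobs.db', '59b8da00c8fd1a7f5a9b065ea235c5df')
#     head = props.getProp('HEAD')                 # read a column value
#     props.setProp('start_day', 152)              # override a value in memory only
#     found, ref, key = props.isIn('XY1 <head> 0') # hypothetical bc template line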


class JobSpawner(object):
    """ Common class for spawning jobs and job control. """

    def __init__(self, jobsdb, mshdb, cnf, sim_name):
        self.jobsdb = jobsdb
        self.mshdb = mshdb
        self.cnf = cnf
        self.sim_name = sim_name
        self.nextjob = ""
        self.numjobsrunning = 0
        self.totconjobs = 3  # This is hardcoded for the time being
        self.jobsrunning = []
        self.totaljobsran = 0
        self.simdir = os.getcwd() + '/'

    def createHotStart(self, nodes, facets):
        # Create the hotstart (.hot) file with the IH and IT scalar datasets,
        # one value per node.
        props = JobProperties(self.jobsdb, self.nextjob)
        head = props.getProp('HEAD')
        bot_ts = props.getProp('BOT_TS')
        header = 'DATASET\nOBJTYPE "mesh3d"\nBEGSCL\n'
        header += 'ND %s\n' % nodes
        header += 'NC %s\n' % facets
        ih_header = header + 'NAME "IH"\nTS 0 0\n'
        tmp_header = header + 'NAME "IT"\nTS 0 0\n'
        with open('jobs/'+self.nextjob+'/'+self.nextjob+'.hot', 'w') as outfile:
            outfile.write(ih_header)
            for i in range(int(nodes)):
                outfile.write('%s\n' % head)
            outfile.write('ENDSS\n')
            outfile.write(tmp_header)
            for i in range(int(nodes)):
                outfile.write('%s\n' % bot_ts)
            outfile.write('ENDDS')
        print "Hotstart file created with IH=%s and IT=%s" % (head, bot_ts)

    def createBcFile(self, bcs):
        try:
            config = ConfigParser.ConfigParser()
            config.read(self.cnf)
        except:
            print "Config file not found in config directory."
            sys.exit()
        # Get the start and end time from the config file
        st = config.get('sim_info', 'start_time')
        st = st.split()[0]
        stime = datetime.datetime.strptime(st, '%Y-%m-%d')
        start_time = stime.timetuple().tm_yday
        et = config.get('sim_info', 'end_time')
        et = et.split()[0]
        etime = datetime.datetime.strptime(et, '%Y-%m-%d')
        end_time = etime.timetuple().tm_yday
        # Get the properties from jobs.db and add the derived times
        props = JobProperties(self.jobsdb, self.nextjob)
        props.setProp('start_day', start_time)
        props.setProp('end_hour', (end_time - start_time) * 24 + 1)
        bc_handle = config.get('sim_info', 'bc_template')
        # Substitute <prop> placeholders in the bc template with job values
        new_bc = []
        with open(bc_handle) as infile:
            for line in infile.readlines():
                inLine, propref, prop = props.isIn(line)
                if inLine:
                    line = line.split()
                    index = line.index(propref)
                    line[index] = str(props.getProp(prop))
                    line = ' '.join(line)
                    line += '\n'
                    new_bc.append(line)
                else:
                    new_bc.append(line)
        with open('jobs/'+self.nextjob+'/'+self.nextjob+'.bc', 'w') as outfile:
            outfile.write('! ' + self.nextjob + ' simulation bc file\n')
            for line in new_bc:
                outfile.write(line)
            outfile.write('\n' + bcs + 'END')
        print "BC file created: %s.bc" % self.nextjob

    def getNextJob(self):
        try:
            # Use a local connection (not an attribute) so the instance stays
            # picklable for multiprocessing.
            con = lite.connect(self.jobsdb)
            cur = con.cursor()
        except:
            print("Unable to find the database. Please be in sim_dir and "
                  "populate the db folder with the databases")
            print "Tried to open database:", self.jobsdb
            sys.exit()
        with con:
            cmd = 'SELECT running, complete, hashid FROM Jobs WHERE '\
                  'sim_name = "%s"' % self.sim_name
            jobs = cur.execute(cmd).fetchall()
            # Separate the rows into running, complete, and hashid lists
            running, comp, hashid = \
                zip(*[(x[0], x[1], x[2]) for x in jobs])
            foundJob = False
            totaljobs = len(running)
            job_count = 1
            for r, c, h in zip(running, comp, hashid):
                if r == "N" and c == "N":
                    foundJob = True
                    if foundJob and self.numjobsrunning < self.totconjobs:
                        self.nextjob = str(h)  # Convert from unicode
                        self.updateJobsDB('RUNNING', 'Y')
                        print "Next job to run is %s" % self.nextjob
                        break
                if job_count == totaljobs and not foundJob:
                    print "There are no more jobs to complete"
                    sys.exit()
                job_count += 1
        return self.nextjob

    def jobSetup(self, hashid=None):
        """
        This sets up all the necessary files in the simdir/jobs directory.
        If you do not feed it a hashid then it will choose self.nextjob. This
        is set up to do both so there is no confusion when running in parallel.
        """
        # See whether it is running in parallel or serially
        if hashid is None:
            hashid = self.nextjob
        # Test for the necessary directories in simdir
        cwd = os.getcwd()
        os.chdir(cwd)
        necDirs = True
        necessary_dirs = ['db', 'config', 'bin']  # We could add to this list later
        for directory in necessary_dirs:
            if os.path.exists(directory) is False:
                print "You are missing the %s directory in your simdir\n"\
                      % directory
                necDirs = False
        if not necDirs:
            sys.exit()
        # Set up the job directory
        if not os.path.exists('jobs'):
            os.mkdir('jobs', 0775)
        if not os.path.exists('jobs/' + hashid):
            os.mkdir('jobs/' + hashid, 0775)
        # Get the layer depths and mesh ID for this job
        try:
            con = lite.connect(self.jobsdb)
            cur = con.cursor()
        except:
            print "Make sure you have a %s in your db/ directory" % self.jobsdb
            sys.exit()
        with con:
            cmd = 'SELECT PAV_DZ, BASE_DZ, SOIL_DZ, MESH_ID FROM Jobs WHERE '\
                  'hashid = "%s"' % hashid
            stats = cur.execute(cmd).fetchall()
            if not stats:
                # Fall back to the first job if the hashid is not in the table
                cmd = 'SELECT PAV_DZ, BASE_DZ, SOIL_DZ, MESH_ID FROM Jobs LIMIT 1'
                stats = cur.execute(cmd).fetchall()
            pav_dz, base_dz, soil_dz, meshid = stats[0]
            tot_z = pav_dz + base_dz + soil_dz
            print "pav_dz: " + str(pav_dz)
            print "base_dz: " + str(base_dz)
        # Get the mesh and bc cards
        msd = meshdb.MeshDB(self.mshdb)
        mesh_name, mesh, bcs = msd.get(meshid)
        mesh = "%s" % mesh
        bcs = "%s" % bcs
        with open('jobs/%s/%s.3dm' % (hashid, hashid), 'w') as f:
            f.write(mesh + 'END')
        #with open('/jobs/%s/%s.bcs' % (nextjob, nextjob)) as f:
        #    f.write(bcs)
        # Change the materials
        os.chdir('jobs/' + hashid)
        changemat.changeMaterials(mesh_name=hashid + '.3dm',
                                  depth=[pav_dz, base_dz])
        os.chdir(self.simdir)
        # Create the bc file
        self.createBcFile(bcs)
        # Create the hotstart file
        nodes, facets = msd.getNdsAndFcs(meshid)
        self.createHotStart(nodes, facets)
        # Grab the met file, caching a copy in config/ and copying it into the
        # job directory
        try:
            config = ConfigParser.ConfigParser()
            config.read(self.cnf)
        except:
            print "Config file not found in config directory."
            sys.exit()
        met_path = config.get('sim_info', 'met_file')
        met_handle = met_path.split('/')[-1]
        if not os.path.isfile('config/' + met_handle):
            try:
                shutil.copyfile(met_path, 'config/' + met_handle)
                print "Using the met from %s" % met_path
            except:
                print "No met file found"
                sys.exit()
        else:
            print "Using the %s met file from the config folder" % met_handle
        shutil.copyfile('config/' + met_handle,
                        'jobs/' + hashid + '/' + hashid + '.met')

    def updateJobsDB(self, toset, value, hashid=None):
        """
        This updates the jobs.db you are working with; it exists because I was
        doing it over and over in the code and wanted to stop having to copy
        and paste. You feed it the name of the column you want to change and
        the value you want to change it to.
        By default it is going to use self.nextjob, but in a parallel
        environment this might have changed since the job finished, so you'll
        need to supply it with the actual hashid.
        Serial
            input: column in database to set, new value
        Parallel
            input: column in database to set, new value, hashid
        """
        # For a serial job
        if hashid is None:
            hashid = self.nextjob
        #======================================================================
        # Try to connect to the database
        #======================================================================
        try:
            con = lite.connect(self.simdir + self.jobsdb)
            cur = con.cursor()
        except:
            print("Unable to find the database. Please be in sim_dir and "
                  "populate the db folder with the databases")
            print "Tried to open database:", self.jobsdb
            sys.exit()
        with con:
            cmd = 'UPDATE Jobs SET %s = "%s" WHERE hashid = "%s"'\
                  % (toset, value, hashid)
            cur.execute(cmd)
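
    # For example (serial): self.updateJobsDB('RUNNING', 'Y') marks the current
    # self.nextjob as running; in parallel, pass the finished job explicitly,
    # e.g. self.updateJobsDB('COMPLETE', 'Y', hashid=cmd[-1]).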

    def runSerially(self):
        self.getNextJob()
        self.jobSetup()
        print "Running job %s" % self.nextjob
        os.chdir('jobs/' + self.nextjob)
        print "Job starting at %s" % asctime(localtime())
        logfile = open(self.nextjob + '.log', 'w')
        errfile = open(self.nextjob + '.err', 'w')
        p1 = Popen(['../../bin/adh', self.nextjob], stdout=logfile, stderr=errfile)
        self.updateJobsDB('PID', p1.pid)
        p1.wait()
        logfile.close()
        errfile.close()
        print "Job %s has finished at %s" % (self.nextjob, asctime(localtime()))
        print "Simulation %s has finished." % self.nextjob
        # Move back up to the simulation directory
        os.chdir(self.simdir)
        self.numjobsrunning -= 1
        self.updateJobsDB('COMPLETE', 'Y')

    def runJobParallel(self, cmd):
        """
        This is fed a cmd after being unwrapped by an unwrapping function
        outside of the class that strips self for pickling reasons. Google
        "pickling classes multiprocessing" for more info on why.
        This runs a job and returns the exit code or an error.
        """
        os.chdir(self.simdir + 'jobs/' + cmd[-1])  # Last element of cmd is the hashid
        try:
            with open(cmd[-1] + '.log', 'w') as logfile:
                p1 = Popen(cmd, stdout=logfile)
                status = p1.wait()
            os.chdir(self.simdir + 'jobs/')
            return cmd, status, None  # An error of None means the job ran
        except Exception as e:
            os.chdir(self.simdir + 'jobs/')
            return cmd, None, str(e)

    def runJobsParallel(self):
        """
        This will run the jobs concurrently with however many processors
        you give it.
        This will be set manually on a mac and will probably end up reading
        the PBS information on the supercomputer.
        """
        self.maxprocs = 3  # This will be set somewhere else eventually
        #======================================================================
        # Set the size of the pool that will spawn the jobs concurrently.
        #======================================================================
        pool = Pool(self.maxprocs)
        #======================================================================
        # Set up the initial jobs to spawn. I imagine we'll want to grab the
        # first jobs that haven't been run from the database at this point.
        # Later, after these runs complete, we might choose to do some scoring
        # methods, so I'm letting another part of the code handle that.
        #======================================================================
        cmds = []
        for i in range(self.maxprocs):
            hashid = self.getNextJob()
            self.jobSetup(hashid)
            print "%s is the next job" % hashid
            cmds.append(['../../bin/adh', hashid])
        #======================================================================
        # This looks to see when a job finishes and adds another one to the
        # cmds list; otherwise it just waits until a job finishes and adds
        # one. Lines for updating ERROR in the DB are commented out for now.
        #======================================================================
        while True:
            for cmd, status, error in pool.imap_unordered(
                    unwrap_self, zip([self] * len(cmds), cmds)):
                if error is None:
                    print "%s job has completed with status %s" % (cmd, status)
                    cmds.remove(cmd)
                    print "%s has been removed" % cmd
                    self.updateJobsDB('COMPLETE', 'Y', hashid=cmd[-1])
                    # self.updateJobsDB('ERROR', 'N', hashid=cmd[-1])
                    hashid = self.getNextJob()
                    self.jobSetup(hashid)
                    cmds.append(['../../bin/adh', hashid])
                else:
                    print "%s job failed with error %s" % (cmd, error)
                    cmds.remove(cmd)
                    # self.updateJobsDB('ERROR', 'Y', hashid=cmd[-1])
                    hashid = self.getNextJob()
                    self.jobSetup(hashid)
                    cmds.append(['../../bin/adh', hashid])
        #======================================================================
        # This is like a wait which hopefully keeps the last jobs of the pool
        # from being left unfinished after the first job of that pool is done.
        #======================================================================
        pool.close()
        pool.join()


def unwrap_self(arg, **kwargs):
    """
    This simply unwraps the Pool() call to its intended target. It works
    around a pickling issue with instance methods of classes in Python's
    multiprocessing.
    """
    return JobSpawner.runJobParallel(*arg, **kwargs)


if __name__ == '__main__':
    cwd = os.getcwd()
    os.chdir(cwd)
    jobsdb = 'db/jobs.db'
    #hashid = '59b8da00c8fd1a7f5a9b065ea235c5df'
    mshdb = 'db/meshes.db'
    cnf = 'hagler.ini'
    sim_name = 'hagler'
    job = JobSpawner(jobsdb, mshdb, cnf, sim_name)
    job.runJobsParallel()
    #job.runSerially()
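    # To run: from the simulation directory (the one containing the db/,
    # config/, and bin/ folders), invoke `python spawn_jobs.py`. Swap the two
    # calls above to run the jobs one at a time instead of concurrently.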