Commit

Added new adjoint apps

Chai committed Feb 7, 2017
1 parent e0ca93a commit 54af881
Showing 5 changed files with 277 additions and 77 deletions.
176 changes: 176 additions & 0 deletions apps/adFVM_adjoint.py
@@ -0,0 +1,176 @@
#!/usr/bin/python2 -u
import os
import subprocess
import numpy as np
import shutil
import glob
import h5py
import sys
import cPickle as pickle
sys.setrecursionlimit(10000)

def isfloat(s):
    try:
        float(s)
        return True
    except ValueError:
        return False

nProcessors = 16
parameter = 1.0
nRuns = 4
dims = 120
segments = 20
steps = 5000
adj_write_interval = 5000
#nRuns = 1
#dims = 2
#segments = 5
#steps = 2
#adj_write_interval = 2
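# dims: number of adjoint perturbation directions propagated per segment;
# segments*steps gives the total adjoint horizon; the commented-out values
# above are a small smoke-test configuration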

time = 1.0
source = '/master/home/talnikar/adFVM/'
problem = 'periodic_wake.py'
case = '/scratch/talnikar/periodic_wake_adj/'
#case = '/scratch/talnikar/periodic_wake_test/'

def getTime(time):
    stime = str(time)
    if time.is_integer():
        stime = str(int(time))
    return stime
stime = getTime(time)
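# e.g. getTime(1.0) -> '1' and getTime(0.5) -> '0.5', matching the
# '<time>.hdf5' naming of the solution files in the case directory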

internalCells = []
with h5py.File(case + 'mesh.hdf5', 'r') as mesh:
    nCount = mesh['parallel/end'][:]-mesh['parallel/start'][:]
    nInternalCells = nCount[:,4]
    nGhostCells = nCount[:,2]-nCount[:,3]
    start = 0
    for i in range(0, nProcessors):
        n = nInternalCells[i]
        internalCells.append(np.arange(start, start + n))
        start += n + nGhostCells[i]
internalCells = np.concatenate(internalCells)
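# internalCells maps each rank's internal cells into the flat global cell
# array, skipping the ghost cells appended after every rank's block (per
# the 'parallel/start'/'parallel/end' layout stored in mesh.hdf5)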

fieldNames = ['rhoa', 'rhoUa', 'rhoEa']
program = source + 'apps/adjoint.py'

reference = [1., 200., 2e5]
def getInternalFields(case, time):
    fields = []
    with h5py.File(case + getTime(time) + '.hdf5', 'r') as phi:
        for name in fieldNames:
            fields.append(phi[name + '/field'][:][internalCells])
    fields = [x/y for x, y in zip(fields, reference)]
    return np.hstack(fields).ravel()
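# the returned vector is flat with length 5*nInternalCells, packed per cell
# as [rhoa, rhoUa (3 components), rhoEa] and normalized by 'reference'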

def writeFields(fields, caseDir, ntime):
    fields = fields.reshape((fields.shape[0]/5, 5))
    fields = fields[:,[0]], fields[:,1:4], fields[:,[4]]
    fields = [x*y for x, y in zip(fields, reference)]
    timeFile = caseDir + getTime(ntime) + '.hdf5'
    with h5py.File(timeFile, 'r+') as phi:
        for index, name in enumerate(fieldNames):
            field = phi[name + '/field'][:]
            field[internalCells] = fields[index]
            phi[name + '/field'][:] = field
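# A minimal round-trip sketch (assuming a case directory 'caseDir' holding
# a solution file at t = 1.0):
#   w = getInternalFields(caseDir, 1.0)   # read + normalize
#   writeFields(w, caseDir, 1.0)          # denormalize + write back unchanged
# writeFields inverts exactly the packing used by getInternalFields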

simTimes = []
for hdf in glob.glob(case + '*.hdf5'):
    if 'mesh' not in hdf:
        simTimes.append(os.path.basename(hdf))
simTimes.sort(key=lambda x: float(x[:-5]))
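# simTimes now lists the '<time>.hdf5' solution files in increasing time
# order; runCase slices it to pick the time window for each adjoint segment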

def spawnJob(exe, args, **kwargs):
    from fds.slurm import grab_from_SLURM_NODELIST
    interprocess = kwargs['interprocess']
    del kwargs['interprocess']
    #nodes = grab_from_SLURM_NODELIST(1, interprocess)
    #print('spawnJob', nodes, exe, args)
    #returncode = subprocess.call(['mpirun', '--host', ','.join(nodes.grabbed_nodes)
    #                , exe] + args, **kwargs)
    #nodes.release()
    returncode = subprocess.call(['mpirun', '-np', str(nProcessors), exe] + args, **kwargs)
    return returncode
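# the SLURM node-grabbing path is disabled above; the active fallback just
# launches the job under mpirun with nProcessors ranks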

def runCase(initFields, nSteps, segment, run_id, interprocess):

    # generate case folders
    caseDir = '{}/temp/{}/'.format(case, run_id)
    mesh.case = caseDir  # 'mesh' is the (already closed) module-level h5py File from above
    if not os.path.exists(caseDir):
        os.makedirs(caseDir)
    shutil.copy(case + problem, caseDir)
    shutil.copy(case + 'mesh.hdf5', caseDir)
    jump = nSteps/adj_write_interval
    start = len(simTimes) - 2 - jump*segment
    times = simTimes[start:start + jump + 1]
    for stime in times:
        shutil.copy(case + stime, caseDir)

    for pkl in glob.glob(case + '*.pkl'):
        shutil.copy(pkl, caseDir)

    time_data = np.loadtxt(case + '{}.{}.txt'.format(segments*nSteps, adj_write_interval))
    time_data = time_data[start*nSteps: (start + 1)*nSteps]
    np.savetxt(caseDir + '{}.{}.txt'.format(nSteps, adj_write_interval), time_data)

    # write initial field
    ntime = float(times[-1][:-5])
    writeFields(initFields, caseDir, ntime)
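    # the adjoint runs backward in time: the initial adjoint field is
    # written at the latest time in the window (times[-1]) and the result
    # is read back at the earliest (times[0]) further down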

    # modify problem file
    problemFile = caseDir + problem
    with open(problemFile, 'r') as f:
        lines = f.readlines()
    with open(problemFile, 'w') as f:
        for line in lines:
            writeLine = line.replace('NSTEPS', str(nSteps))
            writeLine = writeLine.replace('STARTTIME', times[0][:-5])
            writeLine = writeLine.replace('CASEDIR', '\'{}\''.format(caseDir))
            f.write(writeLine)

    outputFile = caseDir + 'output.log'
    with open(outputFile, 'w') as f:
        if spawnJob(sys.executable, [program, problemFile], stdout=f, stderr=f, interprocess=interprocess):
            raise Exception('Execution failed, check error log:', outputFile)

    # read final fields
    lastTime = float(times[0][:-5])
    finalFields = getInternalFields(caseDir, lastTime)
    # read objective values
    print caseDir

    return finalFields

from multiprocessing import Manager, Pool
manager = Manager()
interprocess = [manager.Lock(), manager.dict()]
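# shared lock + dict for cross-process bookkeeping, passed through runCase
# to spawnJob (used by fds.slurm.grab_from_SLURM_NODELIST when the
# node-grabbing path is enabled)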

if __name__ == '__main__':
    u0 = getInternalFields(case, time)
    #runCase(u0, parameters, steps, 'random')
    V = np.random.rand(u0.shape[0], dims)
    Rs = []
    for i in range(0, segments):
        Vn = []
        res = []
        print i
        threads = Pool(nRuns)
        for j in range(0, dims):
            run_id = 'segment{}_perturb{}'.format(i,j)
            v0 = V[:,j]
            res.append(threads.apply_async(runCase, (v0, steps, i, run_id, interprocess)))
        for j in range(0, dims):
            Vn.append(res[j].get())
        V = np.array(Vn).T
        # re-orthonormalize the propagated perturbations; R holds the
        # per-segment growth factors
        Q, R = np.linalg.qr(V)
        V = Q[:]
        Rs.append(R)
        threads.close()

        with open(case + '/checkpoint/m{}_{}'.format(dims, i+1), 'w') as f:
            pickle.dump([V, Rs], f)
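# A loading sketch for the checkpoints written above (the 'm120_5' name is
# hypothetical, following the m{dims}_{segment} pattern used in the dump):
#   with open(case + '/checkpoint/m120_5') as f:
#       V, Rs = pickle.load(f)
#   # V: current orthonormal basis (nDOF x dims); Rs: one R factor per
#   # completed segment from the QR steps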

80 changes: 41 additions & 39 deletions apps/adFVM_mpi.py
100644 → 100755
@@ -1,9 +1,11 @@
 #!/usr/bin/python2 -u
 import os
+import sys
 import subprocess
 import numpy as np
 import shutil
 import glob
+sys.setrecursionlimit(10000)
 
 def isfloat(s):
     try:
@@ -14,20 +16,20 @@ def isfloat(s):
 fileName = os.path.abspath(__file__)
 python = sys.executable
 
-nProcessors = 128
-nProcsPerNode = 8
-nRuns = 32
+nProcessors = 16
+nProcsPerNode = 16
+nRuns = 4
 subBlockShape = '1x2x2x2x2'
 
 parameter = 1.0
-dims = 128
+dims = 120
 segments = 20
 steps = 5000
 
 time = 1.0
-source = '/projects/LESOpt/talnikar/'
-problem = 'periodic_wake_fds.py'
-case = source + 'periodic_wake/'
+source = '/master/home/talnikar/'
+problem = 'periodic_wake.py'
+case = '/scratch/talnikar/periodic_wake/'
 
 def getTime(time):
     stime = str(time)
@@ -37,7 +39,7 @@ def getTime(time):
 stime = getTime(time)
 
 fieldNames = ['rho', 'rhoU', 'rhoE']
-program = source + 'local/src/adFVM/apps/problem.py'
+program = source + 'adFVM/apps/problem.py'
 
 reference = [1., 200., 2e5]
 
@@ -46,8 +48,10 @@ def getParallelInfo():
     from mpi4py import MPI
     mpi = MPI.COMM_WORLD
     rank = mpi.rank
+    #print rank, mpi.Get_size(), MPI.Get_processor_name()
+    sys.stdout.flush()
 
-    with h5py.File(case + 'mesh.hdf5', 'r') as mesh:
+    with h5py.File(case + 'mesh.hdf5', 'r', driver='mpio', comm=mpi) as mesh:
         nCount = mesh['parallel/end'][rank]-mesh['parallel/start'][rank]
         nInternalCells = nCount[4]
         nGhostCells = nCount[2]-nCount[3]
@@ -99,26 +103,19 @@ def getHostDir(run_id):
     return '{}/temp/{}/'.format(case, run_id)
 
 def spawnJob(exe, args, **kwargs):
-    global cobalt
-    if 'interprocess' in kwargs:
-        cobalt.interprocess = kwargs['interprocess']
-        del kwargs['interprocess']
-    block, corner = cobalt.get_alloc()
-    print('spawnJob', exe, args, block, corner)
-    returncode = subprocess.call(['runjob', '-n', str(nProcessors),
-                                  '-p', str(nProcsPerNode),
-                                  '--block', block,
-                                  '--corner', corner,
-                                  '--shape', subBlockShape,
-                                  '--exp-env', 'PYTHONPATH',
-                                  '--verbose', 'INFO',
-                                  ':', exe] + args, **kwargs)
-    cobalt.free_alloc((block, corner))
+    from fds.slurm import grab_from_SLURM_NODELIST
+    interprocess = kwargs['interprocess']
+    del kwargs['interprocess']
+    nodes = grab_from_SLURM_NODELIST(1, interprocess)
+    print('spawnJob', nodes, exe, args)
+    returncode = subprocess.call(['mpirun', '--host', ','.join(nodes.grabbed_nodes)
+                                  , exe] + args, **kwargs)
+    nodes.release()
+    #returncode = subprocess.call(['mpirun', '-np', str(nProcessors), exe] + args, **kwargs)
     return returncode
 
 def runCase(initFields, parameter, nSteps, run_id, interprocess):
-    cobalt.interprocess = interprocess
+    #cobalt.interprocess = interprocess
 
     # generate case folders
     caseDir = getHostDir(run_id)
@@ -133,7 +130,7 @@ def runCase(initFields, parameter, nSteps, run_id, interprocess):
     # write initial field
     outputFile = caseDir + 'writeFields.log'
     with open(outputFile, 'w') as f:
-        if spawnJob(python, [fileName, 'RUN', 'writeFields', initFields, caseDir, str(time)], stdout=f, stderr=f):
+        if spawnJob(python, [fileName, 'RUN', 'writeFields', initFields, caseDir, str(time)], stdout=f, stderr=f, interprocess=interprocess):
             raise Exception('initial field conversion failed')
     print('initial field written', initFields)
 
@@ -154,7 +151,7 @@ def runCase(initFields, parameter, nSteps, run_id, interprocess):
     errorFile = caseDir + 'error.log'
     with open(outputFile, 'w') as f, open(errorFile, 'w') as fe:
         #if spawnJob(python, [problemFile], stdout=f, stderr=f):
-        if spawnJob(python, [program, problemFile, '--mira', '-n', '--coresPerNode', str(nProcsPerNode), '--unloadingStages', '4'], stdout=f, stderr=fe):
+        if spawnJob(python, [program, problemFile, '--coresPerNode', str(nProcsPerNode)], stdout=f, stderr=fe, interprocess=interprocess):
             raise Exception('Execution failed, check error log:', outputFile)
     print('execution finished', caseDir)
 
@@ -164,16 +161,16 @@ def runCase(initFields, parameter, nSteps, run_id, interprocess):
     finalFields = caseDir + 'output.h5'
     outputFile = caseDir + 'getInternalFields.log'
     with open(outputFile, 'w') as f:
-        if spawnJob(python, [fileName, 'RUN', 'getInternalFields', caseDir, str(lastTime), finalFields], stdout=f, stderr=f):
+        if spawnJob(python, [fileName, 'RUN', 'getInternalFields', caseDir, str(lastTime), finalFields], stdout=f, stderr=f, interprocess=interprocess):
             raise Exception('final field conversion failed')
     print('final field written', finalFields)
 
     # read objective values
     objectiveSeries = np.loadtxt(caseDir + 'timeSeries.txt')
     print caseDir
 
-    cobalt.interprocess = None
-    return finalFields, objectiveSeries[:-1]
+    #cobalt.interprocess = None
+    return finalFields, objectiveSeries
 
 if __name__ == '__main__':
 
@@ -182,19 +179,24 @@ def runCase(initFields, parameter, nSteps, run_id, interprocess):
         args = sys.argv[3:]
         func(*args)
     else:
-        from fds.cobalt import CobaltManager
-        cobalt = CobaltManager(subBlockShape, 128)
-        cobalt.boot_blocks()
+        #from fds.cobalt import CobaltManager
+        #cobalt = CobaltManager(subBlockShape, 128)
+        #cobalt.boot_blocks()
 
         init = getHostDir('init')
         if not os.path.exists(init):
             os.makedirs(init)
         u0 = init + 'init.h5'
-        if spawnJob(sys.executable, [fileName, 'RUN', 'getInternalFields', case, str(time), u0]):
-            raise Exception('final field conversion failed')
+        with open(init + 'getInternalFields.log', 'w') as f:
+            if spawnJob(sys.executable, [fileName, 'RUN', 'getInternalFields', case, str(time), u0], stdout=f, stderr=f, interprocess=None):
+                raise Exception('final field conversion failed')
 
         #runCase(u0, parameter, steps, 'random', None)
-        from fds import shadowing
-        shadowing(runCase, u0, parameter, dims, segments, steps, 0, simultaneous_runs=nRuns, get_host_dir=getHostDir, spawn_compute_job=spawnJob)
+        #from fds import shadowing
+        #shadowing(runCase, u0, parameter, dims, segments, steps, 0, simultaneous_runs=nRuns, get_host_dir=getHostDir, spawn_compute_job=spawnJob, checkpoint_path=case + '/checkpoint')
+        from fds import shadowing, continue_shadowing
+        from fds.checkpoint import *
+        checkpoint = load_last_checkpoint(case + '/checkpoint', dims)
+        continue_shadowing(runCase, parameter, checkpoint, segments, steps, simultaneous_runs=nRuns, get_host_dir=getHostDir, spawn_compute_job=spawnJob, checkpoint_path=case + '/checkpoint')
 
-        cobalt.free_blocks()
+        #cobalt.free_blocks()
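Note: besides swapping the Cobalt/runjob launcher for SLURM/mpirun, this diff
threads an 'interprocess' handle through every spawnJob call site and resumes
an interrupted shadowing run from its last checkpoint instead of restarting.
A minimal sketch of the resume flow it assembles (names are from this file;
that load_last_checkpoint matches on the mode count, so only checkpoints
written with dims=120 are picked up, is an assumption):

    from fds import continue_shadowing
    from fds.checkpoint import load_last_checkpoint
    # locate the most recent checkpoint under case/checkpoint for dims modes
    checkpoint = load_last_checkpoint(case + '/checkpoint', dims)
    # run the remaining segments; each segment launches runCase via spawnJob
    continue_shadowing(runCase, parameter, checkpoint, segments, steps,
                       simultaneous_runs=nRuns, get_host_dir=getHostDir,
                       spawn_compute_job=spawnJob,
                       checkpoint_path=case + '/checkpoint')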