-
Notifications
You must be signed in to change notification settings - Fork 0
/
samples_run.py
executable file
·115 lines (94 loc) · 4.01 KB
/
samples_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/python3
'''For each of the acute and chronic models, for each of the 3 SATs,
and for each of 20,000 posterior parameter estimates, run 1
simulation. This produces a file called `samples.h5`.'''
import copy
import itertools
import os
from joblib import delayed, Parallel
import numpy
import pandas
import h5
import herd
import herd.samples
import run
# Directory holding the per-sample `.npy` output files (one per simulation).
_path = 'samples'
# Index level name for simulation time — presumably years; verify against `herd`.
_index = 'time (y)'
def _run_sample(parameters, sample, tmax, path, sample_number, logging_prefix):
    '''Run one simulation and save its results to `path/<sample_number>.npy`.

    If the output file already exists, the simulation is skipped, so
    interrupted batches can be resumed cheaply.'''
    filename = os.path.join(path, f'{sample_number}.npy')
    if os.path.exists(filename):
        # Already simulated on a previous run — nothing to do.
        return
    # Work on a copy so the shared `parameters` object is not mutated.
    params = copy.copy(parameters)
    for (key, value) in sample.items():
        setattr(params, key, value)
    try:
        # Catch AssertionError from failed optimization in
        # `herd._initial_conditions._find_hazard_infection()`.
        simulation = herd.Herd(params, run_number=sample_number,
                               logging_prefix=logging_prefix)
    except AssertionError:
        # Failed runs will get run with tighter integration tolerances
        # in `herd._initial_conditions`.
        pass
    else:
        results = simulation.run(tmax)
        # Save the data for this sample.
        numpy.save(filename, results.to_records())
def _run_samples_SAT(model, SAT, tmax, path, logging_prefix):
    '''Build the delayed jobs — one per posterior sample — for a single SAT.

    Returns a generator of `joblib.delayed` calls to `_run_sample`.'''
    path_SAT = os.path.join(path, str(SAT))
    os.makedirs(path_SAT, exist_ok=True)
    prefix_SAT = f'{logging_prefix}SAT {SAT}, '
    parameters = herd.Parameters(model=model, SAT=SAT)
    samples = herd.samples.load(model=model, SAT=SAT)
    return (delayed(_run_sample)(parameters, sample, tmax, path_SAT,
                                 number, prefix_SAT)
            for (number, sample) in samples.iterrows())
def _run_samples_model(model, tmax, path):
    '''Chain together the delayed jobs for every SAT of one model.'''
    path_model = os.path.join(path, model)
    os.makedirs(path_model, exist_ok=True)
    logging_prefix = f'model {model}, '
    jobs_per_SAT = (_run_samples_SAT(model, SAT, tmax, path_model,
                                     logging_prefix)
                    for SAT in run._SATs)
    return itertools.chain.from_iterable(jobs_per_SAT)
def run_samples(tmax):
    '''Run every (model, SAT, sample) simulation in parallel up to `tmax`.'''
    os.makedirs(_path, exist_ok=True)
    per_model = (_run_samples_model(model, tmax, _path)
                 for model in run._models)
    jobs = itertools.chain.from_iterable(per_model)
    # `n_jobs=-1` uses all available CPU cores.
    Parallel(n_jobs=-1)(jobs)
def _get_sample_number(filename):
base, _ = os.path.splitext(filename)
return int(base)
def combine():
    '''Collect the per-sample `.npy` files under `_path` into `samples.h5`.

    Any (model, SAT, sample) combination already present in the store is
    skipped, so `combine()` can be re-run incrementally.'''
    with h5.HDFStore('samples.h5', mode='a') as store:
        # (model, SAT, sample) combinations that are already in `store`.
        done = store.get_index().droplevel(_index).unique()
        for model in os.listdir(_path):
            path_model = os.path.join(_path, model)
            for SAT in map(int, sorted(os.listdir(path_model))):
                path_SAT = os.path.join(path_model, str(SAT))
                # Process the sample files in integer, not lexical, order.
                filenames = sorted(os.listdir(path_SAT),
                                   key=_get_sample_number)
                for filename in filenames:
                    sample = _get_sample_number(filename)
                    if (model, SAT, sample) in done:
                        continue
                    path_sample = os.path.join(path_SAT, filename)
                    recarray = numpy.load(path_sample)
                    df = pandas.DataFrame.from_records(recarray,
                                                       index=_index)
                    run._prepend_index_levels(df, model=model, SAT=SAT,
                                              sample=sample)
                    print(f'Inserting model={model}, SAT={SAT},'
                          f' sample={sample}.')
                    store.put(df, min_itemsize=run._min_itemsize)
                    # os.remove(path_sample)
if __name__ == '__main__':
    # Simulation length — presumably years, matching `_index`; confirm in `herd`.
    tmax = 10
    run_samples(tmax)
    # combine()  # run separately once all simulations have finished.