forked from gem/oq-engine
/
dump_hazards.py
333 lines (290 loc) · 12.6 KB
/
dump_hazards.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013, GEM Foundation
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
A script to dump hazard outputs. If you launch it with a given
hazard_calculation_id, it will dump all the hazard outputs relevant for
risk calculations in a directory named hc<hazard-calculation-id>.
The directory can then be moved around and restored in a different
database with the companion script restore_hazards.py.
Internally the dump and restore procedures are based on
COPY TO and COPY FROM commands, so they are quite performant
even for large datasets. They cannot trivially be extended to perform
binary dump/restore since the geography type has no binary form in
PostGIS 1.5.
To restore a hazard computation and all of its outputs into a new database
run ``python restore_hazards.py <directory> <host> <dbname> <user> <password>``
The <user> must have sufficient permissions to write on <dbname>. If
your database already contains a hazard calculation with the same id,
the restore will not override it. If you think that the hazard
calculation on your database is not important and can be removed together
with all of its outputs, then remove it by using ``bin/openquake
--delete-hazard-calculation`` (which must be run by a user with
sufficient permissions). Then run again ``restore_hazards.py``.
"""
import os
import shutil
import tarfile
import argparse
import psycopg2
import tempfile
import logging
# module-wide logger; getLogger() called without a name returns the root logger
log = logging.getLogger()
# return a string which is a valid SQL tuple
def _tuplestr(tup):
return '(%s)' % ', '.join(str(x) for x in tup)
class Copier(object):
    """
    Thin adapter over a psycopg2 cursor which adds a .copy method
    streaming COPY data to or from csv files. The path of every file
    touched by .copy is recorded once, in first-use order, in the
    attribute .filenames.
    """

    def __init__(self, psycopg2_cursor):
        self.filenames = []
        self._cursor = psycopg2_cursor

    def tuplestr(self, query, *args):
        """
        Execute the query and return the first column of the resulting
        rows formatted as a SQL tuple string
        """
        self._cursor.execute(query, args)
        first_column = (record[0] for record in self._cursor)
        return _tuplestr(first_column)

    def fetchall(self, query, *args):
        """Execute the query and return all of its rows"""
        self._cursor.execute(query, args)
        rows = self._cursor.fetchall()
        return rows

    def copy(self, query, dest, name, mode):
        """
        Run a COPY TO/FROM statement against a csv file.

        :param str query: the COPY query
        :param str dest: the directory containing the csv file
        :param str name: the name of the csv file
        :param chr mode: the mode used to open the file, 'w' (for COPY TO)
                         or 'r' (for COPY FROM)
        """
        path = os.path.join(dest, name)
        log.info('%s\n(-> %s)', query, path)
        # NOTE(review): the original comment mentioned a trick to avoid
        # storing filename and timestamp info; nothing in this method
        # shows what that refers to -- confirm with the author
        with open(path, mode) as stream:
            self._cursor.copy_expert(query, stream)
            # register each path only once, even if it is written repeatedly
            already_seen = path in self.filenames
            if not already_seen:
                self.filenames.append(path)
class HazardDumper(object):
    """
    Class to dump a hazard_calculation and related tables as csv files.

    The typical usage is

     hd = HazardDumper(conn, '/tmp/somedir')
     hd.dump(42)  # dump the hazard computation #42
     print(hd.mktar())  # generate a tarfile

    :param conn: an open database connection providing a .cursor() method
    :param str outdir: the directory where the csv files are written;
                       defaults to /tmp/hc-dump
    """

    def __init__(self, conn, outdir=None):
        self.conn = conn
        self.curs = Copier(conn.cursor())
        outdir = outdir or "/tmp/hc-dump"
        if os.path.exists(outdir):
            # cleanup previously dumped archives, if any: several tables
            # are written in append mode, so stale csv files would end up
            # mixed with the new data
            for fname in os.listdir(outdir):
                if fname.endswith('.csv'):
                    os.remove(os.path.join(outdir, fname))
        else:
            os.mkdir(outdir)
        self.outdir = outdir

    def _dump_table(self, table, column, ids, mode):
        """
        Copy to <outdir>/<table>.csv the rows of `table` whose `column`
        is in `ids`.

        :param str table: schema-qualified table name
        :param str column: name of the column used as filter
        :param str ids: a SQL tuple string such as "(1, 2, 3)"
        :param chr mode: 'w' to overwrite the csv file, 'a' to append to it
        """
        query = ("copy (select * from %s where %s in %s) to stdout "
                 "with (format 'csv', header true, encoding 'utf8')"
                 % (table, column, ids))
        self.curs.copy(query, self.outdir, table + '.csv', mode)

    def _select_ids(self, table, column, ids):
        """
        Return as a SQL tuple string the ids of the rows of `table`
        whose `column` is in `ids`
        """
        return self.curs.tuplestr(
            'select id from %s where %s in %s' % (table, column, ids))

    def hazard_calculation(self, ids):
        """Dump hazard_calculation, lt_realization, hazard_site"""
        self._dump_table('uiapi.hazard_calculation', 'id', ids, 'w')
        self._dump_table(
            'hzrdr.lt_realization', 'hazard_calculation_id', ids, 'w')
        self._dump_table(
            'hzrdi.hazard_site', 'hazard_calculation_id', ids, 'w')

    def oq_job(self, ids):
        """
        Dump hazard_calculation, oq_job

        :raises TypeError: if a job has no associated hazard calculation
        """
        hc_ids = self.curs.tuplestr(
            'select hazard_calculation_id from uiapi.oq_job where id in %s'
            % ids)
        # a NULL hazard_calculation_id is rendered as the string 'None'
        if 'None' in hc_ids:
            raise TypeError('Job %s is not associated to a hazard calculation!'
                            % ids)
        self.hazard_calculation(hc_ids)
        self._dump_table('uiapi.oq_job', 'id', ids, 'w')

    def output(self, ids):
        """Dump output"""
        self._dump_table('uiapi.output', 'id', ids, 'w')

    def hazard_curve(self, output):
        """Dump hazard_curve, hazard_curve_data"""
        self._dump_table('hzrdr.hazard_curve', 'output_id', output, 'a')
        ids = self._select_ids('hzrdr.hazard_curve', 'output_id', output)
        self._dump_table(
            'hzrdr.hazard_curve_data', 'hazard_curve_id', ids, 'a')

    def gmf(self, output):
        """Dump gmf, gmf_data"""
        self._dump_table('hzrdr.gmf', 'output_id', output, 'a')
        coll_ids = self._select_ids('hzrdr.gmf', 'output_id', output)
        self._dump_table('hzrdr.gmf_data', 'gmf_id', coll_ids, 'a')

    def ses(self, output):
        """Dump ses_collection, ses, ses_rupture"""
        self._dump_table('hzrdr.ses_collection', 'output_id', output, 'a')
        coll_ids = self._select_ids(
            'hzrdr.ses_collection', 'output_id', output)
        self._dump_table('hzrdr.ses', 'ses_collection_id', coll_ids, 'a')
        ses_ids = self._select_ids(
            'hzrdr.ses', 'ses_collection_id', coll_ids)
        self._dump_table('hzrdr.ses_rupture', 'ses_id', ses_ids, 'a')

    def dump(self, *hazard_calculation_ids):
        """
        Dump all the data associated to a given hazard_calculation_id
        and relevant for risk. The names of the generated csv files are
        listed in the file FILENAMES.txt inside the output directory.

        :param hazard_calculation_ids: one or more integer ids (or strings
                                       convertible to integers)
        :raises ValueError: if an id is a string which is not an integer
        :raises RuntimeError: if no job is associated to the calculation
                              or the job has no dumpable outputs
        """
        # the ids are interpolated into the SQL text, so make sure they
        # are nothing but integers before building any query
        ids = _tuplestr(int(hc_id) for hc_id in hazard_calculation_ids)
        curs = self.curs

        # retrieve the last job associated to the given calculation
        # NOTE(review): with several ids max(id) still selects a single
        # job, the most recent one overall -- confirm this is intended
        jobs = curs.tuplestr(
            'select max(id) from uiapi.oq_job '
            'where hazard_calculation_id in %s' % ids)
        # max() over no rows is NULL, which is rendered as 'None' and
        # would otherwise end up verbatim in the queries below
        if 'None' in jobs:
            raise RuntimeError(
                'No job found for hazard calculation %s' % ids)

        outputs = curs.fetchall("""\
            select output_type, array_agg(id) from uiapi.output
            where oq_job_id in %s group by output_type
            having output_type in ('hazard_curve', 'hazard_curve_multi',
                                   'ses', 'gmf', 'gmf_scenario')
            """ % jobs)
        if not outputs:
            raise RuntimeError('No outputs for job %s' % jobs)

        # sort the outputs to prevent foreign key errors
        ordering = {
            'hazard_curve': 1,
            'hazard_curve_multi': 2,
            'ses': 1,
            'gmf': 2,
            'gmf_scenario': 2,
        }
        outputs = sorted(outputs, key=lambda o: ordering[o[0]])

        # dump data and collect generated filenames
        self.oq_job(jobs)
        all_outs = [out_id for _, out_ids in outputs for out_id in out_ids]
        self.output(_tuplestr(all_outs))
        for output_type, output_ids in outputs:
            out_tuple = _tuplestr(output_ids)
            print("Dumping %s %s in %s" % (
                output_type, out_tuple, self.outdir))
            if output_type in ['hazard_curve', 'hazard_curve_multi']:
                self.hazard_curve(out_tuple)
            elif output_type in ('gmf', 'gmf_scenario'):
                self.gmf(out_tuple)
            elif output_type == 'ses':
                self.ses(out_tuple)

        # save FILENAMES.txt
        filenames = os.path.join(self.outdir, 'FILENAMES.txt')
        with open(filenames, 'w') as f:
            f.write('\n'.join(map(os.path.basename, self.curs.filenames)))

    # this is not used right now; the functionality could be restored in
    # the future (optionally)
    def mktar(self):
        """
        Tar the contents of outdir into a tarfile and remove the directory

        :returns: the pathname of the generated tarfile, <outdir>.tar
        """
        tarname = self.outdir + '.tar'
        # tar outdir
        with tarfile.open(tarname, 'w') as t:
            t.add(self.outdir)
        # the directory is removed only after the archive has been closed
        shutil.rmtree(self.outdir)
        logging.info('Generated %s', tarname)
        return tarname
def main(hazard_calculation_id, outdir=None,
         host=None, dbname=None, user=None, password=None, port=None):
    """
    Dump a hazard_calculation and its related outputs.

    Every connection parameter which is not given falls back to the
    corresponding entry of the default Django database configuration.

    :param hazard_calculation_id: id of the hazard calculation to dump
    :param str outdir: output directory, passed to HazardDumper
    :param str host: database host
    :param str dbname: database name
    :param str user: database user
    :param str password: database password
    :param str port: database port
    :returns: the directory where the csv files have been written
    """
    from openquake.engine.db.models import set_django_settings_module
    set_django_settings_module()
    from django.conf import settings

    default_cfg = settings.DATABASES['default']
    host = host or default_cfg.get('HOST', 'localhost')
    dbname = dbname or default_cfg.get('NAME', 'openquake')
    # honour explicitly passed credentials, like the other parameters
    user = user or default_cfg.get('USER', 'oq_admin')
    password = password or default_cfg.get('PASSWORD', 'openquake')
    port = port or str(default_cfg.get('PORT', 5432))

    # this is not using the predefined Django connections since
    # the typical use case is to dump from a remote database
    logging.basicConfig(level=logging.WARN)
    conn = psycopg2.connect(
        host=host, database=dbname, user=user, password=password, port=port)
    try:
        hc = HazardDumper(conn, outdir)
        hc.dump(hazard_calculation_id)
        log.info('Written %s', hc.outdir)
    finally:
        # release the connection even if the dump fails half way
        conn.close()
    return hc.outdir
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # the calculation id is the only mandatory positional argument
    parser.add_argument('hazard_calculation_id')
    # all the others are optional positionals, to be given in this order
    for optional in ('outdir', 'host', 'dbname', 'user', 'password', 'port'):
        parser.add_argument(optional, nargs='?')
    ns = parser.parse_args()
    main(ns.hazard_calculation_id, ns.outdir, ns.host,
         ns.dbname, ns.user, ns.password, ns.port)