#!/usr/bin/env python
# encoding: utf-8
"""Script that uses the `polar2grid` toolbox of modules to take VIIRS
hdf5 (.h5) files and create a properly scaled AWIPS compatible NetCDF file.
:author: David Hoese (davidh)
:contact: david.hoese@ssec.wisc.edu
:organization: Space Science and Engineering Center (SSEC)
:copyright: Copyright (c) 2012 University of Wisconsin SSEC. All rights reserved.
:date: Jan 2012
:license: GNU GPLv3
:revision: $Id$
"""
__docformat__ = "restructuredtext en"
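# Example invocation (hypothetical paths and grid name; see ``main()`` below
# for the full set of options):
#   python viirs2awips.py -vv -g 211e -d /data/viirs/sdr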
from polar2grid.core import Workspace
from polar2grid.core.constants import *
from polar2grid.viirs import Frontend
from .grids.grids import determine_grid_coverage_fbf, get_grid_info
from .remap import remap_bands
from .awips import Backend
import os
import sys
import logging
from multiprocessing import Process
from glob import glob
log = logging.getLogger(__name__)
LOG_FN = os.environ.get("VIIRS2AWIPS_LOG", "./viirs2awips.log")
def setup_logging(console_level=logging.INFO):
"""Setup the logger to the console to the logging level defined in the
command line (default INFO). Sets up a file logging for everything,
regardless of command line level specified. Adds extra logger for
tracebacks to go to the log file if the exception is caught. See
`exc_handler` for more information.
:Keywords:
console_level : int
Python logging level integer (ex. logging.INFO).
"""
root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)
# Console output is minimal
console = logging.StreamHandler(sys.stderr)
console_format = "%(levelname)-8s : %(message)s"
console.setFormatter(logging.Formatter(console_format))
console.setLevel(console_level)
root_logger.addHandler(console)
# Log file messages have a lot more information
file_handler = logging.FileHandler(LOG_FN)
file_format = "[%(asctime)s] : %(levelname)-8s : %(name)s : %(funcName)s : %(message)s"
file_handler.setFormatter(logging.Formatter(file_format))
file_handler.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
# Make a traceback logger specifically for adding tracebacks to log file
traceback_log = logging.getLogger('traceback')
traceback_log.propagate = False
traceback_log.setLevel(logging.ERROR)
traceback_log.addHandler(file_handler)
def exc_handler(exc_type, exc_value, traceback):
"""An execption handler/hook that will only be called if an exception
isn't called. This will save us from print tracebacks or unrecognizable
errors to the user's console.
Note, however, that this doesn't effect code in a separate process as the
exception never gets raised in the parent.
"""
logging.getLogger(__name__).error(exc_value)
logging.getLogger('traceback').error(exc_value, exc_info=(exc_type,exc_value,traceback))
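# ``exc_handler`` is registered as ``sys.excepthook`` in ``main()`` below, so
# it only sees exceptions that escape the main process.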
def _force_symlink(dst, linkname):
"""Create a symbolic link named `linkname` pointing to `dst`. If the
symbolic link already exists, remove it and create the new one.
:Parameters:
dst : str
Filename to be pointed to.
linkname : str
Filename of the symbolic link being created or overwritten.
"""
if os.path.exists(linkname):
log.info("Removing old file %s" % linkname)
os.remove(linkname)
log.debug("Symlinking %s -> %s" % (linkname,dst))
os.symlink(dst, linkname)
def _safe_remove(fn):
"""Remove the file `fn` if you can, if not log an error message,
but continue on.
:Parameters:
fn : str
Filename of the file to be removed.
"""
try:
log.info("Removing %s" % fn)
os.remove(fn)
except StandardError:
log.error("Could not remove %s" % fn)
def remove_products():
"""Remove as many of the possible files that were created from a previous
run of this script, including temporary files.
:note:
This does not remove the log file because it requires the log file
to report what's being removed.
"""
for f in glob(".lat*"):
_safe_remove(f)
for f in glob(".lon*"):
_safe_remove(f)
for f in glob(".mode*"):
_safe_remove(f)
for f in glob(".image*"):
_safe_remove(f)
for f in glob("latitude*.real4.*"):
_safe_remove(f)
for f in glob("longitude*.real4.*"):
_safe_remove(f)
for f in glob("image*.real4.*"):
_safe_remove(f)
for f in glob("mode_*.real4.*"):
_safe_remove(f)
for f in glob("prescale_*.real4.*"):
_safe_remove(f)
for f in glob("ll2cr_*.real4.*"):
_safe_remove(f)
for f in glob("ll2cr_*.img"):
_safe_remove(f)
for f in glob("result*.real4.*"):
_safe_remove(f)
for f in glob("SSEC_AWIPS_VIIRS*"):
_safe_remove(f)
def create_grid_jobs(sat, instrument, bands, fbf_lat, fbf_lon, backend,
forced_grids=None):
"""
TODO, documentation
"""
# Check what grids the backend can handle
all_possible_grids = set()
for band_kind, band_id in bands.keys():
this_band_can_handle = backend.can_handle_inputs(sat, instrument, band_kind, band_id, bands[(band_kind, band_id)]["data_kind"])
bands[(band_kind, band_id)]["grids"] = this_band_can_handle
if isinstance(this_band_can_handle, str):
all_possible_grids.update([this_band_can_handle])
else:
all_possible_grids.update(this_band_can_handle)
log.debug("Kind %s Band %s can handle these grids: '%r'" % (band_kind, band_id, this_band_can_handle))
# Get the set of grids we will use
if forced_grids is not None:
if isinstance(forced_grids, list): grids = forced_grids
else: grids = [forced_grids]
grids = set(grids)
else:
# Check if the data fits in the grids
all_useful_grids = determine_grid_coverage_fbf(fbf_lon, fbf_lat, list(all_possible_grids))
grids = set(all_useful_grids)
# Figure out which grids are useful for data coverage (or forced grids) and the backend can support
    grid_infos = dict((g, get_grid_info(g)) for g in grids)  # if g not in [GRIDS_ANY, GRIDS_ANY_GPD, GRIDS_ANY_PROJ4]
    for band_kind, band_id in bands.keys():
        if bands[(band_kind, band_id)]["grids"] == GRIDS_ANY:
            bands[(band_kind, band_id)]["grids"] = list(grids)
        elif bands[(band_kind, band_id)]["grids"] == GRIDS_ANY_PROJ4:
            bands[(band_kind, band_id)]["grids"] = [g for g in grids if grid_infos[g]["grid_kind"] == GRID_KIND_PROJ4]
        elif bands[(band_kind, band_id)]["grids"] == GRIDS_ANY_GPD:
            bands[(band_kind, band_id)]["grids"] = [g for g in grids if grid_infos[g]["grid_kind"] == GRID_KIND_GPD]
elif len(bands[(band_kind, band_id)]["grids"]) == 0:
log.error("The backend does not support kind %s band %s, won't add to job list..." % (band_kind, band_id))
# Handled in the next for loop via the inner for loop not adding anything
else:
bands[(band_kind, band_id)]["grids"] = grids.intersection(bands[(band_kind, band_id)]["grids"])
bad_grids = grids - set(bands[(band_kind, band_id)]["grids"])
if len(bad_grids) != 0 and forced_grids is not None:
log.error("Backend does not know how to handle grids '%r'" % list(bad_grids))
raise ValueError("Backend does not know how to handle grids '%r'" % list(bad_grids))
# Create "grid" jobs to be run through remapping
# Jobs are per grid per band
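    # Resulting structure: {grid_name: {(band_kind, band_id): band_info_dict}}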
grid_jobs = {}
    for band_kind, band_id in bands.keys():
        for grid_name in bands[(band_kind, band_id)]["grids"]:
            if grid_name not in grid_jobs: grid_jobs[grid_name] = {}
            log.debug("Kind %s band %s will be remapped to grid %s" % (band_kind, band_id, grid_name))
            grid_jobs[grid_name][(band_kind, band_id)] = bands[(band_kind, band_id)].copy()
if len(grid_jobs) == 0:
msg = "No backend compatible grids were found to fit the data set"
log.error(msg)
raise ValueError(msg)
return grid_jobs
def process_data_sets(filepaths,
fornav_D=None, fornav_d=None,
forced_grid=None,
forced_gpd=None, forced_nc=None,
create_pseudo=True,
num_procs=1,
rescale_config=None,
backend_config=None,
new_dnb=False # XXX
):
"""Process all the files provided from start to finish,
from filename to AWIPS NC file.
"""
status_to_return = STATUS_SUCCESS
# Load any configuration files needed
frontend = Frontend()
backend = Backend(rescale_config=rescale_config, backend_config=backend_config)
# Extract Swaths
log.info("Extracting swaths...")
try:
meta_data = frontend.make_swaths(
filepaths,
scale_dnb=True,
new_dnb=new_dnb,
create_fog=True,
cut_bad=True
)
# Let's be lazy and give names to the 'global' viirs info
sat = meta_data["sat"]
instrument = meta_data["instrument"]
start_time = meta_data["start_time"]
bands = meta_data["bands"]
fbf_lat = meta_data["fbf_lat"]
fbf_lon = meta_data["fbf_lon"]
nav_set_uid = meta_data["nav_set_uid"]
except StandardError:
log.error("Swath creation failed")
log.debug("Swath creation error:", exc_info=1)
status_to_return |= STATUS_FRONTEND_FAIL
return status_to_return
if len(bands) == 0:
log.error("No more bands to process, quitting...")
return status_to_return or STATUS_UNKNOWN_FAIL
# Determine grid
try:
log.info("Determining what grids the data fits in...")
grid_jobs = create_grid_jobs(sat, instrument, bands, fbf_lat, fbf_lon, backend,
forced_grids=forced_grid)
except StandardError:
log.debug("Grid Determination error:", exc_info=1)
log.error("Determining data's grids failed")
status_to_return |= STATUS_GDETER_FAIL
return status_to_return
### Remap the data
try:
remapped_jobs = remap_bands(sat, instrument, nav_set_uid,
fbf_lon, fbf_lat, grid_jobs,
num_procs=num_procs, fornav_d=fornav_d, fornav_D=fornav_D,
lat_fill_value=meta_data.get("lat_fill_value", None),
lon_fill_value=meta_data.get("lon_fill_value", None),
lat_south=meta_data.get("lat_south", None),
lat_north=meta_data.get("lat_north", None),
lon_west=meta_data.get("lon_west", None),
lon_east=meta_data.get("lon_east", None)
)
except StandardError:
log.debug("Remapping Error:", exc_info=1)
log.error("Remapping data failed")
status_to_return |= STATUS_REMAP_FAIL
return status_to_return
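    # ``remapped_jobs`` mirrors ``grid_jobs`` (grid -> band -> info dict), with
    # each band dict now carrying its remapped flat binary filename under the
    # "fbf_remapped" key.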
### BACKEND ###
W = Workspace('.')
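    # ``Workspace`` exposes each flat binary file in the directory as an array
    # attribute named after the file stem (the text before the first '.').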
for grid_name,grid_dict in remapped_jobs.items():
for (band_kind, band_id),band_dict in grid_dict.items():
log.info("Running AWIPS backend for %s%s band grid %s" % (band_kind, band_id, grid_name))
try:
# Get the data from the flat binary file
data = getattr(W, band_dict["fbf_remapped"].split(".")[0]).copy()
# Call the backend
backend.create_product(
sat,
instrument,
band_kind,
band_id,
band_dict["data_kind"],
data,
start_time=start_time,
grid_name=grid_name,
ncml_template=forced_nc or None,
fill_value=band_dict.get("fill_value", None)
)
except StandardError:
log.error("Error in the AWIPS backend for %s%s in grid %s" % (band_kind, band_id, grid_name))
log.debug("AWIPS backend error:", exc_info=1)
del remapped_jobs[grid_name][(band_kind, band_id)]
if len(remapped_jobs[grid_name]) == 0:
log.error("All backend jobs for grid %s failed" % (grid_name,))
del remapped_jobs[grid_name]
if len(remapped_jobs) == 0:
log.warning("AWIPS backend failed for all grids for bands %r" % (bands.keys(),))
status_to_return |= STATUS_BACKEND_FAIL
log.info("Processing of bands %r is complete" % (bands.keys(),))
return status_to_return
def _process_data_sets(*args, **kwargs):
"""Wrapper function around `process_data_sets` so that it can called
properly from `run_viir2awips`, where the exitcode is the actual
returned value from `process_data_sets`.
This function also checks for exceptions other than the ones already
checked for in `process_data_sets` so that they are
recorded properly.
"""
try:
stat = process_data_sets(*args, **kwargs)
sys.exit(stat)
except MemoryError:
log.error("viirs2awips ran out of memory, check log file for more info")
log.debug("Memory error:", exc_info=1)
except OSError:
log.error("viirs2awips had a OS error, check log file for more info")
log.debug("OS error:", exc_info=1)
except StandardError:
log.error("viirs2awips had an unexpected error, check log file for more info")
log.debug("Unexpected/Uncaught error:", exc_info=1)
except KeyboardInterrupt:
log.info("viirs2awips was cancelled by a keyboard interrupt")
sys.exit(-1)
def run_glue(filepaths,
multiprocess=True, **kwargs
):
"""Separate input files into groups that share navigation files data.
Call the processing function in separate process or same process depending
on value of `multiprocess` keyword.
"""
# Rewrite/force parameters to specific format
filepaths = [ os.path.abspath(os.path.expanduser(x)) for x in sorted(filepaths) ]
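    # VIIRS SDR filenames encode the band group in their prefix: SVM* (M-band),
    # SVI* (I-band), SVDNB* (Day/Night Band). Each group shares navigation.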
M_files = sorted(set([ x for x in filepaths if os.path.split(x)[1].startswith("SVM") ]))
I_files = sorted(set([ x for x in filepaths if os.path.split(x)[1].startswith("SVI") ]))
DNB_files = sorted(set([ x for x in filepaths if os.path.split(x)[1].startswith("SVDNB") ]))
all_used = set(M_files + I_files + DNB_files)
all_provided = set(filepaths)
not_used = all_provided - all_used
if len(not_used):
log.warning("Didn't know what to do with\n%s" % "\n".join(list(not_used)))
    exit_status = 0
    processes = []
    for band_group, group_files in (("M", M_files), ("I", I_files), ("DNB", DNB_files)):
        if len(group_files) == 0:
            continue
        log.debug("Processing %s files" % (band_group,))
        try:
            if multiprocess:
                p = Process(target=_process_data_sets,
                        args=(group_files,),
                        kwargs=kwargs
                        )
                p.start()
                processes.append(p)
            else:
                stat = _process_data_sets(group_files, **kwargs)
                exit_status = exit_status or stat
        except StandardError:
            log.error("Could not process %s files" % (band_group,))
            exit_status = exit_status or len(group_files)
    log.debug("Waiting for subprocesses")
    for p in processes:
        p.join()
        exit_status = exit_status or p.exitcode
return exit_status
def main():
import argparse
description = """
    Create VIIRS swaths, remap them to a grid, and place the remapped data
    into an AWIPS-compatible NetCDF file.
"""
parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-v', '--verbose', dest='verbosity', action="count", default=0,
            help='each occurrence increases verbosity one level through ERROR-WARNING-INFO-DEBUG (default ERROR)')
parser.add_argument('--fornav-D', dest='fornav_D', default=40,
help="Specify the -D option for fornav")
parser.add_argument('--fornav-d', dest='fornav_d', default=2,
help="Specify the -d option for fornav")
    parser.add_argument('-f', dest='data_files', nargs="+",
            help="List of one or more VIIRS HDF5 (.h5) files")
parser.add_argument('-d', dest='data_dir', nargs="?",
help="Data directory to look for input data files")
parser.add_argument('--sp', dest='single_process', default=False, action='store_true',
help="Processing is sequential instead of one process per kind of band")
parser.add_argument('--num-procs', dest="num_procs", default=1,
help="Specify number of processes that can be used to run ll2cr/fornav calls in parallel")
    parser.add_argument('-k', '--keep', dest='remove_prev', default=True, action='store_false',
            help="Don't delete any files that were previously made (WARNING: processing may not run successfully)")
parser.add_argument('--no-pseudo', dest='create_pseudo', default=True, action='store_false',
help="Don't create pseudo bands")
parser.add_argument('--rescale-config', dest='rescale_config', default=None,
help="specify alternate rescale configuration file")
parser.add_argument('--new-dnb', dest='new_dnb', default=False, action='store_true',
help="run new DNB scaling if provided DNB data (temporary)") # XXX
# Remapping/Grids
    parser.add_argument('-g', '--grids', dest='forced_grids', nargs="+", default="all",
            help="Force remapping to only the specified grids; defaults to 'all', which determines grids from data coverage")
parser.add_argument('--gpd', dest='forced_gpd', default=None,
help="Specify a different gpd file to use")
# Backend Specific
parser.add_argument('--nc', dest='forced_nc', default=None,
help="Specify a different ncml file to use")
parser.add_argument('--backend-config', dest='backend_config', default=None,
help="specify alternate backend configuration file")
args = parser.parse_args()
levels = [logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
setup_logging(console_level=levels[min(3, args.verbosity)])
# Don't set this up until after you have setup logging
sys.excepthook = exc_handler
fornav_D = int(args.fornav_D)
fornav_d = int(args.fornav_d)
num_procs = int(args.num_procs)
forced_grids = args.forced_grids
if forced_grids == 'all': forced_grids = None
if args.forced_gpd is not None:
args.forced_gpd = os.path.realpath(os.path.expanduser(args.forced_gpd))
if not os.path.exists(args.forced_gpd):
log.error("Specified gpd file does not exist '%s'" % args.forced_gpd)
return -1
if args.forced_nc is not None:
args.forced_nc = os.path.realpath(os.path.expanduser(args.forced_nc))
if not os.path.exists(args.forced_nc):
log.error("Specified nc file does not exist '%s'" % args.forced_nc)
return -1
if args.data_files:
hdf_files = args.data_files[:]
elif args.data_dir:
base_dir = os.path.abspath(os.path.expanduser(args.data_dir))
hdf_files = [ os.path.join(base_dir,x) for x in os.listdir(base_dir) if x.startswith("SV") and x.endswith(".h5") ]
    else:
        log.error("No input files or data directory specified")
        parser.print_help()
        return -1
# Handle the user using a '~' for their home directory
hdf_files = [ os.path.realpath(os.path.expanduser(x)) for x in hdf_files ]
if args.remove_prev:
log.debug("Removing any previous files")
remove_products()
stat = run_glue(hdf_files, fornav_D=fornav_D, fornav_d=fornav_d,
forced_grid=forced_grids,
forced_gpd=args.forced_gpd, forced_nc=args.forced_nc,
create_pseudo=args.create_pseudo,
multiprocess=not args.single_process, num_procs=num_procs,
rescale_config=args.rescale_config,
backend_config=args.backend_config,
new_dnb=args.new_dnb # XXX
)
return stat
if __name__ == "__main__":
sys.exit(main())