Better memory usage in new style viirs sdr reader
djhoese committed Aug 10, 2015
1 parent 21cd5ab commit 975500b
Showing 1 changed file with 57 additions and 59 deletions.
116 changes: 57 additions & 59 deletions mpop/readers/viirs_sdr.py
@@ -67,28 +67,17 @@ def __new__(cls, name, variable_name, scaling_factors=None, dtype=np.float32, **


class HDF5MetaData(object):

"""Small class for inspecting a HDF5 file and retrieve its metadata/header data.
"""
Small class for inspecting an HDF5 file and retrieving its metadata/header
data. It is developed for JPSS/NPP data but is really generic and should
work on most other HDF5 files.
"""

def __init__(self, filename, **kwargs):
self.metadata = {}
self.filename = filename
if not os.path.exists(filename):
raise IOError("File %s does not exist!" % filename)
self._file_handle = h5py.File(self.filename, 'r')
self._file_handle.visititems(self.collect_metadata)
self._collect_attrs('/', self._file_handle.attrs)

def close(self):
self._file_handle.close()
file_handle = h5py.File(self.filename, 'r')
file_handle.visititems(self.collect_metadata)
self._collect_attrs('/', file_handle.attrs)
file_handle.close()

def _collect_attrs(self, name, attrs):
for key, value in attrs.iteritems():
@@ -101,10 +90,15 @@ def _collect_attrs(self, name, attrs):
def collect_metadata(self, name, obj):
if isinstance(obj, h5py.Dataset):
self.metadata[name] = obj
self.metadata[name + "/shape"] = obj.shape
self._collect_attrs(name, obj.attrs)

def __getitem__(self, key):
return self.metadata[key]
val = self.metadata[key]
if isinstance(val, h5py.Dataset):
# dataset objects become inaccessible once the file is closed, so reopen on demand
return h5py.File(self.filename, 'r')[key].value
return val

def keys(self):
return self.metadata.keys()
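
The pattern introduced in this hunk is: walk the file once with visititems(), cache everything cheap (attributes, and now dataset shapes), close the handle immediately, and reopen the file only when a dataset's values are actually requested. A minimal standalone sketch of that pattern, using modern h5py/Python 3 spelling ([()] instead of the old .value; the helper names here are hypothetical, not the reader's API):

import h5py

def harvest_metadata(filename):
    """Walk an HDF5 file once, record shapes and attributes, then close it."""
    metadata = {}

    def visitor(name, obj):
        if isinstance(obj, h5py.Dataset):
            metadata[name + "/shape"] = obj.shape
        for key, value in obj.attrs.items():
            metadata[name + "/attr/" + key] = value

    with h5py.File(filename, "r") as hdf_file:
        hdf_file.visititems(visitor)
    return metadata

def load_values(filename, key):
    """Reopen the file only when array values are actually needed."""
    with h5py.File(filename, "r") as hdf_file:
        return hdf_file[key][()]

The trade-off is one extra open() per data read in exchange for never holding dozens of granule files open at once.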
@@ -133,7 +127,9 @@ def __init__(self, file_type, filename, file_keys=None, **kwargs):
self.end_time = self.get_end_time()

def __getitem__(self, item):
if item in self.file_keys:
if item.endswith("/shape") and item[:-6] in self.file_keys:
item = self.file_keys[item[:-6]].variable_name.format(**self.file_info) + "/shape"
elif item in self.file_keys:
item = self.file_keys[item].variable_name.format(**self.file_info)

return super(SDRFileReader, self).__getitem__(item)
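
With shapes cached under ".../shape" keys, the reader can answer shape queries for a logical key without reopening the file. A hedged sketch of the redirection (the FILE_KEYS mapping and metadata dict are made up, and the real code additionally formats the variable name with per-file metadata):

FILE_KEYS = {"radiance": "All_Data/VIIRS-M1-SDR_All/Radiance"}  # hypothetical mapping

def resolve(item, metadata):
    # "radiance/shape" -> cached shape of the real variable, no file I/O
    if item.endswith("/shape") and item[:-6] in FILE_KEYS:
        return metadata[FILE_KEYS[item[:-6]] + "/shape"]
    # "radiance" -> the real variable itself
    if item in FILE_KEYS:
        return metadata[FILE_KEYS[item]]
    return metadata[item]

This is what lets MultiFileReader further down size its output array from x[item + "/shape"] without reading any pixel data.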
@@ -177,23 +173,28 @@ def get_unit(self, calibrate=1):

return None

def scale_swath_data(self, data, scaling_factors):
def scale_swath_data(self, data, mask, scaling_factors):
"""Scale swath data using scaling factors and offsets.
Multi-granule (a.k.a. aggregated) files will have more than the usual two values.
"""
num_grans = len(scaling_factors)/2
gran_size = data.shape[0]/num_grans
scaling_mask = np.zeros(data.shape, dtype=np.bool)
for i in range(num_grans):
start_idx = i * gran_size
end_idx = start_idx + gran_size
m = scaling_factors[i*2]
b = scaling_factors[i*2 + 1]
# in rare cases the scaling factors are actually fill values
if m <= -999 or b <= -999:
scaling_mask[start_idx:end_idx] = 1
mask[start_idx:end_idx] = 1
else:
data[start_idx:end_idx] = m * data[start_idx:end_idx] + b
data[start_idx:end_idx] *= m
data[start_idx:end_idx] += b

return data, scaling_mask
return data, mask
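
Splitting data = m * data + b into augmented assignments is the memory win in this hunk: the fused expression allocates a temporary the size of the whole slice for m * data before adding b, while *= and += mutate the existing buffer. A small demonstration, with the granule bookkeeping omitted:

import numpy as np

def scale_inplace(data, m, b):
    """Apply y = m * x + b without allocating a data-sized temporary."""
    data *= m  # in-place multiply, no new array
    data += b  # in-place add
    return data

data = np.arange(6, dtype=np.float32).reshape(2, 3)
scaled = scale_inplace(data, 2.0, 1.0)
assert scaled is data  # same buffer, nothing was copied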

def get_swath_data(self, item, filename=False):
def get_swath_data(self, item, filename=False, data_out=None, mask_out=None):
"""Get swath data, apply proper scalings, and apply proper masks.
"""
if filename:
@@ -204,36 +205,35 @@ def get_swath_data(self, item, filename=False):

# Can't guarantee proper file info until we get the data first
var_info = self.file_keys[item]
# This assumes that we aren't promoting the dtypes (ex. float file data -> int array)
data = self[item].value
is_floating = np.issubdtype(data.dtype, np.floating)
data = data.astype(var_info.dtype)
data = self[item]
if data_out is not None:
# This assumes that we are promoting the dtypes (ex. float file data -> int array)
# and that it happens automatically when assigning to the existing out array
data_out[:] = data
else:
data_out = data[:].astype(var_info.dtype)
mask_out = np.zeros_like(data_out, dtype=np.bool)
# dtype check must come after data_out is guaranteed to exist
is_floating = np.issubdtype(data_out.dtype, np.floating)

try:
factors = self[var_info.scaling_factors]
factors = self[var_info.scaling_factors] if var_info.scaling_factors is not None else None

@mraspaud (Member) commented on Aug 10, 2015:

    I would've used a try/except... or get if possible...

@djhoese (Author, Member) replied on Aug 10, 2015:

    My thought was that the scaling_factors might not exist in the configuration file (like for longitude and latitude), so it is clearer to check that they aren't None and that we are actually getting something before trying to get the data from the file.

except KeyError:
LOG.debug("No scaling factors found for %s", item)
factors = None
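
The inline review thread above is about keys that may legitimately be absent (longitude and latitude configure no scaling factors). Both spellings work; a hedged sketch of the two alternatives using stand-in dicts rather than the real file reader:

data_file = {"ScaleFactors": [2.0, 1.0]}  # stand-in for the HDF5 contents
config = {"reflectance": "ScaleFactors", "longitude": None}  # hypothetical config

# Option chosen in this commit: check for None before touching the file
key = config["reflectance"]
factors = data_file[key] if key is not None else None

# Review suggestion: attempt the lookup and fall back
try:
    factors = data_file[config["longitude"]]  # None is not a key -> KeyError
except KeyError:
    factors = None

The commit keeps the surrounding try/except anyway, so the None check mainly documents that a missing configuration entry is expected rather than exceptional.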

# If the data is an integer then we mask everything >= fill_min_int
fill_min = int(var_info.kwargs.get("fill_min_int", 65528))
# If the data is a float then we mask everything <= -999.0
fill_max = float(var_info.kwargs.get("fill_max_float", -999.0))

if is_floating:
mask = data <= fill_max
# If the data is a float then we mask everything <= -999.0
fill_max = float(var_info.kwargs.get("fill_max_float", -999.0))
mask_out[:] = data_out <= fill_max
else:
mask = data >= fill_min
# If the data is an integer then we mask everything >= fill_min_int
fill_min = int(var_info.kwargs.get("fill_min_int", 65528))
mask_out[:] = data_out >= fill_min

if factors:
data, scaling_mask = self.scale_swath_data(data, factors)
mask |= scaling_mask
if factors is not None:
data_out, scaling_mask = self.scale_swath_data(data_out, mask_out, factors)

# FIXME: If dtype is int then this won't work, return masked array?
data[mask] = np.nan
# data[mask] = -999.0

return data
return data_out, mask_out
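
The new data_out/mask_out parameters follow NumPy's out-parameter convention: the caller preallocates one aggregate array and hands each granule a view to fill, so no granule-sized temporaries outlive the call. A minimal sketch of the contract, independent of the reader classes (fill_granule is a hypothetical stand-in):

import numpy as np

def fill_granule(values, data_out=None, mask_out=None):
    """Write values through the caller's views, or allocate fresh arrays."""
    if data_out is not None:
        data_out[:] = values  # writes into the caller's buffer
    else:
        data_out = np.asarray(values, dtype=np.float32)
        mask_out = np.zeros_like(data_out, dtype=bool)
    mask_out |= ~np.isfinite(data_out)
    return data_out, mask_out

data = np.empty((4, 3), dtype=np.float32)
mask = np.zeros((4, 3), dtype=bool)
fill_granule(np.ones((2, 3)), data_out=data[:2], mask_out=mask[:2])
fill_granule(np.zeros((2, 3)), data_out=data[2:], mask_out=mask[2:])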


class MultiFileReader(object):
@@ -259,31 +259,29 @@ def get_swath_data(self, item, extra_mask=None, filename=None):
raise RuntimeError("Can not get swath data when no file key information was found")

var_info = self.file_keys[item]
shapes = [x[item].shape for x in self.file_readers]
num_rows = sum([x[0] for x in shapes])
num_cols = shapes[0][1]
granule_shapes = [x[item + "/shape"] for x in self.file_readers]
num_rows = sum([x[0] for x in granule_shapes])
num_cols = granule_shapes[0][1]

if filename:
raise NotImplementedError("Saving data arrays to disk is not supported yet")
data = np.memmap(filename, dtype=var_info.dtype, mode='w', shape=(num_rows, num_cols))
# data = np.memmap(filename, dtype=var_info.dtype, mode='w', shape=(num_rows, num_cols))
else:
data = np.empty((num_rows, num_cols), dtype=var_info.dtype)
if extra_mask is not None:
mask = extra_mask.copy()
else:
mask = np.zeros_like(data, dtype=np.bool)

idx = 0
for file_reader in self.file_readers:
for granule_shape, file_reader in zip(granule_shapes, self.file_readers):
# Get the data from each individual file reader (assumes it gets the data with the right data type)
tmp_data = file_reader.get_swath_data(item, filename=filename)
data[idx: idx+tmp_data.shape[0]] = tmp_data
idx += tmp_data.shape[0]
file_reader.get_swath_data(item, filename=filename,
data_out=data[idx: idx + granule_shape[0]],
mask_out=mask[idx: idx + granule_shape[0]])
idx += granule_shape[0]

# FIXME: This may get ugly when using memmaps, maybe move projectable creation here instead
# data[np.isnan(data)] = 0
print(data.min())
print(data.max())

mask = np.isnan(data)
if extra_mask is not None:
mask |= extra_mask
return np.ma.array(data, mask=mask, copy=False)
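
Putting the pieces together: the aggregate is sized from the cached granule shapes, each file reader fills its slice in place, and np.ma.array(..., copy=False) wraps the buffer without another copy, so peak usage stays near one full swath plus one granule of scratch space. A sketch with made-up granule shapes standing in for the file readers:

import numpy as np

granule_shapes = [(768, 3200)] * 3  # hypothetical VIIRS granule shapes
num_rows = sum(shape[0] for shape in granule_shapes)
num_cols = granule_shapes[0][1]

data = np.empty((num_rows, num_cols), dtype=np.float32)
mask = np.zeros((num_rows, num_cols), dtype=bool)

idx = 0
for shape in granule_shapes:
    rows = shape[0]
    # stand-in for file_reader.get_swath_data(..., data_out=..., mask_out=...)
    data[idx:idx + rows] = 1.0
    idx += rows

swath = np.ma.array(data, mask=mask, copy=False)  # wraps the same buffer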


