Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Speed up cviirs tiepoint interpolation #941

Merged
merged 11 commits into from Oct 27, 2019
2 changes: 1 addition & 1 deletion satpy/readers/utils.py
Expand Up @@ -46,7 +46,7 @@ def np2str(value):
"""
if hasattr(value, 'dtype') and \
issubclass(value.dtype.type, (np.string_, np.object_)) and value.size == 1:
value = np.asscalar(value)
value = value.item()
if not isinstance(value, str):
# python 3 - was scalar numpy array of bytes
# otherwise python 2 - scalar numpy array of 'str'
Expand Down
266 changes: 152 additions & 114 deletions satpy/readers/viirs_compact.py
Expand Up @@ -17,8 +17,15 @@
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Compact viirs format.

.. note:: It should be possible to further enhance this reader by performing the
interpolation of the angles and lon/lats at the native dask chunk level.
This is a reader for the Compact VIIRS format shipped on Eumetcast for the
VIIRS SDR. The format is compressed in multiple ways, notably by shipping only
tie-points for geographical data. The interpolation of this data is done using
dask operations, so it should be relatively performant.

For more information on this format, users can refer to the
`Compact VIIRS SDR Product Format User Guide` that can be found on this EARS_ page.

.. _EARS: https://www.eumetsat.int/website/home/Data/RegionalDataServiceEARS/EARSVIIRS/index.html

"""

Expand Down Expand Up @@ -88,22 +95,17 @@ def __init__(self, filename, filename_info, filetype_info):
raise IOError('Compact Viirs file type not recognized.')

geo_data = self.h5f["Data_Products"]["VIIRS-%s-GEO" % self.ch_type]["VIIRS-%s-GEO_Gran_0" % self.ch_type]
self.min_lat = np.asscalar(geo_data.attrs['South_Bounding_Coordinate'])
self.max_lat = np.asscalar(geo_data.attrs['North_Bounding_Coordinate'])
self.min_lon = np.asscalar(geo_data.attrs['West_Bounding_Coordinate'])
self.max_lon = np.asscalar(geo_data.attrs['East_Bounding_Coordinate'])
self.min_lat = geo_data.attrs['South_Bounding_Coordinate'].item()
self.max_lat = geo_data.attrs['North_Bounding_Coordinate'].item()
self.min_lon = geo_data.attrs['West_Bounding_Coordinate'].item()
self.max_lon = geo_data.attrs['East_Bounding_Coordinate'].item()

self.switch_to_cart = ((abs(self.max_lon - self.min_lon) > 90)
or (max(abs(self.min_lat), abs(self.max_lat)) > 60))

self.scans = self.h5f["All_Data"]["NumberOfScans"][0]
self.geostuff = self.h5f["All_Data"]['VIIRS-%s-GEO_All' % self.ch_type]

self.c_align = da.from_array(self.geostuff["AlignmentCoefficient"])[
np.newaxis, np.newaxis, :, np.newaxis]
self.c_exp = da.from_array(self.geostuff["ExpansionCoefficient"])[
np.newaxis, np.newaxis, :, np.newaxis]

for key in self.h5f["All_Data"].keys():
if key.startswith("VIIRS") and key.endswith("SDR_All"):
channel = key.split('-')[1]
Expand All @@ -112,22 +114,31 @@ def __init__(self, filename, filename_info, filetype_info):
# FIXME: this supposes there is only one tiepoint zone in the
# track direction
self.scan_size = self.h5f["All_Data/VIIRS-%s-SDR_All" %
channel].attrs["TiePointZoneSizeTrack"]
self.scan_size = np.asscalar(self.scan_size)
channel].attrs["TiePointZoneSizeTrack"].item()
self.track_offset = self.h5f["All_Data/VIIRS-%s-SDR_All" %
channel].attrs["PixelOffsetTrack"]
self.scan_offset = self.h5f["All_Data/VIIRS-%s-SDR_All" %
channel].attrs["PixelOffsetScan"]

try:
self.group_locations = self.geostuff[
"TiePointZoneGroupLocationScanCompact"].value
"TiePointZoneGroupLocationScanCompact"][()]
except KeyError:
self.group_locations = [0]

self.tpz_sizes = self.h5f["All_Data/VIIRS-%s-SDR_All" %
channel].attrs["TiePointZoneSizeScan"]
self.nb_tpzs = self.geostuff["NumberOfTiePointZonesScan"].value
self.tpz_sizes = da.from_array(self.h5f["All_Data/VIIRS-%s-SDR_All" % channel].attrs["TiePointZoneSizeScan"],
chunks=1)
if len(self.tpz_sizes.shape) == 2:
if self.tpz_sizes.shape[1] != 1:
raise NotImplementedError("Can't handle 2 dimensional tiepoint zones.")
self.tpz_sizes = self.tpz_sizes.squeeze(1)
self.nb_tpzs = self.geostuff["NumberOfTiePointZonesScan"]
self.c_align = da.from_array(self.geostuff["AlignmentCoefficient"],
chunks=tuple(self.nb_tpzs))
self.c_exp = da.from_array(self.geostuff["ExpansionCoefficient"],
chunks=tuple(self.nb_tpzs))
self.nb_tpzs = da.from_array(self.nb_tpzs, chunks=1)
self._expansion_coefs = None

self.cache = {}

Expand Down Expand Up @@ -285,91 +296,117 @@ def read_dataset(self, dataset_key, info):
rads.attrs['units'] = unit
return rads

def expand(self, data, coefs):
    """Perform the tiepoint expansion in the numpy domain.

    *data* is a block of tiepoint values with a trailing singleton axis
    (added by the caller for chunk alignment with *coefs*); *coefs* holds
    the four bilinear interpolation coefficients for every expanded pixel.

    Returns the expanded array, reshaped to
    ``(scans * scan_size, expanded_columns)``.
    """
    # Drop the trailing singleton axis the caller appended.
    data = data.reshape(data.shape[:-1])

    # One set of 4 coefficients per output pixel per tiepoint interval:
    # (scan, line-in-scan, tiepoint interval, pixel-in-interval, 4).
    coefs = coefs.reshape(self.scans, self.scan_size, data.shape[1] - 1, -1, 4)

    coef_a = coefs[:, :, :, :, 0]
    coef_b = coefs[:, :, :, :, 1]
    coef_c = coefs[:, :, :, :, 2]
    coef_d = coefs[:, :, :, :, 3]

    # The four corner tiepoints of each zone: even rows appear to hold the
    # first tiepoint line of a scan and odd rows the second one — the
    # stride-2 indexing pairs them up per scan.
    data_a = data[:self.scans * 2:2, np.newaxis, :-1, np.newaxis]
    data_b = data[:self.scans * 2:2, np.newaxis, 1:, np.newaxis]
    data_c = data[1:self.scans * 2:2, np.newaxis, 1:, np.newaxis]
    data_d = data[1:self.scans * 2:2, np.newaxis, :-1, np.newaxis]

    # Bilinear combination of the four corners.
    fdata = (coef_a * data_a + coef_b * data_b + coef_d * data_d + coef_c * data_c)
    return fdata.reshape(self.scans * self.scan_size, -1)

if self.switch_to_cart:
arrays = lonlat2xyz(lon, lat)
else:
arrays = (lon, lat)
def expand_angle_and_nav(self, arrays):
    """Expand angle and navigation datasets from tiepoints to full resolution.

    Each array in *arrays* is expanded independently with the shared
    expansion coefficients; a list of expanded dask arrays is returned.
    """
    coefs = self.expansion_coefs
    out_chunks = coefs.chunks[:-1]
    expanded = []
    for arr in arrays:
        # The added trailing axis aligns the data chunks with the
        # coefficient chunks; it is dropped again by ``self.expand``.
        expanded.append(da.map_blocks(self.expand, arr[:, :, np.newaxis], coefs,
                                      dtype=arr.dtype, drop_axis=2,
                                      chunks=out_chunks))
    return expanded

def get_coefs(self, c_align, c_exp, tpz_size, nb_tpz, v_track):
    """Compute the bilinear interpolation coefficients in the numpy domain.

    *c_align* and *c_exp* are the per-zone alignment and expansion
    coefficients, *tpz_size* and *nb_tpz* the zone size and count along
    the scan direction (0-d arrays), and *v_track* the fractional track
    positions of all output lines.

    Returns an array of shape ``(scans * scan_size, pixels, 4)`` holding
    the four corner weights for every expanded pixel.
    """
    nties = nb_tpz.item()
    zone_size = tpz_size.item()

    # Fractional scan position of every expanded pixel within its zone.
    v_scan = (np.arange(nties * zone_size) % zone_size + self.scan_offset) / zone_size
    grid_scan, grid_track = np.meshgrid(v_scan, v_track)

    grid4d = (self.scans, self.scan_size, nties, zone_size)
    grid_track = grid_track.reshape(grid4d)
    grid_scan = grid_scan.reshape(grid4d)

    # Broadcast the per-zone coefficients over the 4D pixel grid.
    align = c_align[np.newaxis, np.newaxis, :, np.newaxis]
    exp = c_exp[np.newaxis, np.newaxis, :, np.newaxis]

    # Distorted scan coordinate; the track coordinate is used unchanged.
    a_scan = (grid_scan
              + grid_scan * (1 - grid_scan) * exp
              + grid_track * (1 - grid_track) * align)
    a_track = grid_track

    # Corner weights stacked in (a, b, c, d) order.
    coefs = np.stack([(1 - a_track) * (1 - a_scan),
                      (1 - a_track) * a_scan,
                      a_track * a_scan,
                      a_track * (1 - a_scan)],
                     axis=4)
    return coefs.reshape(self.scans * self.scan_size, -1, 4)

for data in arrays:
expanded.append(expand_array(
data, self.scans, c_align, c_exp, self.scan_size,
tpz_size, nb_tpz, self.track_offset, self.scan_offset))
@property
def expansion_coefs(self):
    """Return the expansion coefficients, computing them on first access.

    The coefficients are built lazily with dask and cached on the
    instance, so repeated calls reuse the same graph.
    """
    if self._expansion_coefs is None:
        # Fractional track position of every output line within its scan.
        line_positions = (np.arange(self.scans * self.scan_size) % self.scan_size
                          + self.track_offset) / self.scan_size
        self._expansion_coefs = da.map_blocks(
            self.get_coefs, self.c_align, self.c_exp,
            self.tpz_sizes, self.nb_tpzs,
            dtype=np.float64, v_track=line_positions, new_axis=[0, 2],
            chunks=(self.scans * self.scan_size,
                    tuple(self.tpz_sizes * self.nb_tpzs), 4))
    return self._expansion_coefs

if self.switch_to_cart:
res.append(xyz2lonlat(*expanded))
else:
res.append(expanded)
lons, lats = zip(*res)
return da.hstack(lons), da.hstack(lats)
def navigate(self):
    """Generate the expanded longitude and latitude datasets."""
    nb_rows = self.geostuff['Longitude'].shape[0]
    # One chunk per tiepoint zone group (zones + 1 shared edge column).
    col_chunks = tuple((self.nb_tpzs + 1).compute())
    chunks = (nb_rows, col_chunks)

    lon = da.from_array(self.geostuff["Longitude"], chunks=chunks)
    lat = da.from_array(self.geostuff["Latitude"], chunks=chunks)

    # ``switch_to_cart`` is set in __init__ for wide longitude spans or
    # high latitudes; interpolate in cartesian space in those cases.
    if not self.switch_to_cart:
        return self.expand_angle_and_nav((lon, lat))

    xyz = lonlat2xyz(lon, lat)
    expanded = self.expand_angle_and_nav(xyz)
    return xyz2lonlat(*expanded)

def angles(self, azi_name, zen_name):
    """Generate the expanded azimuth and zenith angle datasets."""
    nb_rows = self.geostuff['Longitude'].shape[0]
    # One chunk per tiepoint zone group (zones + 1 shared edge column).
    col_chunks = tuple((self.nb_tpzs + 1).compute())
    chunks = (nb_rows, col_chunks)

    azi = self.geostuff[azi_name]
    zen = self.geostuff[zen_name]

    # Interpolating angles directly is unreliable near the zenith, when
    # the azimuth wraps, or at high latitudes; switch to cartesian
    # coordinates in those cases.
    switch_to_cart = ((np.max(azi) - np.min(azi) > 5)
                      or (np.min(zen) < 10)
                      or (max(abs(self.min_lat), abs(self.max_lat)) > 80))

    azi = da.from_array(azi, chunks=chunks)
    zen = da.from_array(zen, chunks=chunks)

    if switch_to_cart:
        arrays = convert_from_angles(azi, zen)
    else:
        arrays = (azi, zen)

    expanded = self.expand_angle_and_nav(arrays)
    if switch_to_cart:
        return convert_to_angles(*expanded)

    return expanded


def convert_from_angles(azi, zen):
"""Convert the angles to cartesian coordinates."""
lon = np.deg2rad(lon)
lat = np.deg2rad(lat)
x, y, z, = angle2xyz(azi, zen)
# Conversion to ECEF is recommended by the provider, but no significant
# difference has been seen.
Expand All @@ -379,10 +416,8 @@ def convert_from_angles(azi, zen, lon, lat):
return x, y, z


def convert_to_angles(x, y, z, lon, lat):
def convert_to_angles(x, y, z):
"""Convert the cartesian coordinates to angles."""
lon = np.deg2rad(lon)
lat = np.deg2rad(lat)
# Conversion to ECEF is recommended by the provider, but no significant
# difference has been seen.
# x, y, z = (-np.sin(lon) * x - np.sin(lat) * np.cos(lon) * y + np.cos(lat) * np.cos(lon) * z,
Expand All @@ -392,20 +427,20 @@ def convert_to_angles(x, y, z, lon, lat):
return azi, zen


def expand_array(data,
scans,
c_align,
c_exp,
scan_size=16,
tpz_size=16,
nties=200,
track_offset=0.5,
scan_offset=0.5):
def expand_arrays(arrays,
scans,
c_align,
c_exp,
scan_size=16,
tpz_size=16,
nties=200,
track_offset=0.5,
scan_offset=0.5):
"""Expand *data* according to alignment and expansion."""
nties = np.asscalar(nties)
tpz_size = np.asscalar(tpz_size)
s_scan, s_track = da.meshgrid(np.arange(nties * tpz_size),
np.arange(scans * scan_size))
nties = nties.item()
tpz_size = tpz_size.item()
s_scan, s_track = da.meshgrid(da.arange(nties * tpz_size),
da.arange(scans * scan_size))
s_track = (s_track.reshape(scans, scan_size, nties, tpz_size) % scan_size
+ track_offset) / scan_size
s_scan = (s_scan.reshape(scans, scan_size, nties, tpz_size) % tpz_size
Expand All @@ -414,14 +449,17 @@ def expand_array(data,
a_scan = s_scan + s_scan * (1 - s_scan) * c_exp + s_track * (
1 - s_track) * c_align
a_track = s_track

data_a = data[:scans * 2:2, np.newaxis, :-1, np.newaxis]
data_b = data[:scans * 2:2, np.newaxis, 1:, np.newaxis]
data_c = data[1:scans * 2:2, np.newaxis, 1:, np.newaxis]
data_d = data[1:scans * 2:2, np.newaxis, :-1, np.newaxis]

fdata = ((1 - a_track)
* ((1 - a_scan) * data_a + a_scan * data_b)
+ a_track
* ((1 - a_scan) * data_d + a_scan * data_c))
return fdata.reshape(scans * scan_size, nties * tpz_size)
expanded = []
coef_a = (1 - a_track) * (1 - a_scan)
coef_b = (1 - a_track) * a_scan
coef_d = a_track * (1 - a_scan)
coef_c = a_track * a_scan
for data in arrays:
data_a = data[:scans * 2:2, np.newaxis, :-1, np.newaxis]
data_b = data[:scans * 2:2, np.newaxis, 1:, np.newaxis]
data_c = data[1:scans * 2:2, np.newaxis, 1:, np.newaxis]
data_d = data[1:scans * 2:2, np.newaxis, :-1, np.newaxis]
fdata = (coef_a * data_a + coef_b * data_b
+ coef_d * data_d + coef_c * data_c)
expanded.append(fdata.reshape(scans * scan_size, nties * tpz_size))
return expanded
4 changes: 3 additions & 1 deletion satpy/tests/reader_tests/__init__.py
Expand Up @@ -39,7 +39,8 @@
test_avhrr_l1b_gaclac, test_vaisala_gld360,
test_fci_l1c_fdhsi, test_tropomi_l2,
test_hsaf_grib, test_abi_l2_nc, test_eum_base,
test_ami_l1b)
test_ami_l1b, test_viirs_compact)


if sys.version_info < (2, 7):
import unittest2 as unittest
Expand Down Expand Up @@ -96,6 +97,7 @@ def suite():
mysuite.addTests(test_tropomi_l2.suite())
mysuite.addTests(test_hsaf_grib.suite())
mysuite.addTests(test_eum_base.suite())
mysuite.addTests(test_viirs_compact.suite())
mysuite.addTests(test_ami_l1b.suite())

return mysuite