Refactor the combine_metadata function and allow numpy arrays to be combined #1309

Merged (9 commits) on Aug 11, 2020
334 changes: 189 additions & 145 deletions satpy/dataset.py
@@ -26,12 +26,201 @@
from copy import copy, deepcopy
from datetime import datetime
from enum import IntEnum, Enum
from functools import reduce, partial
from operator import is_, eq

import numpy as np

logger = logging.getLogger(__name__)


class MetadataObject(object):
"""A general metadata object."""

def __init__(self, **attributes):
"""Initialize the class with *attributes*."""
self.attrs = attributes

@property
def id(self):
"""Return the DataID of the object."""
try:
return self.attrs['_satpy_id']
except KeyError:
id_keys = self.attrs.get('_satpy_id_keys', minimal_default_keys_config)
return DataID(id_keys, **self.attrs)
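A minimal sketch of the fallback path (illustrative attribute names; `DataID` and `minimal_default_keys_config` are defined elsewhere in this module):

```python
obj = MetadataObject(name='IR_108', resolution=3000)
# No '_satpy_id' is cached in obj.attrs, so the property builds a
# DataID from the default id keys instead.
data_id = obj.id
```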


def combine_metadata(*metadata_objects, average_times=True):
"""Combine the metadata of two or more Datasets.

    If the values corresponding to any keys are not equal or do not
    exist in all provided dictionaries, they are not included in the
    returned dictionary. By default, any key containing the word 'time'
    whose values are datetime objects will be averaged. This handles
    cases where data were observed at almost, but not exactly, the same
    time. In the interest of speed, lazy arrays are compared by object
    identity rather than by their contents.

Args:
*metadata_objects: MetadataObject or dict objects to combine
average_times (bool): Average any keys with 'time' in the name

Returns:
dict: the combined metadata

"""
info_dicts = _get_valid_dicts(metadata_objects)

if len(info_dicts) == 1:
return info_dicts[0].copy()

shared_keys = _shared_keys(info_dicts)

return _combine_shared_info(shared_keys, info_dicts, average_times)
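A usage sketch of the new behaviour (illustrative attribute names; assumes this module is importable as `satpy.dataset`, per the file shown):

```python
import numpy as np
from datetime import datetime

from satpy.dataset import combine_metadata

a = {'sensor': 'seviri',
     'start_time': datetime(2020, 8, 11, 12, 0, 0),
     'calibration': 'brightness_temperature',
     'lut': np.array([1.0, 2.0, np.nan])}
b = {'sensor': 'seviri',
     'start_time': datetime(2020, 8, 11, 12, 0, 30),
     'calibration': 'radiance',
     'lut': np.array([1.0, 2.0, np.nan])}

combined = combine_metadata(a, b)
# 'sensor' is kept (values equal), 'calibration' is dropped (conflict),
# 'start_time' is averaged to 12:00:15, and 'lut' is kept because the
# numpy arrays compare equal under np.allclose(..., equal_nan=True).
```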


def _get_valid_dicts(metadata_objects):
"""Get the valid dictionaries matching the metadata_objects."""
info_dicts = []
for metadata_object in metadata_objects:
if isinstance(metadata_object, dict):
metadata_dict = metadata_object
elif hasattr(metadata_object, "attrs"):
metadata_dict = metadata_object.attrs
else:
continue
info_dicts.append(metadata_dict)
return info_dicts


def _shared_keys(info_dicts):
key_sets = (set(metadata_dict.keys()) for metadata_dict in info_dicts)
return reduce(set.intersection, key_sets)


def _combine_shared_info(shared_keys, info_dicts, average_times):
shared_info = {}
for key in shared_keys:
values = [info[key] for info in info_dicts]
if 'time' in key and isinstance(values[0], datetime) and average_times:
shared_info[key] = average_datetimes(values)
elif _are_values_combinable(values):
shared_info[key] = values[0]
return shared_info


def average_datetimes(datetime_list):
"""Average a series of datetime objects.

.. note::

This function assumes all datetime objects are naive and in the same
time zone (UTC).

Args:
datetime_list (iterable): Datetime objects to average

Returns: Average datetime as a datetime object

"""
total = [datetime.timestamp(dt) for dt in datetime_list]
return datetime.fromtimestamp(sum(total) / len(total))
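For example (a minimal sketch):

```python
from datetime import datetime

times = [datetime(2020, 8, 11, 12, 0, 0),
         datetime(2020, 8, 11, 12, 0, 10),
         datetime(2020, 8, 11, 12, 0, 20)]
# The mean of the timestamps is the middle one here.
average_datetimes(times)  # -> datetime(2020, 8, 11, 12, 0, 10)
```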


def _are_values_combinable(values):
"""Check if the *values* can be combined."""
if _contain_arrays(values):
return _all_arrays_equal(values)
elif _contain_collections_of_arrays(values):
        # In the real world, the `ancillary_variables` attribute may be
        # List[xarray.DataArray]; our values are then
        # List[List[xarray.DataArray]].
        # Note that this check is also true for any higher-dimensional
        # ndarray, but it only runs after _contain_arrays has returned
        # False, so this false positive has no impact.
return _all_list_of_arrays_equal(values)
return _all_values_equal(values)
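A quick illustration of the `ancillary_variables`-style case described in the comment above (hypothetical values, calling the private helper directly):

```python
import numpy as np

arr = np.arange(3.0)
# Each metadata value is a list of arrays, e.g. List[xarray.DataArray].
values = [[arr, np.ones(2)], [arr.copy(), np.ones(2)]]
# The values are not arrays themselves, but non-empty collections of
# arrays, so the element-wise content comparison is used.
_are_values_combinable(values)  # -> True
```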


def _contain_arrays(values):
return any([_is_array(value) for value in values])


def _is_array(val):
"""Check if val is an array."""
return hasattr(val, "__array__") and not np.isscalar(val)


nan_allclose = partial(np.allclose, equal_nan=True)


def _all_arrays_equal(arrays):
"""Check if the arrays are equal.

If the arrays are lazy, just check if they have the same identity.
"""
if hasattr(arrays[0], 'compute'):
return _all_identical(arrays)
else:
return _pairwise_all(nan_allclose, arrays)
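To make the lazy-vs-eager distinction concrete (a sketch assuming dask is installed; dask arrays expose a `compute` method):

```python
import numpy as np
import dask.array as da

eager = [np.array([1.0, np.nan]), np.array([1.0, np.nan])]
_all_arrays_equal(eager)  # -> True: contents compared via nan_allclose

lazy = da.zeros(3)
_all_arrays_equal([lazy, lazy])         # -> True: same object
# Distinct dask graphs are never computed for the comparison, so equal
# contents in different objects still compare as not equal:
_all_arrays_equal([lazy, da.zeros(3)])  # -> False
```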


def _pairwise_all(func, values):
for value in values[1:]:
if not func(values[0], value):
return False
return True


def _all_identical(values):
"""Check that the identities of all values are the same."""
return _pairwise_all(is_, values)


def _contain_collections_of_arrays(values):
return any(
[_is_non_empty_collection(value) and
_is_all_arrays(value)
for value in values])


def _is_non_empty_collection(value):
return isinstance(value, Collection) and len(value) > 0


def _is_all_arrays(value):
return all([_is_array(sub_value) for sub_value in value])


def _all_list_of_arrays_equal(array_lists):
"""Check that the lists of arrays are equal."""
for array_list in zip(*array_lists):
if not _all_arrays_equal(array_list):
return False
return True


def _all_values_equal(values):
try:
return _pairwise_all(nan_allclose, values)
except TypeError:
return _pairwise_all(eq, values)
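The `TypeError` fallback covers values that `np.allclose` cannot handle, such as strings (sketch):

```python
import numpy as np

_all_values_equal([1.0, 1.0 + 1e-9])     # -> True via np.allclose
_all_values_equal(['seviri', 'seviri'])  # np.allclose raises TypeError,
                                         # so plain == is used -> True
_all_values_equal([np.nan, np.nan])      # -> True, thanks to equal_nan=True
```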


def get_keys_from_config(common_id_keys, config):
"""Gather keys for a new DataID from the ones available in configured dataset."""
id_keys = {}
for key, val in common_id_keys.items():
if key in config:
id_keys[key] = val
elif val is not None and (val.get('required') is True or val.get('default') is not None):
id_keys[key] = val
if not id_keys:
raise ValueError('Metadata does not contain enough information to create a DataID.')
return id_keys
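A sketch with hypothetical id keys and dataset config, showing which keys survive:

```python
common_id_keys = {
    'name': {'required': True},
    'wavelength': None,
    'resolution': None,
    'calibration': {'default': 'reflectance'},
    'modifiers': None,
}
config = {'name': 'VIS006', 'resolution': 3000}

get_keys_from_config(common_id_keys, config)
# -> {'name': {'required': True},                 # present in config
#     'resolution': None,                         # present in config
#     'calibration': {'default': 'reflectance'}}  # has a default
# 'wavelength' and 'modifiers' are dropped: absent and not required.
```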


class ValueList(IntEnum):
"""A static value list."""

@@ -223,151 +412,6 @@ def __hash__(self):
}


class MetadataObject(object):
"""A general metadata object."""

def __init__(self, **attributes):
"""Initialize the class with *attributes*."""
self.attrs = attributes

@property
def id(self):
"""Return the DataID of the object."""
try:
return self.attrs['_satpy_id']
except KeyError:
id_keys = self.attrs.get('_satpy_id_keys', minimal_default_keys_config)
return DataID(id_keys, **self.attrs)


def average_datetimes(dt_list):
"""Average a series of datetime objects.

.. note::

This function assumes all datetime objects are naive and in the same
time zone (UTC).

Args:
dt_list (iterable): Datetime objects to average

Returns: Average datetime as a datetime object

"""
total = [datetime.timestamp(dt) for dt in dt_list]
return datetime.fromtimestamp(sum(total) / len(total))


def combine_metadata(*metadata_objects, **kwargs):
"""Combine the metadata of two or more Datasets.

If the values corresponding to any keys are not equal or do not
exist in all provided dictionaries then they are not included in
the returned dictionary. By default any keys with the word 'time'
in them and consisting of datetime objects will be averaged. This
is to handle cases where data were observed at almost the same time
but not exactly. In the interest of time, arrays are compared by
object identity rather than by their contents.

Args:
*metadata_objects: MetadataObject or dict objects to combine
average_times (bool): Average any keys with 'time' in the name

Returns:
dict: the combined metadata

"""
average_times = kwargs.get('average_times', True) # python 2 compatibility (no kwarg after *args)
shared_keys = None
info_dicts = []
# grab all of the dictionary objects provided and make a set of the shared keys
for metadata_object in metadata_objects:
if isinstance(metadata_object, dict):
metadata_dict = metadata_object
elif hasattr(metadata_object, "attrs"):
metadata_dict = metadata_object.attrs
else:
continue
info_dicts.append(metadata_dict)

if shared_keys is None:
shared_keys = set(metadata_dict.keys())
else:
shared_keys &= set(metadata_dict.keys())

# combine all of the dictionaries
shared_info = {}
for k in shared_keys:
values = [nfo[k] for nfo in info_dicts]
if _share_metadata_key(k, values, average_times):
if 'time' in k and isinstance(values[0], datetime) and average_times:
shared_info[k] = average_datetimes(values)
else:
shared_info[k] = values[0]

return shared_info


def get_keys_from_config(common_id_keys, config):
"""Gather keys for a new DataID from the ones available in configured dataset."""
id_keys = {}
for key, val in common_id_keys.items():
if key in config:
id_keys[key] = val
elif val is not None and (val.get('required') is True or val.get('default') is not None):
id_keys[key] = val
if not id_keys:
raise ValueError('Metadata does not contain enough information to create a DataID.')
return id_keys


def _share_metadata_key(k, values, average_times):
"""Combine metadata. Helper for combine_metadata, decide if key is shared."""
any_arrays = any([_is_array(val) for val in values])
# in the real world, the `ancillary_variables` attribute may be
# List[xarray.DataArray], this means our values are now
# List[List[xarray.DataArray]].
# note that this list_of_arrays check is also true for any
# higher-dimensional ndarray, but we only use this check after we have
# checked any_arrays so this false positive should have no impact
list_of_arrays = any(
[isinstance(val, Collection) and len(val) > 0 and
all([_is_array(subval)
for subval in val])
for val in values])
if any_arrays:
return _share_metadata_key_array(values)
elif list_of_arrays:
return _share_metadata_key_list_arrays(values)
elif 'time' in k and isinstance(values[0], datetime) and average_times:
return True
elif all(val == values[0] for val in values[1:]):
return True
return False


def _is_array(val):
"""Check if val is an array."""
return hasattr(val, "__array__") and not np.isscalar(val)


def _share_metadata_key_array(values):
"""Combine metadata. Helper for combine_metadata, check object identity in list of arrays."""
for val in values[1:]:
if val is not values[0]:
return False
return True
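This identity check is exactly what the refactor relaxes for eager arrays; a contrasting sketch (hypothetical values):

```python
import numpy as np

lut1 = np.array([1.0, 2.0])
lut2 = np.array([1.0, 2.0])  # equal contents, distinct objects

_share_metadata_key_array([lut1, lut2])  # -> False: old identity check
# The new _all_arrays_equal([lut1, lut2]) returns True instead, which
# is what now allows equal numpy arrays to be combined.
```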


def _share_metadata_key_list_arrays(values):
"""Combine metadata. Helper for combine_metadata, check object identity in list of list of arrays."""
for val in values[1:]:
for arr, ref in zip(val, values[0]):
if arr is not ref:
return False
return True


class DataID(dict):
"""Identifier for all `DataArray` objects.
