Support multiple readers in group_files and MultiScene.from_files #1269

Merged
merged 21 commits on Jul 29, 2020
Commits
13daf44
Added test for multiple readers in multiscene
gerritholl Jul 21, 2020
4c3b3ca
Refactor group_files to prepare for multi readers
gerritholl Jul 22, 2020
cf14ac7
Add support for multiple readers in group_files
gerritholl Jul 22, 2020
f65365f
Grouping, fix custom grouping keys
gerritholl Jul 22, 2020
14aacdf
Improve test coverage and docs in group_files
gerritholl Jul 22, 2020
2b59310
Add "must have all" flag to MultiScene.from_files
gerritholl Jul 22, 2020
35b077a
When no reader passed, skip unimportables
gerritholl Jul 22, 2020
f80b0b7
PEP8 fixes
gerritholl Jul 22, 2020
e05e84c
Add example on combining multiple readers.
gerritholl Jul 22, 2020
3353eb6
Added missing continue statement
gerritholl Jul 22, 2020
f455ea7
Clarification on MultiScene construction
gerritholl Jul 23, 2020
11065c3
Fix typo & syntax in multiscene docs
gerritholl Jul 23, 2020
b7610b7
More clarification/improvement in multiscene docs
gerritholl Jul 23, 2020
22c8af9
Merge branch 'master' into multiscene-multireader
gerritholl Jul 28, 2020
1a888dc
multiscene doc and test fixes
gerritholl Jul 28, 2020
23fc539
Removed redundant duplicate word micrometre
gerritholl Jul 28, 2020
15f9cbe
Added real non-breaking space
gerritholl Jul 28, 2020
decb6f4
Merge branch 'master' into multiscene-multireader
gerritholl Jul 29, 2020
48ab6f6
Merge branch 'master' into multiscene-multireader
djhoese Jul 29, 2020
8db9f9d
Merge branch 'master' into multiscene-multireader
gerritholl Jul 29, 2020
ae0eb04
Merge branch 'multiscene-multireader' of github.com:gerritholl/satpy …
gerritholl Jul 29, 2020
48 changes: 48 additions & 0 deletions doc/source/multiscene.rst
@@ -231,3 +231,51 @@ multiple Scenes use:
>>> mscn = MultiScene.from_files(glob('/data/abi/day_1/*C0[12]*.nc'), reader='abi_l1b')
>>> mscn.load(['C01', 'C02'])
>>> mscn.save_datasets(base_dir='/path/for/output')

Combining multiple readers
--------------------------

.. versionadded:: 0.23

The :meth:`~satpy.multiscene.MultiScene.from_files` constructor can
automatically combine multiple readers into a single MultiScene, so it is
no longer necessary for the user to create the :class:`~satpy.scene.Scene`
objects themselves. For example, you can combine Advanced Baseline
Imager (ABI) and Global Lightning Mapper (GLM) measurements.
Constructing a multi-reader MultiScene requires more parameters than a
single-reader MultiScene, because Satpy cannot reliably guess how to
group files belonging to different instruments. The following example
creates a video with lightning superimposed on ABI channel 14 (11.2 µm)
using the built-in composite ``C14_flash_extent_density``,
which superimposes flash extent density from GLM (read with the
:class:`~satpy.readers.glm_l2.NCGriddedGLML2` or ``glm_l2`` reader) on ABI
channel 14 data (read with the :class:`~satpy.readers.abi_l1b.NC_ABI_L1B`
or ``abi_l1b`` reader), and therefore needs Scene objects that combine
both readers:

>>> glm_dir = "/path/to/GLMC/"
>>> abi_dir = "/path/to/ABI/"
>>> ms = satpy.MultiScene.from_files(
... glob.glob(glm_dir + "OR_GLM-L2-GLMC-M3_G16_s202010418*.nc") +
... glob.glob(abi_dir + "C*/OR_ABI-L1b-RadC-M6C*_G16_s202010418*_e*_c*.nc"),
... reader=["glm_l2", "abi_l1b"],
... ensure_all_readers=True,
... group_keys=["start_time"],
... time_threshold=30)
>>> ms.load(["C14_flash_extent_density"])
>>> ms = ms.resample(ms.first_scene["C14"].attrs["area"])
>>> ms.save_animation("/path/for/output/{name:s}_{start_time:%Y%m%d_%H%M}.mp4")

In this example, we pass to
:meth:`~satpy.multiscene.MultiScene.from_files` the additional parameters
``ensure_all_readers=True, group_keys=["start_time"], time_threshold=30``
so that we only get scenes at times for which both ABI and GLM have a
file starting within 30 seconds of each other, ignoring all other
differences for the purposes of grouping the two. Here, the ABI files
occur every 5 minutes, but the GLM files (processed with glmtools) occur
every minute. Scenes where a GLM file has no ABI file starting within
±30 seconds are skipped. The ``group_keys`` and ``time_threshold``
keyword arguments are processed by the :func:`~satpy.readers.group_files`
function. The heavy lifting of blending the two instruments together is
performed by the :class:`~satpy.composites.BackgroundCompositor` class
through the ``C14_flash_extent_density`` composite.
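The structure that ``MultiScene.from_files`` receives from ``group_files`` is a list of dictionaries mapping reader names to lists of files. A minimal sketch, with invented file names, of how the ``ensure_all_readers`` flag filters that structure:

```python
# Sketch of the structure returned by group_files and of the
# ensure_all_readers filter in MultiScene.from_files.
# All file names here are invented for illustration.
file_groups = [
    {"abi_l1b": ["abi_s1200.nc"], "glm_l2": ["glm_s1200.nc"]},
    {"abi_l1b": [], "glm_l2": ["glm_s1201.nc"]},  # no matching ABI file
    {"abi_l1b": [], "glm_l2": ["glm_s1202.nc"]},
]

# With ensure_all_readers=True, only groups in which every reader
# contributed at least one file are kept:
complete = [fg for fg in file_groups if all(fg.values())]
print(len(complete))  # 1
```

With ``ensure_all_readers=False`` (the default), all three groups would be kept and two of the resulting Scenes would contain GLM data only.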
17 changes: 14 additions & 3 deletions satpy/multiscene.py
@@ -174,18 +174,29 @@ def first_scene(self):
return self._scene_gen.first

@classmethod
def from_files(cls, files_to_sort, reader=None, **kwargs):
def from_files(cls, files_to_sort, reader=None,
ensure_all_readers=False, **kwargs):
"""Create multiple Scene objects from multiple files.

Args:
files_to_sort (Collection[str]): files to read
reader (str or Collection[str]): reader or readers to use
ensure_all_readers (bool): If True, limit to scenes where all
readers have at least one file. If False (default), include
all scenes where at least one reader has at least one file.

This uses the :func:`satpy.readers.group_files` function to group
files. See this function for more details on possible keyword
arguments.
files. See this function for more details on additional possible
keyword arguments. In particular, it is strongly recommended to pass
``group_keys`` when using multiple instruments.

.. versionadded:: 0.12

"""
from satpy.readers import group_files
file_groups = group_files(files_to_sort, reader=reader, **kwargs)
if ensure_all_readers:
file_groups = [fg for fg in file_groups if all(fg.values())]
scenes = (Scene(filenames=fg) for fg in file_groups)
return cls(scenes)

165 changes: 130 additions & 35 deletions satpy/readers/__init__.py
@@ -20,6 +20,7 @@
import logging
import numbers
import os
import warnings
from datetime import datetime, timedelta

import yaml
@@ -392,9 +393,9 @@ def group_files(files_to_sort, reader=None, time_threshold=10,

Args:
files_to_sort (iterable): File paths to sort in to group
reader (str): Reader whose file patterns should be used to sort files.
This is currently a required keyword argument, but may be optional
in the future (see inline code comments for details).
reader (str or Collection[str]): Reader or readers whose file patterns
should be used to sort files. If not given, all readers are tried
(slow; passing an explicit list of readers is strongly recommended).
time_threshold (int): Number of seconds used to consider time elements
in a group as being equal. For example, if the 'start_time' item
is used to group files then any time within `time_threshold`
@@ -407,7 +408,9 @@
the first key in ``group_keys``. Otherwise, there is a good chance
that files will not be grouped properly (datetimes being barely
unequal). Defaults to a reader's ``group_keys`` configuration (set
in YAML), otherwise ``('start_time',)``.
in YAML), otherwise ``('start_time',)``. When passing multiple
readers, passing ``group_keys`` is strongly recommended, because
the behaviour without it is undefined.
ppp_config_dir (str): Root user configuration directory for Satpy.
This will be deprecated in the future, but is here for consistency
with other Satpy features.
@@ -420,40 +423,131 @@
a `Scene` object.

"""
# FUTURE: Find the best reader for each filename using `find_files_and_readers`
if reader is None:
raise ValueError("'reader' keyword argument is required.")
elif not isinstance(reader, (list, tuple)):

if reader is not None and not isinstance(reader, (list, tuple)):
reader = [reader]

# FUTURE: Handle multiple readers
reader = reader[0]
reader_configs = list(configs_for_reader(reader, ppp_config_dir))[0]
reader_kwargs = reader_kwargs or {}
try:
reader_instance = load_reader(reader_configs, **reader_kwargs)
except (KeyError, IOError, yaml.YAMLError) as err:
LOG.info('Cannot use %s', str(reader_configs))
LOG.debug(str(err))
# if reader and (isinstance(reader, str) or len(reader) == 1):
# # if it is a single reader then give a more usable error
# raise
raise

if group_keys is None:
group_keys = reader_instance.info.get('group_keys', ('start_time',))
file_keys = []
# make a copy because filename_items_for_filetype will modify inplace

reader_files = _assign_files_to_readers(
files_to_sort, reader, ppp_config_dir, reader_kwargs)

if reader is None:
reader = reader_files.keys()

file_keys = _get_file_keys_for_reader_files(
reader_files, group_keys=group_keys)

file_groups = _get_sorted_file_groups(file_keys, time_threshold)

return [{rn: file_groups[group_key].get(rn, []) for rn in reader} for group_key in file_groups]


def _assign_files_to_readers(files_to_sort, reader_names, ppp_config_dir,
reader_kwargs):
"""Assign files to readers.

Given a list of file names (paths), match those to reader instances.

Internal helper for group_files.

Args:
files_to_sort (Collection[str]): Files to assign to readers.
reader_names (Collection[str]): Readers to consider
ppp_config_dir (str): Root user configuration directory for Satpy.
reader_kwargs (Mapping): Keyword arguments to pass when loading readers.

Returns:
Mapping[str, Tuple[reader, Set[str]]]
Mapping where the keys are reader names and the values are tuples of
(reader_configs, filenames).
"""

files_to_sort = set(files_to_sort)
for _, filetype_info in reader_instance.sorted_filetype_items():
for f, file_info in reader_instance.filename_items_for_filetype(files_to_sort, filetype_info):
group_key = tuple(file_info.get(k) for k in group_keys)
file_keys.append((group_key, f))
reader_dict = {}
for reader_configs in configs_for_reader(reader_names, ppp_config_dir):
try:
reader = load_reader(reader_configs, **reader_kwargs)
except yaml.constructor.ConstructorError:
LOG.exception(
f"ConstructorError loading {reader_configs!s}, "
"probably a missing dependency; skipping the "
"corresponding reader (if you did not explicitly "
"specify the reader, Satpy tries all readers; performance "
"will improve if you pass readers explicitly).")
continue
reader_name = reader.info["name"]
files_matching = set(reader.filter_selected_filenames(files_to_sort))
files_to_sort -= files_matching
if files_matching or reader_names is not None:
reader_dict[reader_name] = (reader, files_matching)
if files_to_sort:
raise ValueError("No matching readers found for these files: " +
", ".join(files_to_sort))
return reader_dict
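The greedy assignment above — each reader claims its matching files, which are removed from the pool, and any leftovers raise ``ValueError`` — can be sketched independently of Satpy. Here ``fnmatch`` patterns stand in for the readers' YAML file patterns, and all names are invented:

```python
import fnmatch

def assign_files(files, reader_patterns):
    """Greedily assign each file to the first reader whose pattern matches.

    Mirrors the set-subtraction approach: once a reader claims a file,
    later readers never see it; unclaimed files raise an error.
    """
    pool = set(files)
    assigned = {}
    for reader_name, pattern in reader_patterns.items():
        matching = {f for f in pool if fnmatch.fnmatch(f, pattern)}
        pool -= matching
        assigned[reader_name] = matching
    if pool:
        raise ValueError("No matching readers found for these files: "
                         + ", ".join(sorted(pool)))
    return assigned

groups = assign_files(
    ["OR_ABI-L1b-RadC-x.nc", "OR_GLM-L2-GLMC-y.nc"],
    {"abi_l1b": "OR_ABI-L1b-*.nc", "glm_l2": "OR_GLM-L2-*.nc"})
print(sorted(groups))  # ['abi_l1b', 'glm_l2']
```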


def _get_file_keys_for_reader_files(reader_files, group_keys=None):
"""From a mapping from _assign_files_to_readers, get file keys.

Given a mapping where each key is a reader name and each value is a
tuple of reader instance (typically FileYAMLReader) and a collection
of files, return a mapping with the same keys, but where the values are
lists of tuples of (keys, filename), where keys are extracted from the filenames
according to group_keys and filenames are the names those keys were
extracted from.

Internal helper for group_files.

Returns:
Mapping[str, List[Tuple[Tuple, str]]], as described.
"""

file_keys = {}
for (reader_name, (reader_instance, files_to_sort)) in reader_files.items():
if group_keys is None:
group_keys = reader_instance.info.get('group_keys', ('start_time',))
file_keys[reader_name] = []
# make a copy because filename_items_for_filetype will modify inplace
files_to_sort = set(files_to_sort)
for _, filetype_info in reader_instance.sorted_filetype_items():
for f, file_info in reader_instance.filename_items_for_filetype(files_to_sort, filetype_info):
group_key = tuple(file_info.get(k) for k in group_keys)
if all(g is None for g in group_key):
warnings.warn(
f"Found matching file {f:s} for reader "
f"{reader_name:s}, but none of the requested group keys "
"were found. Group keys requested: " + ", ".join(group_keys),
UserWarning)
file_keys[reader_name].append((group_key, f))
return file_keys
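The key extraction can be illustrated outside Satpy. In Satpy the fields come from the reader's trollsift file patterns; in this sketch a regular expression on GOES-R style file names stands in for that machinery:

```python
import re
from datetime import datetime

def file_keys_for_files(filenames, group_keys=("start_time",)):
    """Extract (group_key, filename) tuples from GOES-R style names.

    Stand-in for Satpy's pattern-based parsing: the start time is
    taken from the _sYYYYJJJHHMM... field of the file name.
    """
    keys = []
    for f in filenames:
        info = {}
        m = re.search(r"_s(\d{11})", f)
        if m:
            info["start_time"] = datetime.strptime(m.group(1), "%Y%j%H%M")
        group_key = tuple(info.get(k) for k in group_keys)
        keys.append((group_key, f))
    return keys

keys = file_keys_for_files(
    ["OR_ABI-L1b-RadC-M3C01_G16_s20171171502203_"
     "e20171171504576_c20171171505018.nc"])
print(keys[0][0])  # (datetime.datetime(2017, 4, 27, 15, 2),)
```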


def _get_sorted_file_groups(all_file_keys, time_threshold):
"""Get sorted file groups.

Get a mapping where each key is a tuple of group-key values and each
value is a mapping of reader names to lists of files. The files within
each item are considered to be grouped within the same time window.

Args:
    all_file_keys: as returned by _get_file_keys_for_reader_files
    time_threshold: temporal threshold in seconds

Returns:
    Mapping[Tuple, Mapping[str, List[str]]], as described

Internal helper for group_files.
"""
# flatten to get an overall sorting; put the name in the middle in the
# interest of sorting
flat_keys = ((v[0], rn, v[1]) for (rn, vL) in all_file_keys.items() for v in vL)
prev_key = None
threshold = timedelta(seconds=time_threshold)
# file_groups is sorted, because dictionaries are sorted by insertion
# order in Python 3.7+
file_groups = {}
for gk, f in sorted(file_keys):
for gk, rn, f in sorted(flat_keys):
# use first element of key as time identifier (if datetime type)
if prev_key is None:
is_new_group = True
@@ -471,13 +565,14 @@ def group_files(files_to_sort, reader=None, time_threshold=10,
if this_val is not None and prev_val is not None)
# if this is a new group based on the first element
if is_new_group or any(vals_not_equal):
file_groups[gk] = [f]
file_groups[gk] = {rn: [f]}
prev_key = gk
else:
file_groups[prev_key].append(f)
sorted_group_keys = sorted(file_groups)
# passable to Scene as 'filenames'
return [{reader: file_groups[group_key]} for group_key in sorted_group_keys]
if rn not in file_groups[prev_key]:
file_groups[prev_key][rn] = [f]
else:
file_groups[prev_key][rn].append(f)
return file_groups
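The time-threshold grouping implemented above can be sketched on its own. This simplified version handles only a single datetime group key (an assumption for brevity; the real function compares whole key tuples element by element):

```python
from datetime import datetime, timedelta

def group_by_time(keyed_files, threshold_seconds=30):
    """Group (start_time, reader_name, filename) tuples whose start
    times lie within the threshold of the first time in the group."""
    threshold = timedelta(seconds=threshold_seconds)
    groups = {}
    prev_key = None
    for start, reader_name, fname in sorted(keyed_files):
        # open a new group when the gap to the previous anchor is too large
        if prev_key is None or start - prev_key > threshold:
            prev_key = start
            groups[prev_key] = {}
        groups[prev_key].setdefault(reader_name, []).append(fname)
    return groups

files = [
    (datetime(2017, 4, 27, 15, 2, 20), "abi_l1b", "abi.nc"),
    (datetime(2017, 4, 27, 15, 2, 0), "glm_l2", "glm.nc"),
    (datetime(2017, 4, 27, 15, 7, 0), "glm_l2", "glm2.nc"),
]
grouped = group_by_time(files)
print(len(grouped))  # 2: the 15:02 files pair up, 15:07 stands alone
```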


def read_reader_config(config_files, loader=UnsafeLoader):
45 changes: 41 additions & 4 deletions satpy/tests/test_multiscene.py
@@ -124,20 +124,57 @@ def test_properties(self):
def test_from_files(self):
"""Test creating a multiscene from multiple files."""
from satpy import MultiScene
input_files = [
input_files_abi = [
"OR_ABI-L1b-RadC-M3C01_G16_s20171171502203_e20171171504576_c20171171505018.nc",
"OR_ABI-L1b-RadC-M3C01_G16_s20171171507203_e20171171509576_c20171171510018.nc",
"OR_ABI-L1b-RadC-M3C01_G16_s20171171512203_e20171171514576_c20171171515017.nc",
"OR_ABI-L1b-RadC-M3C01_G16_s20171171517203_e20171171519577_c20171171520019.nc",
"OR_ABI-L1b-RadC-M3C01_G16_s20171171522203_e20171171524576_c20171171525020.nc",
"OR_ABI-L1b-RadC-M3C01_G16_s20171171527203_e20171171529576_c20171171530017.nc",
]
input_files_glm = [
"OR_GLM-L2-GLMC-M3_G16_s20171171500000_e20171171501000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171501000_e20171171502000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171502000_e20171171503000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171503000_e20171171504000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171504000_e20171171505000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171505000_e20171171506000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171506000_e20171171507000_c20380190314080.nc",
"OR_GLM-L2-GLMC-M3_G16_s20171171507000_e20171171508000_c20380190314080.nc",
]
with mock.patch('satpy.multiscene.Scene') as scn_mock:
mscn = MultiScene.from_files(input_files, reader='abi_l1b')
self.assertEqual(len(mscn.scenes), 6)
calls = [mock.call(filenames={'abi_l1b': [in_file]}) for in_file in input_files]
mscn = MultiScene.from_files(
input_files_abi,
reader='abi_l1b')
assert len(mscn.scenes) == 6
calls = [mock.call(
filenames={'abi_l1b': [in_file_abi]})
for in_file_abi in input_files_abi]
scn_mock.assert_has_calls(calls)

scn_mock.reset_mock()
mscn = MultiScene.from_files(
input_files_abi + input_files_glm,
reader=('abi_l1b', "glm_l2"),
group_keys=["start_time"],
ensure_all_readers=True,
time_threshold=30)
assert len(mscn.scenes) == 2
calls = [mock.call(
filenames={'abi_l1b': [in_file_abi], 'glm_l2': [in_file_glm]})
for (in_file_abi, in_file_glm) in
zip(input_files_abi[0:2],
[input_files_glm[2]] + [input_files_glm[7]])]
scn_mock.assert_has_calls(calls)
scn_mock.reset_mock()
mscn = MultiScene.from_files(
input_files_abi + input_files_glm,
reader=('abi_l1b', "glm_l2"),
group_keys=["start_time"],
ensure_all_readers=False,
time_threshold=30)
assert len(mscn.scenes) == 12

def test_group(self):
from satpy import Scene, MultiScene, DatasetID
