Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dataset writing so computations are shared between tasks #216

Merged
merged 11 commits into from
Mar 12, 2018
16 changes: 8 additions & 8 deletions satpy/etc/composites/abi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,21 @@ composites:
overview:
compositor: !!python/name:satpy.composites.GenericCompositor
prerequisites:
- 0.65
- 0.85
- 11.0
standard_name: overview

overview_sun:
compositor: !!python/name:satpy.composites.RGBCompositor
prerequisites:
- wavelength: 0.65
modifiers: [sunz_corrected]
- wavelength: 0.85
modifiers: [sunz_corrected]
- 11.0
standard_name: overview

overview_raw:
compositor: !!python/name:satpy.composites.GenericCompositor
prerequisites:
- 0.65
- 0.85
- 11.0
standard_name: overview

airmass:
compositor: !!python/name:satpy.composites.Airmass
prerequisites:
Expand Down
26 changes: 14 additions & 12 deletions satpy/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,34 +760,36 @@ def load_writer_config(self, config_files, **kwargs):
with open(conf_fn) as fd:
conf = recursive_dict_update(conf, yaml.load(fd))
writer_class = conf['writer']['writer']
init_kwargs, kwargs = writer_class.separate_init_kwargs(kwargs)
writer = writer_class(ppp_config_dir=self.ppp_config_dir,
config_files=config_files,
**kwargs)
return writer
**init_kwargs)
return writer, kwargs

def save_dataset(self, dataset_id, filename=None, writer=None,
overlay=None, **kwargs):
overlay=None, compute=True, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring should mention compute and what the function returns.

"""Save the *dataset_id* to file using *writer* (default: geotiff)."""
if writer is None:
if filename is None:
writer = self.get_writer("geotiff", **kwargs)
writer, save_kwargs = self.get_writer("geotiff", **kwargs)
else:
writer = self.get_writer_by_ext(
writer, save_kwargs = self.get_writer_by_ext(
os.path.splitext(filename)[1], **kwargs)
else:
writer = self.get_writer(writer, **kwargs)
writer.save_dataset(self[dataset_id],
filename=filename,
overlay=overlay, **kwargs)
writer, save_kwargs = self.get_writer(writer, **kwargs)
return writer.save_dataset(self[dataset_id], filename=filename,
overlay=overlay, compute=compute,
**save_kwargs)

def save_datasets(self, writer="geotiff", datasets=None, **kwargs):
def save_datasets(self, writer="geotiff", datasets=None, compute=True,
**kwargs):
"""Save all the datasets present in a scene to disk using *writer*."""
if datasets is not None:
datasets = [self[ds] for ds in datasets]
else:
datasets = self.datasets.values()
writer = self.get_writer(writer, **kwargs)
writer.save_datasets(datasets, **kwargs)
writer, save_kwargs = self.get_writer(writer, **kwargs)
return writer.save_datasets(datasets, compute=compute, **save_kwargs)

def get_writer(self, writer="geotiff", **kwargs):
"""Get the writer instance."""
Expand Down
167 changes: 146 additions & 21 deletions satpy/writers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import numpy as np
import yaml
import dask
import dask.array as da

from satpy.config import (config_search_paths, get_environ_config_dir,
recursive_dict_update)
Expand Down Expand Up @@ -109,7 +111,7 @@ def add_overlay(orig, area, coast_dir, color=(0, 0, 0), width=0.5, resolution=No
else:
resolution = "f"

LOG.debug("Automagically choose resolution " + resolution)
LOG.debug("Automagically choose resolution %s", resolution)

if img.mode.endswith('A'):
img = img.convert('RGBA')
Expand Down Expand Up @@ -286,7 +288,6 @@ class Writer(Plugin):

def __init__(self,
name=None,
fill_value=None,
file_pattern=None,
base_dir=None,
**kwargs):
Expand All @@ -297,17 +298,23 @@ def __init__(self,
# Use options from the config file if they weren't passed as arguments
self.name = self.info.get("name",
None) if name is None else name
self.fill_value = self.info.get(
"fill_value", None) if fill_value is None else fill_value
self.file_pattern = self.info.get(
"file_pattern", None) if file_pattern is None else file_pattern

if self.name is None:
raise ValueError("Writer 'name' not provided")
if self.fill_value:
self.fill_value = float(self.fill_value)

self.create_filename_parser(base_dir)
self.filename_parser = self.create_filename_parser(base_dir)

@classmethod
def separate_init_kwargs(cls, kwargs):
# FUTURE: Don't pass Scene.save_datasets kwargs to init and here
init_kwargs = {}
kwargs = kwargs.copy()
for kw in ['base_dir', 'file_pattern']:
if kw in kwargs:
init_kwargs[kw] = kwargs.pop(kw)
return init_kwargs, kwargs

def create_filename_parser(self, base_dir):
# just in case a writer needs more complex file patterns
Expand All @@ -316,27 +323,103 @@ def create_filename_parser(self, base_dir):
file_pattern = os.path.join(base_dir, self.file_pattern)
else:
file_pattern = self.file_pattern
self.filename_parser = parser.Parser(
file_pattern) if file_pattern else None
return parser.Parser(file_pattern) if file_pattern else None

def get_filename(self, **kwargs):
if self.filename_parser is None:
raise RuntimeError(
"No filename pattern or specific filename provided")
return self.filename_parser.compose(kwargs)

def save_datasets(self, datasets, **kwargs):
def save_datasets(self, datasets, compute=True, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring should be updated to explain compute, and how that affects the return value.

"""Save all datasets to one or more files.

Subclasses can use this method to save all datasets to one single
file or optimize the writing of individual datasets. By default
this simply calls `save_dataset` for each dataset provided.

Args:
datasets (iterable): Iterable of `xarray.DataArray` objects to
save using this writer.
compute (bool): If `True` (default), compute all of the saves to
disk. If `False` then the return value is either
a `dask.delayed.Delayed` object or two lists to
be passed to a `dask.array.store` call.
See return values below for more details.
**kwargs: Keyword arguments to pass to `save_dataset`. See that
documentation for more details.

Returns:
Value returned depends on `compute` keyword argument. If
`compute` is `True` the value is the result of a either a
`dask.array.store` operation or a `dask.delayed.Delayed` compute,
typically this is `None`. If `compute` is `False` then the
result is either a `dask.delayed.Delayed` object that can be
computed with `delayed.compute()` or a two element tuple of
sources and targets to be passed to `dask.array.store`. If
`targets` is provided then it is the caller's responsibility to
close any objects that have a "close" method.

"""
sources = []
targets = []
for ds in datasets:
self.save_dataset(ds, **kwargs)
res = self.save_dataset(ds, compute=False, **kwargs)
if isinstance(res, tuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see self.save_dataset returning a tuple, even longer down the call chain, am I missing something ?

# source, target to be passed to da.store
sources.append(res[0])
targets.append(res[1])
else:
# delayed object
sources.append(res)

# we have targets, we should save sources to targets
if targets and compute:
res = da.store(sources, targets)
for target in targets:
if hasattr(target, 'close'):
target.close()
return res
elif targets:
return sources, targets

delayed = dask.delayed(sources)
if compute:
return delayed.compute()
return delayed

def save_dataset(self, dataset, filename=None, fill_value=None,
compute=True, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring should mention compute

"""Saves the ``dataset`` to a given ``filename``.

This method must be overloaded by the subclass.

Args:
dataset (xarray.DataArray): Dataset to save using this writer.
filename (str): Optionally specify the filename to save this
dataset to. If not provided then `file_pattern`
which can be provided to the init method will be
used and formatted by dataset attributes.
fill_value (int or float): Replace invalid values in the dataset
with this fill value if applicable to
this writer.
compute (bool): If `True` (default), compute and save the dataset.
If `False` return either a `dask.delayed.Delayed`
object or tuple of (source, target). See the
return values below for more information.
**kwargs: Other keyword arguments for this particular reader.

Returns:
Value returned depends on `compute`. If `compute` is `True` then
the return value is the result of computing a
`dask.delayed.Delayed` object or running `dask.array.store`. If
`compute` is `False` then the returned value is either a
`dask.delayed.Delayed` object that can be computed using
`delayed.compute()` or a tuple of (source, target) that should be
passed to `dask.array.store`. If target is provided then the caller
is responsible for calling `target.close()` if the target has
this method.

def save_dataset(self, dataset, filename=None, fill_value=None, **kwargs):
"""Saves the *dataset* to a given *filename*.
"""
raise NotImplementedError(
"Writer '%s' has not implemented dataset saving" % (self.name, ))
Expand All @@ -346,12 +429,11 @@ class ImageWriter(Writer):

def __init__(self,
name=None,
fill_value=None,
file_pattern=None,
enhancement_config=None,
base_dir=None,
**kwargs):
Writer.__init__(self, name, fill_value, file_pattern, base_dir,
Writer.__init__(self, name, file_pattern, base_dir,
**kwargs)
enhancement_config = self.info.get(
"enhancement_config",
Expand All @@ -360,15 +442,58 @@ def __init__(self,
self.enhancer = Enhancer(ppp_config_dir=self.ppp_config_dir,
enhancement_config_file=enhancement_config)

def save_dataset(self, dataset, filename=None, fill_value=None, overlay=None, decorate=None, **kwargs):
"""Saves the *dataset* to a given *filename*.
@classmethod
def separate_init_kwargs(cls, kwargs):
    """Split *kwargs* into init keyword arguments and the rest.

    Extends the base class behavior by also routing the
    'enhancement_config' keyword to the init arguments.
    """
    # FUTURE: Don't pass Scene.save_datasets kwargs to init and here
    init_kwargs, remaining = super(ImageWriter, cls).separate_init_kwargs(
        kwargs)
    if 'enhancement_config' in remaining:
        init_kwargs['enhancement_config'] = remaining.pop(
            'enhancement_config')
    return init_kwargs, remaining

def save_dataset(self, dataset, filename=None, fill_value=None,
overlay=None, decorate=None, compute=True, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring should mention compute and the return value

"""Saves the ``dataset`` to a given ``filename``.

This method creates an enhanced image using `get_enhanced_image`. The
image is then passed to `save_image`. See both of these functions for
more details on the arguments passed to this method.

"""
fill_value = fill_value if fill_value is not None else self.fill_value
img = get_enhanced_image(
dataset.squeeze(), self.enhancer, fill_value, overlay=overlay, decorate=decorate)
self.save_image(img, filename=filename, **kwargs)
dataset.squeeze(), self.enhancer, fill_value, overlay=overlay,
decorate=decorate)
return self.save_image(img, filename=filename, compute=compute,
**kwargs)

def save_image(self, img, filename=None, compute=True, **kwargs):
"""Save Image object to a given ``filename``.

def save_image(self, img, filename=None, **kwargs):
Args:
img (trollimage.xrimage.XRImage): Image object to save to disk.
filename (str): Optionally specify the filename to save this
dataset to. It may include string formatting
patterns that will be filled in by dataset
attributes.
compute (bool): If `True` (default), compute and save the dataset.
If `False` return either a `dask.delayed.Delayed`
object or tuple of (source, target). See the
return values below for more information.
**kwargs: Other keyword arguments to pass to this writer.

Returns:
Value returned depends on `compute`. If `compute` is `True` then
the return value is the result of computing a
`dask.delayed.Delayed` object or running `dask.array.store`. If
`compute` is `False` then the returned value is either a
`dask.delayed.Delayed` object that can be computed using
`delayed.compute()` or a tuple of (source, target) that should be
passed to `dask.array.store`. If target is provided then the caller
is responsible for calling `target.close()` if the target has
this method.

"""
raise NotImplementedError(
"Writer '%s' has not implemented image saving" % (self.name, ))

Expand Down