Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dataset writing so computations are shared between tasks #216

Merged
merged 11 commits into from
Mar 12, 2018
16 changes: 8 additions & 8 deletions satpy/etc/composites/abi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,21 @@ composites:
# Overview composite: three-channel RGB built from the 0.65, 0.85 and 11.0
# wavelength prerequisites (presumably micrometres -- TODO confirm against
# the ABI reader's channel definitions).
overview:
compositor: !!python/name:satpy.composites.GenericCompositor
prerequisites:
- 0.65
- 0.85
- 11.0
standard_name: overview

# Sun-zenith-corrected overview: same three channels, but the two solar
# channels get the `sunz_corrected` modifier applied first.
# NOTE(review): this entry uses RGBCompositor while the sibling overview
# composites use GenericCompositor -- confirm the difference is intentional.
overview_sun:
compositor: !!python/name:satpy.composites.RGBCompositor
prerequisites:
- wavelength: 0.65
modifiers: [sunz_corrected]
- wavelength: 0.85
modifiers: [sunz_corrected]
- 11.0
standard_name: overview

# Uncorrected ("raw") variant of the overview composite: identical channel
# set, no solar-zenith correction modifiers.
overview_raw:
compositor: !!python/name:satpy.composites.GenericCompositor
prerequisites:
- 0.65
- 0.85
- 11.0
standard_name: overview

airmass:
compositor: !!python/name:satpy.composites.Airmass
prerequisites:
Expand Down
13 changes: 11 additions & 2 deletions satpy/readers/abi_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@

logger = logging.getLogger(__name__)

# Map the platform shorthand used in ABI L1B filenames to the full
# platform name. The previous single-entry assignment was immediately
# overwritten by this dict (dead code), so only one assignment remains.
PLATFORM_NAMES = {
    'G16': 'GOES-16',
    'G17': 'GOES-17',
}


class NC_ABI_L1B(BaseFileHandler):

def __init__(self, filename, filename_info, filetype_info):
super(NC_ABI_L1B, self).__init__(filename, filename_info,
filetype_info)
# xarray's default netcdf4 engine
self.nc = xr.open_dataset(filename,
decode_cf=True,
mask_and_scale=True,
engine='h5netcdf',
chunks={'x': CHUNK_SIZE, 'y': CHUNK_SIZE})
self.nc = self.nc.rename({'t': 'time'})
platform_shortname = filename_info['platform_shortname']
Expand Down Expand Up @@ -164,3 +167,9 @@ def start_time(self):
@property
def end_time(self):
    """End of the scan, parsed from the file's ``time_coverage_end`` attribute."""
    coverage_end = self.nc.attrs['time_coverage_end']
    return datetime.strptime(coverage_end, '%Y-%m-%dT%H:%M:%S.%fZ')

def __del__(self):
    """Close the open netCDF dataset during finalization, best-effort.

    Errors are swallowed on purpose: raising from a finalizer only
    produces noise on stderr and cannot be handled by callers.
    """
    try:
        self.nc.close()
    except (AttributeError, IOError, OSError):
        # AttributeError: __init__ may have failed before self.nc was
        # assigned. IOError/OSError: the file may already be closed or
        # otherwise unreachable at interpreter shutdown.
        pass
26 changes: 14 additions & 12 deletions satpy/scene.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,34 +760,36 @@ def load_writer_config(self, config_files, **kwargs):
with open(conf_fn) as fd:
conf = recursive_dict_update(conf, yaml.load(fd))
writer_class = conf['writer']['writer']
init_kwargs, kwargs = writer_class.separate_init_kwargs(kwargs)
writer = writer_class(ppp_config_dir=self.ppp_config_dir,
config_files=config_files,
**kwargs)
return writer
**init_kwargs)
return writer, kwargs

def save_dataset(self, dataset_id, filename=None, writer=None,
                 overlay=None, compute=True, **kwargs):
    """Save the *dataset_id* to file using *writer* (default: geotiff).

    If *writer* is None it is chosen from *filename*'s extension, falling
    back to the geotiff writer when *filename* is also None. The writer
    splits the remaining keyword arguments into init-time and save-time
    arguments itself. With ``compute=False`` the writer's delayed result
    is returned instead of being computed immediately (exact return type
    depends on the writer).
    """
    if writer is None:
        if filename is None:
            writer, save_kwargs = self.get_writer("geotiff", **kwargs)
        else:
            # Pick the writer from the filename extension (e.g. '.tif').
            writer, save_kwargs = self.get_writer_by_ext(
                os.path.splitext(filename)[1], **kwargs)
    else:
        writer, save_kwargs = self.get_writer(writer, **kwargs)
    return writer.save_dataset(self[dataset_id], filename=filename,
                               overlay=overlay, compute=compute,
                               **save_kwargs)

def save_datasets(self, writer="geotiff", datasets=None, compute=True,
                  **kwargs):
    """Save all the datasets present in a scene to disk using *writer*.

    *datasets* may name a subset of datasets to write; by default every
    dataset in the scene is saved. With ``compute=False`` the writer's
    delayed result is returned instead of being computed immediately.
    """
    if datasets is not None:
        datasets = [self[ds] for ds in datasets]
    else:
        datasets = self.datasets.values()
    writer, save_kwargs = self.get_writer(writer, **kwargs)
    return writer.save_datasets(datasets, compute=compute, **save_kwargs)

def get_writer(self, writer="geotiff", **kwargs):
"""Get the writer instance."""
Expand Down
30 changes: 30 additions & 0 deletions satpy/tests/writer_tests/test_geotiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,40 @@


class TestGeoTIFFWriter(unittest.TestCase):
    """Smoke tests for the GeoTIFF writer."""

    def _get_test_datasets(self):
        """Build a single chunked, dask-backed DataArray to write out."""
        from datetime import datetime
        import dask.array as da
        import xarray as xr
        test_data = xr.DataArray(
            da.zeros((100, 200), chunks=50),
            dims=('y', 'x'),
            attrs={'name': 'test', 'start_time': datetime.utcnow()})
        return [test_data]

    def test_init(self):
        """The writer can be constructed with default arguments."""
        from satpy.writers.geotiff import GeoTIFFWriter
        GeoTIFFWriter()

    def test_simple_write(self):
        """Writing datasets with the default compute=True succeeds."""
        from satpy.writers.geotiff import GeoTIFFWriter
        writer = GeoTIFFWriter()
        writer.save_datasets(self._get_test_datasets())

    def test_simple_delayed_write(self):
        """With compute=False a dask Delayed object is returned."""
        from dask.delayed import Delayed
        from satpy.writers.geotiff import GeoTIFFWriter
        writer = GeoTIFFWriter()
        # When we switch to rio_save on XRImage this will become
        # (sources, targets) instead of a single Delayed object.
        delayed = writer.save_datasets(self._get_test_datasets(),
                                       compute=False)
        self.assertIsInstance(delayed, Delayed)
        delayed.compute()


def suite():
"""The test suite for this writer's tests.
Expand Down
28 changes: 28 additions & 0 deletions satpy/tests/writer_tests/test_simple_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,38 @@


class TestPillowWriter(unittest.TestCase):
    """Smoke tests for the simple image (Pillow) writer."""

    def _get_test_datasets(self):
        """Build a single chunked, dask-backed DataArray to write out."""
        from datetime import datetime
        import dask.array as da
        import xarray as xr
        test_data = xr.DataArray(
            da.zeros((100, 200), chunks=50),
            dims=('y', 'x'),
            attrs={'name': 'test', 'start_time': datetime.utcnow()})
        return [test_data]

    def test_init(self):
        """The writer can be constructed with default arguments."""
        from satpy.writers.simple_image import PillowWriter
        PillowWriter()

    def test_simple_write(self):
        """Writing datasets with the default compute=True succeeds."""
        from satpy.writers.simple_image import PillowWriter
        writer = PillowWriter()
        writer.save_datasets(self._get_test_datasets())

    def test_simple_delayed_write(self):
        """With compute=False a dask Delayed object is returned."""
        from dask.delayed import Delayed
        from satpy.writers.simple_image import PillowWriter
        writer = PillowWriter()
        delayed = writer.save_datasets(self._get_test_datasets(),
                                       compute=False)
        self.assertIsInstance(delayed, Delayed)
        delayed.compute()


def suite():
"""The test suite for this writer's tests.
Expand Down