Add support for s3 buckets in OLCI and ABI l1 readers #1439

Merged · 23 commits · Dec 2, 2020
73 changes: 72 additions & 1 deletion satpy/readers/__init__.py
@@ -21,6 +21,7 @@
import os
import warnings
from datetime import datetime, timedelta
from functools import total_ordering

import yaml

@@ -519,7 +520,7 @@ def load_readers(filenames=None, reader=None, reader_kwargs=None,


def _get_reader_kwargs(reader, reader_kwargs):
"""Helper for load_readers to form reader_kwargs.
"""Help load_readers to form reader_kwargs.

Helper for load_readers to get reader_kwargs and
reader_kwargs_without_filter in the desirable form.
@@ -538,3 +539,73 @@ def _get_reader_kwargs(reader, reader_kwargs):
reader_kwargs_without_filter[k].pop('filter_parameters', None)

return (reader_kwargs, reader_kwargs_without_filter)


@total_ordering
class FSFile(os.PathLike):
"""Implementation of a PathLike file object, that can be opened.
Collaborator: I'm confused by this line. Is it a file object or a path object? If it's a file object it's already opened? I think os.PathLike represents a path, not an open file.

Member Author: It is a path indeed, but can be opened to return a file object.

This is made to be used in conjunction with fsspec or s3fs. For example::

from satpy import Scene

import fsspec
filename = 'noaa-goes16/ABI-L1b-RadC/2019/001/17/*_G16_s20190011702186*'

the_files = fsspec.open_files("simplecache::s3://" + filename, s3={'anon': True})

from satpy.readers import FSFile
fs_files = [FSFile(open_file) for open_file in the_files]

scn = Scene(filenames=fs_files, reader='abi_l1b')
scn.load(['true_color_raw'])

"""

def __init__(self, file, fs=None):
"""Initialise the FSFile instance.

*file* can be a string or an fsspec.OpenFile instance. In the latter case, the following *fs* argument has no effect.
*fs* can be None or a fsspec filesystem instance.
"""
try:
self._file = file.path
self._fs = file.fs
except AttributeError:
self._file = file
self._fs = fs

def __str__(self):
"""Return the string version of the filename."""
return self._file

def __fspath__(self):
"""Comply with PathLike."""
return self._file

def __repr__(self):
"""Representation of the object."""
return '<FSFile "' + str(self._file) + '">'

def open(self):
"""Open the file.

This is read-only.
"""
try:
return self._fs.open(self._file)
except AttributeError:
return open(self._file)

def __lt__(self, other):
"""Implement ordering."""
return os.fspath(self) < os.fspath(other)


def open_file_or_filename(unknown_file_thing):
"""Try to open the *unknown_file_thing*, otherwise return the filename."""
try:
f_obj = unknown_file_thing.open()
except AttributeError:
f_obj = unknown_file_thing
return f_obj
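
As a quick illustration of how the pieces added above fit together, here is a minimal sketch; the local path and the shortened bucket glob are made up for the example, while FSFile, open_file_or_filename and fsspec.open_files are the pieces from this diff:

import fsspec
from satpy.readers import FSFile, open_file_or_filename

# Local file: with no filesystem attached, FSFile.open() falls back to the built-in open().
local = FSFile("/data/local_granule.nc")  # hypothetical local path
f_obj = open_file_or_filename(local)      # -> a plain file object

# Remote files: wrap fsspec OpenFile instances so readers can open them lazily.
remote = fsspec.open_files("s3://noaa-goes16/ABI-L1b-RadC/2019/001/17/*.nc", anon=True)
fs_files = [FSFile(open_file) for open_file in remote]
f_obj = open_file_or_filename(fs_files[0])  # opened through the S3 filesystem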
43 changes: 29 additions & 14 deletions satpy/readers/abi_base.py
@@ -22,10 +22,11 @@

import numpy as np
import xarray as xr

from pyresample import geometry
from satpy.readers.file_handlers import BaseFileHandler

from satpy import CHUNK_SIZE
from satpy.readers import open_file_or_filename
from satpy.readers.file_handlers import BaseFileHandler

logger = logging.getLogger(__name__)

@@ -41,14 +42,14 @@ class NC_ABI_BASE(BaseFileHandler):
def __init__(self, filename, filename_info, filetype_info):
"""Open the NetCDF file with xarray and prepare the Dataset for reading."""
super(NC_ABI_BASE, self).__init__(filename, filename_info, filetype_info)
# xarray's default netcdf4 engine
f_obj = open_file_or_filename(self.filename)
try:
self.nc = xr.open_dataset(self.filename,
self.nc = xr.open_dataset(f_obj,
decode_cf=True,
mask_and_scale=False,
chunks={'x': CHUNK_SIZE, 'y': CHUNK_SIZE}, )
except ValueError:
self.nc = xr.open_dataset(self.filename,
self.nc = xr.open_dataset(f_obj,
decode_cf=True,
mask_and_scale=False,
chunks={'lon': CHUNK_SIZE, 'lat': CHUNK_SIZE}, )
@@ -80,32 +81,47 @@ def __getitem__(self, item):
variables which causes inaccurate unscaled data values. This method
forces the scale factor to a 64-bit float first.
"""
def is_int(val):
return np.issubdtype(val.dtype, np.integer) if hasattr(val, 'dtype') else isinstance(val, int)

data = self.nc[item]
attrs = data.attrs

data = self._adjust_data(data, item)

data.attrs = attrs

data = self._adjust_coords(data, item)

return data

def _adjust_data(self, data, item):
"""Adjust data with typing, scaling and filling."""
factor = data.attrs.get('scale_factor', 1)
offset = data.attrs.get('add_offset', 0)
fill = data.attrs.get('_FillValue')
unsigned = data.attrs.get('_Unsigned', None)

def is_int(val):
return np.issubdtype(val.dtype, np.integer) if hasattr(val, 'dtype') else isinstance(val, int)

# Ref. GOESR PUG-L1B-vol3, section 5.0.2 Unsigned Integer Processing
if unsigned is not None and unsigned.lower() == 'true':
# cast the data from int to uint
data = data.astype('u%s' % data.dtype.itemsize)

if fill is not None:
fill = fill.astype('u%s' % fill.dtype.itemsize)

if fill is not None:
# Some backends (h5netcdf) may return attributes as shape (1,)
# arrays rather than shape () scalars, which according to the netcdf
# documentation at <URL:https://www.unidata.ucar.edu
# /software/netcdf/docs/netcdf_data_set_components.html#attributes>
# is correct.
if np.ndim(fill) > 0:
fill = fill.item()
if is_int(data) and is_int(factor) and is_int(offset):
new_fill = fill
else:
new_fill = np.nan
data = data.where(data != fill, new_fill)

if factor != 1 and item in ('x', 'y'):
# be more precise with x/y coordinates
# see get_area_def for more information
@@ -117,10 +133,10 @@ def is_int(val):
if not is_int(factor):
factor = float(factor)
data = data * factor + offset
return data

data.attrs = attrs

# handle coordinates (and recursive fun)
def _adjust_coords(self, data, item):
"""Handle coordinates (and recursive fun)."""
new_coords = {}
# 'time' dimension causes issues in other processing
# 'x_image' and 'y_image' are confusing to some users and unnecessary
@@ -135,7 +151,6 @@ def is_int(val):
self.coords[coord_name] = self[coord_name]
new_coords[coord_name] = self.coords[coord_name]
data.coords.update(new_coords)

return data

def get_dataset(self, key, info):
8 changes: 2 additions & 6 deletions satpy/readers/file_handlers.py
@@ -20,9 +20,8 @@
from abc import ABCMeta

import numpy as np
from pathlib import PurePath

from pyresample.geometry import SwathDefinition

from satpy.dataset import combine_metadata


@@ -31,10 +30,7 @@ class BaseFileHandler(metaclass=ABCMeta):

def __init__(self, filename, filename_info, filetype_info):
"""Initialize file handler."""
if isinstance(filename, PurePath):
self.filename = str(filename)
else:
self.filename = filename
self.filename = filename
Member: This change will break pathlib objects for all other readers right? (assuming the low-level I/O library doesn't support them)

Member: That said, I'm ok with this.

Member Author: probably yes... Is this bad?

Member: I think @gerritholl was the first user to point out that Satpy didn't work with pathlib objects so he should maybe make the final decision. It probably isn't great that this breaks it for all other readers, but I'm not sure how many readers need strings for their lower-level I/O libraries either.
self.navigation_reader = None
self.filename_info = filename_info
self.filetype_info = filetype_info
43 changes: 24 additions & 19 deletions satpy/readers/olci_nc.py
@@ -40,16 +40,17 @@


import logging
from datetime import datetime
from contextlib import suppress
from functools import lru_cache
from functools import reduce

import dask.array as da
import numpy as np
import xarray as xr

from satpy import CHUNK_SIZE
from satpy.readers import open_file_or_filename
from satpy.readers.file_handlers import BaseFileHandler
from satpy.utils import angle2xyz, xyz2angle
from satpy import CHUNK_SIZE
from functools import reduce

logger = logging.getLogger(__name__)

@@ -100,30 +101,36 @@ def __init__(self, filename, filename_info, filetype_info,
"""Init the olci reader base."""
super(NCOLCIBase, self).__init__(filename, filename_info,
filetype_info)
self.nc = xr.open_dataset(self.filename,
self._engine = engine
self._start_time = filename_info['start_time']
self._end_time = filename_info['end_time']
# TODO: get metadata from the manifest file (xfdumanifest.xml)
self.platform_name = PLATFORM_NAMES[filename_info['mission_id']]
self.sensor = 'olci'
self.open_file = None

@property
@lru_cache(maxsize=2)
def nc(self):
Collaborator: Mostly for my curiosity: I've never seen lru_cache used on a property before. I don't know much about OLCI. What is the purpose and effect of a cache of size 2 on this property? Does that mean that if I open 3 OLCI files that the one I opened first is closed again? Or just that there is work redone if .nc is accessed on the first one?

Member Author: I put size 2 for safety, but in principle only 1 is needed, as this is a property on the instance. So each instance of the file handler will have a different cache for this accessor, and allows repeated calls to the property not to trigger a reread (especially important when this is a remote file of course).

Member: Wasn't this cached_property before? What's the difference?

Member Author: it was, but cached_property was introduced in python 3.8, so I have to pass on that for now!

Member: Ah ok. What about a try/except ImportError on that? Can lru_cache take a maxsize=1? Does it default to that? Basically can we use cached_property when it exists, but use lru_cache for backwards compatibility for now?

Member Author: I can do that.

Member Author: Hmm, I thought this would do it, but no... Will investigate more tomorrow

def cached_property(func):
    return property(lru_cache(maxsize=1)(func))

Member Author: ok, got it to work now.
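
One possible shape of the fallback discussed above (a hedged sketch, not necessarily the change that was finally merged): prefer functools.cached_property where it exists (Python 3.8+) and emulate it with property plus lru_cache on older interpreters.

try:
    from functools import cached_property
except ImportError:
    from functools import lru_cache

    def cached_property(func):
        # Approximation only: the lru_cache is attached to the unbound function,
        # so it is shared across instances and keyed on ``self``; maxsize bounds
        # how many instances keep a cached result at once.
        return property(lru_cache(maxsize=1)(func))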

"""Get the nc xr dataset."""
f_obj = open_file_or_filename(self.filename)
dataset = xr.open_dataset(f_obj,
decode_cf=True,
mask_and_scale=True,
engine=engine,
engine=self._engine,
chunks={'columns': CHUNK_SIZE,
'rows': CHUNK_SIZE})

self.nc = self.nc.rename({'columns': 'x', 'rows': 'y'})

# TODO: get metadata from the manifest file (xfdumanifest.xml)
self.platform_name = PLATFORM_NAMES[filename_info['mission_id']]
self.sensor = 'olci'
return dataset.rename({'columns': 'x', 'rows': 'y'})

@property
def start_time(self):
"""Start time property."""
return datetime.strptime(self.nc.attrs['start_time'],
'%Y-%m-%dT%H:%M:%S.%fZ')
return self._start_time

@property
def end_time(self):
"""End time property."""
return datetime.strptime(self.nc.attrs['stop_time'],
'%Y-%m-%dT%H:%M:%S.%fZ')
return self._end_time

def get_dataset(self, key, info):
"""Load a dataset."""
@@ -134,10 +141,8 @@ def get_dataset(self, key, info):

def __del__(self):
"""Close the NetCDF file that may still be open."""
try:
with suppress(IOError, OSError, AttributeError):
self.nc.close()
except (IOError, OSError, AttributeError):
pass


class NCOLCICal(NCOLCIBase):
33 changes: 33 additions & 0 deletions satpy/tests/reader_tests/test_abi_l1b.py
@@ -241,3 +241,36 @@ def test_vis_calibrate(self):
'toa_bidirectional_reflectance')
self.assertEqual(res.attrs['long_name'],
'Bidirectional Reflectance')


class Test_NC_ABI_File(unittest.TestCase):
"""Test file opening."""

@mock.patch('satpy.readers.abi_base.xr')
def test_open_dataset(self, _):
"""Test openning a dataset."""
from satpy.readers.abi_l1b import NC_ABI_L1B

openable_thing = mock.MagicMock()

NC_ABI_L1B(openable_thing, {'platform_shortname': 'g16'}, None)
openable_thing.open.assert_called()


class Test_NC_ABI_L1B_H5netcdf(Test_NC_ABI_L1B):
"""Allow h5netcdf peculiarities."""

def setUp(self):
"""Create fake data for the tests."""
rad_data = np.int16(50)
rad = xr.DataArray(
rad_data,
attrs={
'scale_factor': 0.5,
'add_offset': -1.,
'_FillValue': np.array([1002]),
'units': 'W m-2 um-1 sr-1',
'valid_range': (0, 4095),
}
)
super(Test_NC_ABI_L1B_H5netcdf, self).setUp(rad=rad)
16 changes: 16 additions & 0 deletions satpy/tests/reader_tests/test_olci_nc.py
@@ -76,6 +76,22 @@ def test_instantiate(self, mocked_dataset):
mocked_dataset.assert_called()
mocked_dataset.reset_mock()

@mock.patch('xarray.open_dataset')
def test_open_file_objects(self, mocked_open_dataset):
"""Test initialization of file handlers."""
from satpy.readers.olci_nc import NCOLCIBase
filename_info = {'mission_id': 'S3A', 'dataset_name': 'Oa01', 'start_time': 0, 'end_time': 0}

open_file = mock.MagicMock()

file_handler = NCOLCIBase(open_file, filename_info, 'c')
# deepcode ignore W0104: This is a property that is actually a function call.
file_handler.nc # pylint: disable=W0104
mocked_open_dataset.assert_called()
open_file.open.assert_called()
assert (open_file.open.return_value in mocked_open_dataset.call_args[0] or
open_file.open.return_value == mocked_open_dataset.call_args[1].get('filename_or_obj'))

@mock.patch('xarray.open_dataset')
def test_get_dataset(self, mocked_dataset):
"""Test reading datasets."""
13 changes: 12 additions & 1 deletion satpy/tests/test_file_handlers.py
@@ -28,7 +28,7 @@ class TestBaseFileHandler(unittest.TestCase):
"""Test the BaseFileHandler."""

def setUp(self):
"""Setup the test."""
"""Set up the test."""
self._old_set = BaseFileHandler.__abstractmethods__
BaseFileHandler.__abstractmethods__ = set()
self.fh = BaseFileHandler(
@@ -140,6 +140,17 @@ def test_combine_orbital_parameters(self):
# Empty
self.fh.combine_info([{}])

def test_file_is_kept_intact(self):
"""Test that the file object passed (string, path, or other) is kept intact."""
open_file = mock.MagicMock()
bfh = BaseFileHandler(open_file, {'filename_info': 'bla'}, 'filetype_info')
assert bfh.filename == open_file

from pathlib import Path
filename = Path('/bla/bla.nc')
bfh = BaseFileHandler(filename, {'filename_info': 'bla'}, 'filetype_info')
assert isinstance(bfh.filename, Path)

def tearDown(self):
"""Tear down the test."""
BaseFileHandler.__abstractmethods__ = self._old_set