Skip to content

Commit

Permalink
Merge pull request #12 from ungarj/convert_to_xarray
Browse files Browse the repository at this point in the history
netcdf s3 support
  • Loading branch information
ungarj committed Nov 3, 2021
2 parents e23dde0 + 641a300 commit ec6bb62
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 88 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Changelog


----------------------
2021.10.1 - 2021.10.25
2021.10.0 - 2021.10.25
----------------------

* packaging
Expand Down
4 changes: 2 additions & 2 deletions mapchete_xarray/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from mapchete_xarray._xarray import InputTile, METADATA, OutputDataWriter
from mapchete_xarray._xarray import InputTile, METADATA, OutputDataWriter, OutputDataReader

__all__ = ["InputTile", "METADATA", "OutputDataWriter"]
__all__ = ["InputTile", "METADATA", "OutputDataWriter", "OutputDataReader"]
__version__ = "2021.10.0"
78 changes: 54 additions & 24 deletions mapchete_xarray/_xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from mapchete.config import validate_values
from mapchete.errors import MapcheteConfigError
from mapchete.formats import base
from mapchete.io import path_exists
from mapchete.io import path_exists, fs_from_path
from mapchete.io.raster import create_mosaic, extract_from_array
from mapchete.tile import BufferedTile
import numpy as np
import os
import s3fs
import tempfile
import xarray as xr
import zarr

Expand All @@ -22,6 +22,20 @@

class OutputDataReader(base.TileDirectoryOutputReader):


def __init__(self, output_params, **kwargs):
    """
    Set up reader state from the output configuration mapping.

    Parameters
    ----------
    output_params : dict
        Output section of the mapchete configuration; must contain "path",
        may contain "nodata" (default 0) and "storage" (default "netcdf").

    Raises
    ------
    ValueError
        If "storage" is neither "netcdf" nor "zarr".
    """
    super(OutputDataReader, self).__init__(output_params)
    self.output_params = output_params
    self.path = output_params["path"]
    self.nodata = output_params.get("nodata", 0)
    self.storage = output_params.get("storage", "netcdf")
    # pick the file extension matching the storage backend
    if self.storage == "netcdf":
        self.file_extension = ".nc"
    elif self.storage == "zarr":
        self.file_extension = ".zarr"
    else:
        raise ValueError("'storage' must either be 'netcdf' or 'zarr'")
    # filesystem abstraction so local paths and s3:// URLs are handled alike
    self.fs = fs_from_path(self.path)


def tiles_exist(self, process_tile=None, output_tile=None):
"""
Check whether output tiles of a tile (either process or output) exists.
Expand Down Expand Up @@ -68,13 +82,6 @@ class OutputDataWriter(base.TileDirectoryOutputWriter, OutputDataReader):
def __init__(self, output_params, **kwargs):
"""Initialize."""
super(OutputDataWriter, self).__init__(output_params)
self.path = output_params["path"]
self.output_params = output_params
self.nodata = output_params.get("nodata", 0)
self.storage = output_params.get("storage", "netcdf")
if self.storage not in ["netcdf", "zarr"]:
raise ValueError("'storage' must either be 'netcdf' or 'zarr'")
self.file_extension = ".nc" if self.storage == "netcdf" else ".zarr"

def is_valid_with_config(self, config):
"""
Expand Down Expand Up @@ -166,18 +173,35 @@ def write(self, process_tile, data):
logger.debug("output tile data empty, nothing to write")
else:
if self.storage == "netcdf":
out_xarr.to_dataset(name="data").to_netcdf(
out_path, encoding={"data": self._get_encoding()}
)
elif self.storage == "zarr":
out_path = self.get_path(out_tile)
if out_path.startswith("s3://"):
# Initilize the S3 file system
s3 = s3fs.S3FileSystem()
out_path = s3fs.S3Map(root=out_path, s3=s3, check=False)
# this below does not work as writing to a file object is only
# supported by the "scipy" engine which does not accept our
# encoding dict
# with self.fs.open(out_path, "wb") as dst:
# dst.write(
# out_xarr.to_dataset(name="data").to_netcdf(
# self.fs.get_mapper(out_path),
# encoding={"data": self._get_encoding()},
# )
# )
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = os.path.join(tmpdir, "temp.nc")
logger.debug("write to temporary file %s", tmp_path)
out_xarr.to_dataset(name="data").to_netcdf(
tmp_path, encoding={"data": self._get_encoding()}
)
logger.debug("upload %s to %s", tmp_path, out_path)
self.fs.upload(tmp_path, out_path)
else:
out_xarr.to_dataset(name="data").to_netcdf(
out_path, encoding={"data": self._get_encoding()}
)
elif self.storage == "zarr":
out_path = self.fs.get_mapper(out_path)
logger.debug("write output to %s", out_path)
out_xarr.to_dataset(name="data").to_zarr(
out_path,
mode="w",
encoding={"data": self._get_encoding()}
)

Expand All @@ -195,22 +219,26 @@ def read(self, output_tile, **kwargs):
NumPy array
"""
try:
out_path = self.get_path(output_tile)
if self.storage == "netcdf":
if out_path.startswith("s3://"):
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = os.path.join(tmpdir, "temp.nc")
logger.debug("download to temporary file %s", tmp_path)
self.fs.download(out_path, tmp_path)
with self.fs.open(out_path, "rb") as src:
return xr.open_dataset(tmp_path)["data"]
return xr.open_dataset(self.get_path(output_tile))["data"]
elif self.storage == "zarr":
out_path = self.get_path(output_tile)
if out_path.startswith("s3://"):
# Initilize the S3 file system
s3 = s3fs.S3FileSystem()
out_path = s3fs.S3Map(root=out_path, s3=s3, check=False)
out_path = self.fs.get_mapper(out_path)
return xr.open_zarr(
out_path,
chunks=None
)["data"]
except (FileNotFoundError, ValueError):
return self.empty(output_tile)

def open(self, tile, process, **kwargs):
def open(self, tile, process, **kwargs): # pragma: no cover
"""
Open process output as input for other process.
Expand Down Expand Up @@ -278,6 +306,8 @@ def _read_as_tiledir(
-------
data : list for vector files or numpy array for raster files
"""
if not tiles_paths:
return self.empty(out_tile)
source_tile = tiles_paths[0][0]
if source_tile.tp.grid != out_tile.tp.grid:
raise MapcheteConfigError(
Expand Down Expand Up @@ -306,7 +336,7 @@ def _get_encoding(self):
)


class InputTile(base.InputTile):
class InputTile(base.InputTile): # pragma: no cover
"""
Target Tile representation of input data.
Expand Down
Empty file.
46 changes: 46 additions & 0 deletions mapchete_xarray/processes/convert_to_xarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
import xarray as xr


def execute(
    mp,
    band_names=None,
    index_band=None,
    index_field="index",
    slice_id_field="slice_id",
):
    """
    Convert raster input array to xarray with optionally named axes.

    Parameters
    ----------
    mp : mapchete process object
        Provides the "raster" input and optionally an "indexes" input.
    band_names : list, optional
        Names for the band axis; must match the number of bands read from
        the raster input.
    index_band : optional
        Must be set when an "indexes" input is configured.
        NOTE(review): only checked for None here, never otherwise used —
        confirm intended behavior.
    index_field : str
        Property field holding the index value in each "indexes" feature.
    slice_id_field : str
        Property field holding the slice ID in each "indexes" feature.

    Returns
    -------
    xarray.DataArray or "empty"
        DataArray over the raster data, or the string "empty" if the raster
        input is empty.

    Raises
    ------
    ValueError
        If "indexes" is configured but index_band is None, or if band_names
        does not match the number of bands.
    """
    coords = {}
    attrs = {}
    with mp.open("raster") as raster:
        if raster.is_empty():
            return "empty"
        data = raster.read()

    if "indexes" in mp.input:  # pragma: no cover
        if index_band is None:
            raise ValueError("index_band has to be specified if indexes are provided")
        # map slice IDs to index values from the vector input's properties
        s2_indexes = {
            i["properties"][slice_id_field]: i["properties"][index_field]
            for i in mp.open("indexes").read()
        }
        attrs.update(slice_ids=s2_indexes)

    if band_names:
        if len(band_names) != data.shape[0]:  # pragma: no cover
            # fixed grammar of the original message ("same length than")
            raise ValueError(
                "band_names has to be the same length as the input array's band axis"
            )
        coords.update(bands=band_names)

    return xr.DataArray(
        # nd array
        data,
        # named dimension indexes
        coords=coords,
        # named dimensions
        # NOTE(review): dims are declared ("bands", "x", "y") — raster arrays
        # are commonly (bands, y, x); confirm axis order is intentional
        dims=("bands", "x", "y"),
        # attrs must be serializable, so nested metadata is stored as JSON
        attrs=dict(json=json.dumps(attrs))
    )
4 changes: 1 addition & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
boto3
mapchete>=0.28
mapchete[s3]>=2021.11.0
netCDF4
s3fs
xarray
zarr
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def parse_requirements(file):
'mapchete.formats.drivers': [
'xarray=mapchete_xarray',
],
"mapchete.processes": [
"convert_to_xarray=mapchete_xarray.processes.convert_to_xarray",
],
},
classifiers=[
'Development Status :: 3 - Alpha',
Expand Down
104 changes: 50 additions & 54 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,28 @@
import boto3
from collections import namedtuple
from contextlib import contextmanager
import mapchete
from mapchete.io import fs_from_path
import os
import pytest
from tempfile import TemporaryDirectory
import uuid
import yaml

from mapchete.testing import ProcessFixture


SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
TESTDATA_DIR = os.path.join(SCRIPT_DIR, "testdata")
S3_TEMP_DIR = "s3://mapchete-test/tmp/" + uuid.uuid4().hex

ExampleConfig = namedtuple("ExampleConfig", ("path", "dict"))


# helper functions
@contextmanager
def _tempdir_mapchete(path, update={}):
with TemporaryDirectory() as temp_dir:
abs_dir, filename = os.path.split(path)

# load config to dictionary
config = yaml.safe_load(open(path).read())

# add config directory, point output to temp_dir and make path to process
# file absolute
config.update(config_dir=os.path.dirname(path))
config["output"].update(path=temp_dir)
if config["process"].endswith(".py"):
config["process"] = os.path.join(abs_dir, config["process"])

# if required apply custom changes to configuration
config.update(**update)

# dump temporary mapchete file to temporary direcotry
temp_mapchete_file = os.path.join(temp_dir, filename)
with open(temp_mapchete_file, "w") as mapchete_file:
yaml.dump(config, mapchete_file, default_flow_style=False)
yield ExampleConfig(path=temp_mapchete_file, dict=config)


# temporary directory for I/O tests
@pytest.fixture
def mp_s3_tmpdir():
"""Setup and teardown temporary directory."""

fs = fs_from_path(S3_TEMP_DIR)
def _cleanup():
for obj in boto3.resource('s3').Bucket(S3_TEMP_DIR.split("/")[2]).objects.filter(
Prefix="/".join(S3_TEMP_DIR.split("/")[-2:])
):
obj.delete()
try:
fs.rm(S3_TEMP_DIR, recursive=True)
except FileNotFoundError:
pass

_cleanup()
yield S3_TEMP_DIR
Expand All @@ -60,36 +31,61 @@ def _cleanup():

@pytest.fixture(scope="session")
def written_output():
with _tempdir_mapchete(os.path.join(TESTDATA_DIR, "example.mapchete")) as config:
with mapchete.open(config.path) as mp:
data_tile = next(mp.get_process_tiles(5))
mp.batch_process(tile=data_tile.id)
yield config
with TemporaryDirectory() as tempdir:
with ProcessFixture(
os.path.join(TESTDATA_DIR, "example.mapchete"),
output_tempdir=tempdir
) as example:
data_tile = next(example.mp().get_process_tiles(5))
example.mp().batch_process(tile=data_tile.id)
yield example


@pytest.fixture
def example_config():
with _tempdir_mapchete(os.path.join(TESTDATA_DIR, "example.mapchete")) as config:
yield config
with TemporaryDirectory() as tempdir:
with ProcessFixture(
os.path.join(TESTDATA_DIR, "example.mapchete"),
output_tempdir=tempdir
) as example:
yield example


@pytest.fixture
def zarr_config():
with _tempdir_mapchete(os.path.join(TESTDATA_DIR, "zarr_example.mapchete")) as config:
yield config
with TemporaryDirectory() as tempdir:
with ProcessFixture(
os.path.join(TESTDATA_DIR, "zarr_example.mapchete"),
output_tempdir=tempdir
) as example:
yield example


@pytest.fixture
def xarray_tiledir_input_mapchete():
with _tempdir_mapchete(
os.path.join(TESTDATA_DIR, "xarray_tiledir_input.mapchete")
) as config:
yield config
with TemporaryDirectory() as tempdir:
with ProcessFixture(
os.path.join(TESTDATA_DIR, "xarray_tiledir_input.mapchete"),
output_tempdir=tempdir
) as example:
yield example


@pytest.fixture
def xarray_mapchete_input_mapchete():
with _tempdir_mapchete(
os.path.join(TESTDATA_DIR, "xarray_mapchete_input.mapchete")
) as config:
yield config
with TemporaryDirectory() as tempdir:
with ProcessFixture(
os.path.join(TESTDATA_DIR, "xarray_mapchete_input.mapchete"),
output_tempdir=tempdir
) as example:
yield example


@pytest.fixture
def convert_to_xarray_mapchete():
    """Yield a ProcessFixture for the convert_to_xarray example configuration."""
    # single combined context manager: temp dir lives as long as the fixture
    with TemporaryDirectory() as tempdir, ProcessFixture(
        os.path.join(TESTDATA_DIR, "convert_to_xarray.mapchete"),
        output_tempdir=tempdir,
    ) as example:
        yield example

0 comments on commit ec6bb62

Please sign in to comment.