Skip to content

Commit

Permalink
Merge pull request #19 from nens/casper-geometryfilesink
Browse files Browse the repository at this point in the history
GeometryFileSink
  • Loading branch information
arjanverkerk committed Dec 18, 2019
2 parents d8e1b36 + e334585 commit bea39de
Show file tree
Hide file tree
Showing 18 changed files with 556 additions and 35 deletions.
14 changes: 13 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@ Changelog of dask-geomodeling
2.1.2 (unreleased)
------------------

- Nothing changed yet.
- utils.get_crs now leaves EPSG codes intact instead of converting them to
  their Proj4 representation.

- Implemented GeometryFileSink that writes ESRI Shapefile, GeoJSON, GML, and
  Geopackage formats.

- Added a .to_file() method to all GeometryBlocks.

- Started using the Google docstring convention.

- Several minor doc fixes.

- Fix setting of the .crs property in the GeometryFileSource.


2.1.1 (2019-12-06)
Expand Down
4 changes: 2 additions & 2 deletions dask_geomodeling/geometry/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ def process(geom_data, raster_data, process_kwargs):
req_srs = process_kwargs["req_srs"]
agg_srs = process_kwargs["agg_srs"]

agg_geometries = features["geometry"].apply(
utils.shapely_transform, args=(req_srs, agg_srs)
agg_geometries = utils.geoseries_transform(
features["geometry"], req_srs, agg_srs,
)

statistic = process_kwargs["statistic"]
Expand Down
37 changes: 36 additions & 1 deletion dask_geomodeling/geometry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class GeometryBlock(Block):
A geometry request contains the following fields:
- mode: ``'intersects'`` or ``'extent'``. default ``'intersects'``.
- mode: one of ``{"intersects", "centroid", "extent"}``
- geometry: limit returned objects to objects that intersect with this
shapely geometry object
- projection: projection to return the geometries in as WKT string
Expand Down Expand Up @@ -50,6 +50,41 @@ def set(self, *args):
# NB cannot use __setitem__ as block instances are immutable
return SetSeriesBlock(self, *args)

def to_file(self, *args, **kwargs):
"""Utility function to export data from this block to a file on disk.
You need to specify the target file path as well as the extent geometry
you want to save.
Args:
url (str): The target file path. The extension determines the format.
For supported formats, consult
GeometryFileSink.supported_extensions.
fields (dict): a mapping that relates column names to output file
field names field names,
``{<output file field name>: <column name>, ...}``.
tile_size (int): Optionally use this for large exports to stay within
memory constraints. The export is split in tiles of given size
(units are determined by the projection). Finally the tiles are
merged.
geometry (shapely Geometry): Limit exported objects to objects whose
centroid intersects with this geometry.
projection (str): The projection as a WKT string or EPSG code.
Sets the projection of the geometry argument, the target
projection of the data, and the tiling projection.
start (datetime): start date as UTC datetime
stop (datetime): stop date as UTC datetime
**request: see GeometryBlock request specification
Relevant settings can be adapted as follows:
>>> from dask import config
>>> config.set({"geomodeling.root": '/my/output/data/path'})
>>> config.set({"temporary_directory": '/my/alternative/tmp/dir'})
"""
from dask_geomodeling.geometry.sinks import to_file

return to_file(self, *args, **kwargs)


class SeriesBlock(Block):
""" A helper block for GeometryBlocks, representing one single field"""
Expand Down
2 changes: 1 addition & 1 deletion dask_geomodeling/geometry/set_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def process(source_data, other_data=None):
a = source_data["features"]
b = other_data["features"]
if len(a) == 0 or len(b) == 0:
return source_data # do nothing
return source_data # do nothing

a_series = a["geometry"]
b_series = b["geometry"].reindex(a_series.index)
Expand Down
230 changes: 230 additions & 0 deletions dask_geomodeling/geometry/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
import glob
import os
import sys
import shutil

import fiona
import geopandas
import tempfile
from dask.config import config
from dask.base import tokenize
from dask_geomodeling import utils

from .base import BaseSingle
from .parallelize import GeometryTiler

__all__ = ["GeometryFileSink", "to_file"]


class GeometryFileSink(BaseSingle):
    """Write geometry data to files in a specified directory.

    Each request that reaches this block is written to its own file inside
    ``url``; use :meth:`GeometryFileSink.merge_files` afterwards to merge the
    resulting tiles into one large file.

    Args:
        source: the block the data is coming from
        url: the target directory to put the files in
        extension: the file extension (defines the format); the available
            options depend on the platform, see
            GeometryFileSink.supported_extensions
        fields: a mapping that relates column names to output file field
            names, ``{<output file field name>: <column name>, ...}``.

    Relevant settings can be adapted as follows:

    >>> from dask import config
    >>> config.set({"geomodeling.root": '/my/output/data/path'})
    """

    # Mapping of file extension -> OGR driver name, restricted at import
    # time to what is actually usable on the current platform.
    supported_extensions = {
        k: v
        for k, v in (
            ("shp", "ESRI Shapefile"),
            ("gpkg", "GPKG"),
            ("geojson", "GeoJSON"),
            ("gml", "GML"),
        )
        # take only drivers which Fiona reports as writable
        if "w" in fiona.supported_drivers.get(v, "")
        # skip GPKG and GML on Win32 platform as it yields a segfault
        and not (sys.platform == "win32" and k in ("gpkg", "gml"))
    }

    def __init__(self, source, url, extension="shp", fields=None):
        # Validate all arguments up front; block instances are immutable.
        safe_url = utils.safe_file_url(url)
        if not isinstance(extension, str):
            raise TypeError("'{}' object is not allowed".format(type(extension)))
        if len(extension) > 0 and extension[0] == ".":
            extension = extension[1:]  # chop off the dot
        if extension not in self.supported_extensions:
            raise ValueError("Format '{}' is unsupported".format(extension))
        if fields is None:
            # default: export every non-geometry column under its own name
            fields = {x: x for x in source.columns if x != "geometry"}
        elif not isinstance(fields, dict):
            raise TypeError("'{}' object is not allowed".format(type(fields)))
        else:
            # every requested source column must exist on the source block
            missing = set(fields.values()) - source.columns
            if missing:
                raise ValueError("Columns {} are not available".format(missing))
        super().__init__(source, safe_url, extension, fields)

    @property
    def url(self):
        # target directory (as a safe file url), see __init__
        return self.args[1]

    @property
    def extension(self):
        # file extension without the leading dot, see __init__
        return self.args[2]

    @property
    def fields(self):
        # {<output file field name>: <source column name>, ...}
        return self.args[3]

    @property
    def columns(self):
        # this block only reports whether features were saved
        return {"saved"}

    def get_sources_and_requests(self, **request):
        # The request hash makes the output filename deterministic per
        # request, so tiled exports do not overwrite each other.
        process_kwargs = {
            "url": self.url,
            "fields": self.fields,
            "extension": self.extension,
            "hash": tokenize(request)[:7],
        }
        return [(self.source, request), (process_kwargs, None)]

    @staticmethod
    def process(data, process_kwargs):
        """Write the incoming features to one file and return a small
        GeoDataFrame marking each feature as saved."""
        if "features" not in data or len(data["features"]) == 0:
            return data  # do nothing for non-feature or empty requests

        features = data["features"].copy()
        projection = data["projection"]
        path = utils.safe_abspath(process_kwargs["url"])
        fields = process_kwargs["fields"]
        extension = process_kwargs["extension"]
        driver = GeometryFileSink.supported_extensions[extension]

        # generate the directory if necessary
        os.makedirs(path, exist_ok=True)

        # the target file path is a deterministic hash of the request
        filename = ".".join([process_kwargs["hash"], extension])

        # add the index to the columns if necessary
        index_name = features.index.name
        if index_name in fields.values() and index_name not in features.columns:
            features[index_name] = features.index

        # copy the dataframe, keeping only geometry + requested columns
        features = features[["geometry"] + list(fields.values())]

        # rename the columns to the requested output field names
        # (fields.keys() and fields.values() iterate in matching order)
        features.columns = ["geometry"] + list(fields.keys())

        # generate the file
        features.to_file(os.path.join(path, filename), driver=driver)

        result = geopandas.GeoDataFrame(index=features.index)
        result["saved"] = True
        return {"features": result, "projection": projection}

    @staticmethod
    def merge_files(path, target, remove_source=False):
        """Merge files (the output of this Block) into one single file.

        Optionally removes the source files.

        Raises:
            IOError: if ``target`` already exists or no source files with a
                matching extension are found in ``path``.
        """
        path = utils.safe_abspath(path)
        target = utils.safe_abspath(target)

        if os.path.exists(target):
            raise IOError("Target '{}' already exists".format(target))

        # the target extension determines which tiles are merged
        ext = os.path.splitext(target)[1]
        source_paths = glob.glob(os.path.join(path, '*' + ext))
        if len(source_paths) == 0:
            raise IOError(
                "No source files found with matching extension '{}'".format(ext)
            )
        elif len(source_paths) == 1:
            # shortcut for single file: no fiona round-trip needed
            if remove_source:
                shutil.move(source_paths[0], target)
            else:
                shutil.copy(source_paths[0], target)
            return

        with utils.fiona_env():
            # first detect the driver etc from the first source file
            with fiona.collection(source_paths[0], "r") as source:
                kwargs = {
                    "driver": source.driver,
                    "crs": source.crs,
                    "schema": source.schema,
                }
                if source.encoding:
                    kwargs["encoding"] = source.encoding

            # stream all records from every tile into the target file
            with fiona.collection(target, "w", **kwargs) as out:
                for source_path in source_paths:
                    with fiona.collection(source_path, "r") as source:
                        out.writerecords(v for k, v in source.items())
                    if remove_source:
                        os.remove(source_path)

        if remove_source:
            try:
                os.rmdir(path)
            except IOError:  # directory not empty: do nothing
                pass


def to_file(source, url, fields=None, tile_size=None, **request):
    """Utility function to export data from a GeometryBlock to a file on disk.

    You need to specify the target file path as well as the extent geometry
    you want to save.

    Args:
        source (GeometryBlock): the block the data is coming from
        url (str): The target file path. The extension determines the format.
            For supported formats, consult
            GeometryFileSink.supported_extensions.
        fields (dict): a mapping that relates column names to output file
            field names, ``{<output file field name>: <column name>, ...}``.
        tile_size (int): Optionally use this for large exports to stay within
            memory constraints. The export is split in tiles of given size
            (units are determined by the projection). Finally the tiles are
            merged.
        geometry (shapely Geometry): Limit exported objects to objects whose
            centroid intersects with this geometry.
        projection (str): The projection as a WKT string or EPSG code.
            Sets the projection of the geometry argument, the target
            projection of the data, and the tiling projection.
        mode (str): one of ``{"intersects", "centroid"}``, default "centroid"
        start (datetime): start date as UTC datetime
        stop (datetime): stop date as UTC datetime
        **request: see GeometryBlock request specification

    Raises:
        ValueError: if ``tile_size`` is given without a ``projection`` in the
            request.

    Relevant settings can be adapted as follows:

    >>> from dask import config
    >>> config.set({"geomodeling.root": '/my/output/data/path'})
    >>> config.set({"temporary_directory": '/my/alternative/tmp/dir'})
    """
    if "mode" not in request:
        request["mode"] = "centroid"

    # fail early with a clear message: the tiler needs a projection to set
    # up the tile grid (otherwise this would surface as a bare KeyError)
    if tile_size is not None and "projection" not in request:
        raise ValueError("Please provide a 'projection' when using 'tile_size'")

    path = utils.safe_abspath(url)
    # splitext keeps the leading dot; GeometryFileSink strips it again
    extension = os.path.splitext(path)[1]

    with tempfile.TemporaryDirectory(
        dir=config.get("temporary_directory", None)
    ) as tmpdir:
        sink = GeometryFileSink(source, tmpdir, extension=extension, fields=fields)

        # wrap the sink in a GeometryTiler
        if tile_size is not None:
            sink = GeometryTiler(sink, tile_size, request["projection"])

        # export the dataset to the tmpdir (full dataset or multiple tiles)
        sink.get_data(**request)

        # copy the file / gather the tiles to the target location
        GeometryFileSink.merge_files(tmpdir, path)
12 changes: 3 additions & 9 deletions dask_geomodeling/geometry/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@
from .base import GeometryBlock

# this import is a copy from geopandas.io.files
try:
from fiona import Env as fiona_env
except ImportError:
from fiona import drivers as fiona_env


__all__ = ["GeometryFileSource"]

Expand Down Expand Up @@ -63,7 +58,7 @@ def path(self):
@property
def columns(self):
# this is exactly how geopandas determines the columns
with fiona_env(), fiona.open(self.path) as reader:
with utils.fiona_env(), fiona.open(self.path) as reader:
properties = reader.meta["schema"]["properties"]
return set(properties.keys()) | {"geometry"}

Expand Down Expand Up @@ -122,9 +117,8 @@ def process(url, request):
f = f[mask]

# convert the data to the requested crs
f["geometry"] = f["geometry"].apply(
utils.shapely_transform,
args=(utils.crs_to_srs(f["geometry"].crs), request["projection"]),
utils.geodataframe_transform(
f, utils.crs_to_srs(f.crs), request["projection"]
)

# compute the bounds of each geometry and filter on min_size
Expand Down
2 changes: 2 additions & 0 deletions dask_geomodeling/ipyleaflet_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class GeomodelingWMSHandler(IPythonHandler):
See:
https://jupyter-notebook.readthedocs.io/en/stable/extending/handlers.html
"""

def get(self):
block = Block.from_json(self.get_query_argument("layers"))
style = self.get_query_argument("styles")
Expand Down Expand Up @@ -110,6 +111,7 @@ class GeomodelingLayer(WMSLayer):
This plugin extends the Jupyter notebook server with a server that responds
with PNG images for WMS requests generated by ipyleaflet.
"""

format = traitlets.Unicode("image/png").tag(sync=True, o=True)
maxcellsize = traitlets.Float(10.0).tag(sync=True, o=True)
time = traitlets.Unicode("").tag(sync=True, o=True)
Expand Down
4 changes: 1 addition & 3 deletions dask_geomodeling/raster/elemwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,7 @@ def math_process_func(process_kwargs, *args):
else:
nodata_mask |= _nodata_mask
else:
raise TypeError(
"Cannot apply math function to value {}".format(data)
)
raise TypeError("Cannot apply math function to value {}".format(data))

if dtype == np.dtype("bool"):
no_data_value = None
Expand Down
1 change: 0 additions & 1 deletion dask_geomodeling/raster/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from datetime import datetime, timedelta

from dask import config
from dask_geomodeling import utils

from .base import RasterBlock
Expand Down
1 change: 1 addition & 0 deletions dask_geomodeling/tests/test_geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def test_reproject(self):
bbox3857 = extent.transformed(get_sr("EPSG:3857")).bbox
result = self.source.get_data(geometry=box(*bbox3857), projection="EPSG:3857")
self.assertEqual("EPSG:3857", result["projection"])
self.assertEqual("epsg:3857", result["features"].crs["init"])
self.assertEqual(10, len(result["features"]))

def test_limit(self):
Expand Down

0 comments on commit bea39de

Please sign in to comment.