Skip to content

Commit

Permalink
Merge ac35769 into e6b0a05
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill888 committed Nov 22, 2018
2 parents e6b0a05 + ac35769 commit c9cd031
Show file tree
Hide file tree
Showing 8 changed files with 780 additions and 171 deletions.
84 changes: 11 additions & 73 deletions datacube/api/core.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,17 @@
from __future__ import absolute_import, division, print_function

import logging
import os
import sys
import uuid
import warnings
from collections import namedtuple, OrderedDict
from itertools import groupby, repeat
from itertools import groupby
from math import ceil

import numpy
import pandas
import xarray

try:
import SharedArray as sa
except ImportError:
pass
from dask import array as da

try:
from pathos.threading import ThreadPool
except ImportError:
pass
from six.moves import zip

from datacube.config import LocalConfig
from datacube.compat import string_types
from datacube.storage.storage import reproject_and_fuse
Expand All @@ -33,8 +20,6 @@
from ..index import index_connect
from ..drivers import new_datasource

THREADING_REQS_AVAILABLE = ('SharedArray' in sys.modules and 'pathos.threading' in sys.modules)

Group = namedtuple('Group', ['key', 'datasets'])


Expand Down Expand Up @@ -143,7 +128,7 @@ def _list_measurements(self):

#: pylint: disable=too-many-arguments, too-many-locals
def load(self, product=None, measurements=None, output_crs=None, resolution=None, resampling=None,
dask_chunks=None, like=None, fuse_func=None, align=None, datasets=None, use_threads=False, **query):
dask_chunks=None, like=None, fuse_func=None, align=None, datasets=None, **query):
"""
Load data as an ``xarray`` object. Each measurement will be a data variable in the :class:`xarray.Dataset`.
Expand Down Expand Up @@ -270,12 +255,6 @@ def load(self, product=None, measurements=None, output_crs=None, resolution=None
Optional. If this is a non-empty list of :class:`datacube.model.Dataset` objects, these will be loaded
instead of performing a database lookup.
:param bool use_threads:
Optional. If this is set to True, IO will be multi-thread.
May not work for all drivers due to locking/GIL.
Default is False.
:param int limit:
Optional. If provided, limit the maximum number of datasets
returned. Useful for testing and debugging.
Expand Down Expand Up @@ -305,8 +284,7 @@ def load(self, product=None, measurements=None, output_crs=None, resolution=None
measurement_dicts,
resampling=resampling,
fuse_func=fuse_func,
dask_chunks=dask_chunks,
use_threads=use_threads)
dask_chunks=dask_chunks)

return apply_aliases(result, datacube_product, measurements)

Expand Down Expand Up @@ -388,7 +366,7 @@ def group_datasets(datasets, group_by):
return sources

@staticmethod
def create_storage(coords, geobox, measurements, data_func=None, use_threads=False):
def create_storage(coords, geobox, measurements, data_func=None):
"""
Create a :class:`xarray.Dataset` and (optionally) fill it with data.
Expand All @@ -409,12 +387,6 @@ def create_storage(coords, geobox, measurements, data_func=None, use_threads=Fal
as an argument. It should return an appropriately shaped numpy array. If not provided, an empty
:class:`xarray.Dataset` is returned.
:param bool use_threads:
Optional. If this is set to True, IO will be multi-thread.
May not work for all drivers due to locking/GIL.
Default is False.
:rtype: :class:`xarray.Dataset`
.. seealso:: :meth:`find_datasets` :meth:`group_datasets`
Expand All @@ -432,16 +404,7 @@ def empty_func(measurement_):
for name, coord in geobox.coordinates.items():
result[name] = (name, coord.values, {'units': coord.units})

def work_measurements(measurement, data_func):
return data_func(measurement)

use_threads = use_threads and THREADING_REQS_AVAILABLE

if use_threads:
pool = ThreadPool(32)
results = pool.map(work_measurements, measurements, repeat(data_func))
else:
results = [data_func(a) for a in measurements]
results = [data_func(a) for a in measurements]

for measurement in measurements:
data = results.pop(0)
Expand All @@ -460,8 +423,7 @@ def product_data(*args, **kwargs):

@staticmethod
def load_data(sources, geobox, measurements, resampling=None,
fuse_func=None, dask_chunks=None, skip_broken_datasets=False,
use_threads=False):
fuse_func=None, dask_chunks=None, skip_broken_datasets=False):
"""
Load data from :meth:`group_datasets` into an :class:`xarray.Dataset`.
Expand Down Expand Up @@ -494,40 +456,16 @@ def load_data(sources, geobox, measurements, resampling=None,
See the documentation on using `xarray with dask <http://xarray.pydata.org/en/stable/dask.html>`_
for more information.
:param bool use_threads:
Optional. If this is set to True, IO will be multi-thread.
May not work for all drivers due to locking/GIL.
Default is False.
:rtype: xarray.Dataset
.. seealso:: :meth:`find_datasets` :meth:`group_datasets`
"""
if use_threads and ('SharedArray' not in sys.modules or 'pathos.threading' not in sys.modules):
use_threads = False

if dask_chunks is None:
def data_func(measurement):
if not use_threads:
data = numpy.full(sources.shape + geobox.shape, measurement.nodata, dtype=measurement.dtype)
for index, datasets in numpy.ndenumerate(sources.values):
_fuse_measurement(data[index], datasets, geobox, measurement, fuse_func=fuse_func,
skip_broken_datasets=skip_broken_datasets)
else:
def work_load_data(array_name, index, datasets):
data = sa.attach(array_name)
_fuse_measurement(data[index], datasets, geobox, measurement, fuse_func=fuse_func,
skip_broken_datasets=skip_broken_datasets)

array_name = '_'.join(['DCCORE', str(uuid.uuid4()), str(os.getpid())])
sa.create(array_name, shape=sources.shape + geobox.shape, dtype=measurement.dtype)
data = sa.attach(array_name)
data[:] = measurement.nodata

pool = ThreadPool(32)
pool.map(work_load_data, repeat(array_name), *zip(*numpy.ndenumerate(sources.values)))
sa.delete(array_name)
data = numpy.full(sources.shape + geobox.shape, measurement.nodata, dtype=measurement.dtype)
for index, datasets in numpy.ndenumerate(sources.values):
_fuse_measurement(data[index], datasets, geobox, measurement, fuse_func=fuse_func,
skip_broken_datasets=skip_broken_datasets)
return data
else:
def data_func(measurement):
Expand All @@ -552,7 +490,7 @@ def with_resampling(m, resampling, default=None):
for m in measurements]

return Datacube.create_storage(OrderedDict((dim, sources.coords[dim]) for dim in sources.dims),
geobox, measurements, data_func, use_threads)
geobox, measurements, data_func)

@staticmethod
def measurement_data(sources, geobox, measurement, fuse_func=None, dask_chunks=None):
Expand Down
65 changes: 65 additions & 0 deletions datacube/utils/geometry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
""" Geometric shapes and operations on them
"""

from ._base import (
Coordinate,
BoundingBox,
InvalidCRSError,
CRS,
Geometry,
GeoBox,
point,
multipoint,
line,
multiline,
polygon,
multipolygon,
box,
polygon_from_transform,
unary_union,
unary_intersection,
)

from .tools import (
roi_is_empty,
roi_shape,
scaled_down_geobox,
scaled_down_shape,
scaled_down_roi,
scaled_up_roi,
decompose_rws,
affine_from_pts,
get_scale_at_point,
native_pix_transform,
compute_reproject_roi,
)

__all__ = [
"Coordinate",
"BoundingBox",
"InvalidCRSError",
"CRS",
"Geometry",
"GeoBox",
"point",
"multipoint",
"line",
"multiline",
"polygon",
"multipolygon",
"box",
"polygon_from_transform",
"unary_union",
"unary_intersection",
"roi_is_empty",
"roi_shape",
"scaled_down_geobox",
"scaled_down_shape",
"scaled_down_roi",
"scaled_up_roi",
"decompose_rws",
"affine_from_pts",
"get_scale_at_point",
"native_pix_transform",
"compute_reproject_roi",
]
74 changes: 24 additions & 50 deletions datacube/utils/geometry.py → datacube/utils/geometry/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,47 +73,6 @@ class CRS(object):
"""
Wrapper around `osr.SpatialReference` providing a more pythonic interface
>>> crs = CRS('EPSG:3577')
>>> crs.geographic
False
>>> crs.projected
True
>>> crs.dimensions
('y', 'x')
>>> crs = CRS('EPSG:4326')
>>> crs.geographic
True
>>> crs.projected
False
>>> crs.epsg
4326
>>> crs.dimensions
('latitude', 'longitude')
>>> crs = CRS('EPSG:3577')
>>> crs.epsg
3577
>>> crs.dimensions
('y', 'x')
>>> CRS('EPSG:3577') == CRS('EPSG:3577')
True
>>> CRS('EPSG:3577') == CRS('EPSG:4326')
False
>>> # Due to Py 2 and 3 inconsistency in traceback formatting, we need to wrap the exceptions. Yuck.
>>> try:
... CRS('cupcakes')
... except InvalidCRSError as e:
... print(e)
Not a valid CRS: 'cupcakes'
>>> # This one validly parses, but returns "Corrupt data" from gdal when used.
>>> try:
... CRS('PROJCS["unnamed",'
... 'GEOGCS["WGS 84", DATUM["WGS_1984", SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]],'
... 'AUTHORITY["EPSG","6326"]], PRIMEM["Greenwich",0, AUTHORITY["EPSG","8901"]],'
... 'UNIT["degree",0.0174532925199433, AUTHORITY["EPSG","9122"]], AUTHORITY["EPSG","4326"]]]')
... except InvalidCRSError as e:
... print(e)
Not a valid CRS: 'PROJCS["...
"""

def __init__(self, crs_str):
Expand Down Expand Up @@ -148,16 +107,18 @@ def wkt(self):
@property
def epsg(self):
"""
EPSG Code of the CRS
EPSG Code of the CRS or None
:type: int
:type: int | None
"""
if self.projected:
return int(self._crs.GetAuthorityCode('PROJCS'))
code = self._crs.GetAuthorityCode('PROJCS')
elif self.geographic:
code = self._crs.GetAuthorityCode('GEOGCS')
else:
code = None

if self.geographic:
return int(self._crs.GetAuthorityCode('GEOGCS'))
return None
return None if code is None else int(code)

@property
def proj(self):
Expand Down Expand Up @@ -244,6 +205,9 @@ def __ne__(self, other):
return self._crs.IsSame(other._crs) != 1 # pylint: disable=protected-access


def mk_osr_point_transform(src_crs, dst_crs):
return osr.CoordinateTransformation(src_crs._crs, dst_crs._crs) # pylint: disable=protected-access

###################################################
# Helper methods to build ogr.Geometry from geojson
###################################################
Expand Down Expand Up @@ -447,12 +411,22 @@ def __geo_interface__(self):
}

def segmented(self, resolution):
"""
Possibly add more points to the geometry so that no edge is longer than `resolution`
"""
clone = self._geom.Clone()
clone.Segmentize(resolution)
return _make_geom_from_ogr(clone, self.crs)

def interpolate(self, distance):
return _make_geom_from_ogr(self._geom.Value(distance), self.crs)
"""
Returns a point distance units along the line or None if underlying
geometry doesn't support this operation.
"""
geom = self._geom.Value(distance)
if geom is None:
return None
return _make_geom_from_ogr(geom, self.crs)

def buffer(self, distance, quadsecs=30):
return _make_geom_from_ogr(self._geom.Buffer(distance, quadsecs), self.crs)
Expand All @@ -477,11 +451,11 @@ def to_crs(self, crs, resolution=None, wrapdateline=False):
if resolution is None:
resolution = 1 if self.crs.geographic else 100000

transform = osr.CoordinateTransformation(self.crs._crs, crs._crs) # pylint: disable=protected-access
transform = mk_osr_point_transform(self.crs, crs)
clone = self._geom.Clone()

if wrapdateline and crs.geographic:
rtransform = osr.CoordinateTransformation(crs._crs, self.crs._crs) # pylint: disable=protected-access
rtransform = mk_osr_point_transform(crs, self.crs)
clone = _chop_along_antimeridian(clone, transform, rtransform)

clone.Segmentize(resolution)
Expand Down
Loading

0 comments on commit c9cd031

Please sign in to comment.