In [1]:
import datetime
import geopandas as gpd
import pandas as pd
import xarray as xr
import numpy as np
import xvec
import dask
from shapely.geometry import Polygon

In [3]:
max_line_length = 88
file_path = './accessor.py'

with open(file_path, 'r') as file:
    for line_number, line in enumerate(file, start=1):
        if len(line) > max_line_length:
            print(f"Line {line_number}: {line.strip()}")

Line 269: * geom     (geom) object POINT (111319.49079327357 222684.20850554405) POIN...
Line 302: * geom     (geom) object POINT (111319.49079327357 222684.20850554405) POIN...
Line 533: bounding box of a geometry in an :class:`~xvec.GeometryIndex`. If a predicate is
Line 534: provided, the tree geometries are first queried based on the bounding box of the
Line 539: Bounding boxes are limited to two dimensions and are axis-aligned (equivalent to
Line 540: the bounds property of a geometry); any Z values present in input geometries are
Line 931: The CRS of the raster and that of points need to be in wgs84. Xvec does not verify
Line 941: Affine transformer representing the geometric transformation applied to the data.
Line 985: "Check your Python installation for any issues as it is an integral part of Python's core functionality."
Line 995: mask = rasterio.features.geometry_mask(geometry_array, out_shape=(xar_chunk.shape[0], xar_chunk.shape[1]), transform= trans)
Line 1054: The CRS of 

### Create Sample Dask Dataset & set of geometries 

In [10]:

# Create a dataset with 2 variables and 3 time steps 
np.random.seed(0)

temperature = 15 + 8 * np.random.randn(20, 20, 3)
precipitation = 15 + 10 * np.random.randn(20, 20,3)
lat = np.linspace(30,40,20)
lon = np.linspace(10,20,20)



time = pd.date_range("2014-09-06", periods=3)
reference_time = pd.Timestamp("2014-09-05")


ds = xr.Dataset(
    data_vars=dict(
        temperature=(["x", "y", "time"], temperature),
        precipitation=(["x", "y", "time"], precipitation),
    ),
    coords=dict(
        x=lon,
        y=lat,
        time=time,
        reference_time=reference_time,
    ),
    attrs=dict(description="Weather related data."),
)
ds

In [5]:
# Create geometries over the dataset

from shapely.geometry import Polygon
num_polygons = 2  # Adjust the number of polygons as needed
polygons = []

for _ in range(num_polygons):
    # Generate random polygon coordinates within the bounding box of the downsampled dataset
    lon = np.random.uniform(ds.x.min(), ds.x.max(), 4)
    lat = np.random.uniform(ds.y.min(), ds.y.max(), 4)
    polygons.append(Polygon(zip(lon, lat)))


geoseries = gpd.GeoSeries(polygons)
gdf = gpd.GeoDataFrame(geometry=geoseries)

gdf = gdf.set_geometry('geometry')
gdf.crs = 'EPSG:4326'

polys = gdf.geometry.values

In [15]:
polys

<GeometryArray>
[<POLYGON ((12.485 35.704, 19.979 36.109, 16.94 39.689, 14.772 31.617, 12.485...>, <POLYGON ((12.724 33.465, 17.096 31.808, 12.399 30.476, 18.713 35.956, 12.72...>]
Length: 2, dtype: geometry

### Extract values from a dataset indexed by a set of geometries

In [12]:
# In case the input dataset is small and does not need dask
extracted = ds.xvec.zonal_stats(polys, stat="mean", dask = False, n_jobs = -1)
extracted

100%|█████████████████████████████████████████████| 1/1 [00:00<00:00,  1.30it/s]
100%|█████████████████████████████████████████████| 1/1 [00:00<00:00,  1.38it/s]


In [13]:
ds1 = ds.chunk(dict(x=4,y=4))

extracted = ds1.xvec.zonal_stats(polys, stat="mean", dask = True, n_jobs = -1)
extracted

100%|█████████████████████████████████████████████| 1/1 [00:01<00:00,  1.27s/it]
100%|█████████████████████████████████████████████| 1/1 [00:00<00:00,  1.37it/s]


In [18]:
max_line_length = 88
file_path = './xvec/xvec/accessor.py'

with open(file_path, 'r') as file:
    for line_number, line in enumerate(file, start=1):
        if len(line) > max_line_length:
            print(f"Line {line_number}: {line.strip()}")

FileNotFoundError: [Errno 2] No such file or directory: './xvec/xvec/accessor.py'

### Testing

In [17]:
import rioxarray

In [18]:
# import xarray as xr
# import numpy as np

# # Assuming you already have an xarray dataset with geospatial information,
# # replace the following line with your dataset
# your_xarray_dataset = ...

# # Extract coordinate information
# x_coord = ds['x']
# y_coord = ds['y']

# # Calculate pixel size
# x_resolution = np.abs(x_coord.diff('x').mean().item())
# y_resolution = np.abs(y_coord.diff('y').mean().item())

# # Get the upper-left corner coordinates
# x_origin = x_coord.min().item() - x_resolution / 2.0
# y_origin = y_coord.max().item() + y_resolution / 2.0

# # Create the affine transformation matrix
# # Create the affine transformation matrix
# affine_transform = [x_resolution, 0.0, x_origin, 0.0, -y_resolution, y_origin]

# # Reshape the list into a 2D array
# affine_matrix = np.array(affine_transform).reshape(2, 3)

# # Apply the transformation to the dataset
# #transformed_dataset = your_xarray_dataset.rio.write_crs("EPSG:4326", inplace=False).rio.set_transform(transform_matrix)

# # Now you can use transformed_dataset for further analysis
# affine_matrix

In [19]:
transform = ds.rio.transform()

In [20]:
transform

Affine(0.5263157894736842, 0.0, 9.736842105263158,
       0.0, 0.5263157894736842, 29.736842105263158)

In [21]:
chunk_size = 2
polys

<GeometryArray>
[<POLYGON ((12.485 35.704, 19.979 36.109, 16.94 39.689, 14.772 31.617, 12.485...>, <POLYGON ((12.724 33.465, 17.096 31.808, 12.399 30.476, 18.713 35.956, 12.72...>]
Length: 2, dtype: geometry

In [22]:
geometry_chunks = [polys[i:i + chunk_size] for i in range(0, len(polys), chunk_size)]

In [23]:
geom = polys[0]

In [24]:
geo_series = gpd.GeoSeries(geom)

In [25]:
geometry_array = geo_series.geometry.array

In [26]:
var = 'temperature'

In [27]:
import rasterio
import gc

In [28]:
trans = transform

In [40]:
type(transform)

affine.Affine

In [29]:
xar_chunk = ds[var]
mask = rasterio.features.geometry_mask(geometry_array, out_shape=(xar_chunk.shape[0], xar_chunk.shape[1]), transform= trans)
masked_data = xar_chunk * mask[:, :, np.newaxis]
del mask, xar_chunk; gc.collect()

313

In [30]:
stat = 'mean'

In [31]:
masked_data.mean(axis=(0, 1)).values

array([13.18786542, 13.1236781 , 12.4042418 ])

In [32]:
stat_within_polygons = masked_data.mean(axis=(0, 1))
result = stat_within_polygons.values

In [33]:
import dask.array as da

In [34]:
stat_within_polygons = da.mean(masked_data, axis=(0, 1))
result = stat_within_polygons.compute()
result

array([13.18786542, 13.1236781 , 12.4042418 ])

In [34]:
stat_within_polygons

Unnamed: 0,Array,Chunk
Bytes,24 B,24 B
Shape,"(3,)","(3,)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 24 B 24 B Shape (3,) (3,) Dask graph 1 chunks in 3 graph layers Data type float64 numpy.ndarray",3  1,

Unnamed: 0,Array,Chunk
Bytes,24 B,24 B
Shape,"(3,)","(3,)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [36]:
type(geometry_array[0])

shapely.geometry.polygon.Polygon

In [37]:
type(geometry_array)

geopandas.array.GeometryArray

In [38]:
"Check your Python installation or environment for any issues as it is an integral part of Python's core functionality."
It is recommended to set this to small number for a big set of geometries or big datacube. 

<GeometryArray>
[<POLYGON ((12.485 35.704, 19.979 36.109, 16.94 39.689, 14.772 31.617, 12.485...>]
Length: 1, dtype: geometry

In [None]:
    def agg_geom(
        self,
        geom,
        trans,
        var: str,
        stat: str ='mean',
        dask: bool = True):
        
        """Aggregate the values from a dataset over a polygon geometry. 

        The CRS of the raster and that of points need to be in wgs84. Xvec does not verify
        their equality.

        Parameters
        ----------
        geom : Polygon[shapely.Geometry]
            An arrray-like (1-D) of shapely geometry, like a numpy array or GeoPandas
            GeoSeries.

        trans : affine.Affine
            Affine transformer representing the geometric transformation applied to the data.
            
        var : Hashable
            Name of the variable in the dataset to aggregate its values. 
            
        stat : Hashable
            Spatial aggregation statistic method, by default "mean". It supports the 
            following statistcs: ['mean', 'median', 'min', 'max', 'sum']
                
        dask : bool, 
            If the input is dask array or not.
            

        Returns
        -------
        Array 
            Aggregated values over the geometry.  
            
        """  
        try:
            import geopandas as gpd
        except ImportError as err:
            raise ImportError(
                "The geopandas package is required for `xvec.agg_geom()`. "
                "You can install it using 'conda install -c conda-forge geopandas' or "
                "'pip install geopandas'."
            ) from err

        try:
            import rasterio
        except ImportError as err:
            raise ImportError(
                "The rasterio package is required for `xvec.agg_geom()`. "
                "You can install it using 'conda install -c conda-forge rasterio' or "
                "'pip install rasterio'."
            ) from err


        try:
            import gc
        except ImportError as err:
            raise ImportError(
                "The gc package is required for `xvec.agg_geom()`. "
                "Make sure 'gc' is included in the standard library"
                "Check your Python installation or environment for any issues as it is an integral part of Python's core functionality."
            ) from err

        #Create a GeoSeries from the geometry
        geo_series = gpd.GeoSeries(geom)

        # Convert the GeoSeries to a GeometryArray
        geometry_array = geo_series.geometry.array

        xar_chunk = self._obj[var]
        mask = rasterio.features.geometry_mask(geometry_array, out_shape=(xar_chunk.shape[0], xar_chunk.shape[1]), transform= trans)
        masked_data = xar_chunk * mask[:, :, np.newaxis]
        del mask, xar_chunk; gc.collect()


        if dask:
            try:
                import dask.array as da 
            except ImportError as err:
                raise ImportError(
                    "The dask package is required for This step. "
                    "You can install it using 'conda install -c conda-forge dask' or "
                    "'pip install dask'."
                ) from err


            if stat == 'sum':
                stat_within_polygons = da.sum(masked_data, axis=(0, 1))
            elif stat == 'mean':
                stat_within_polygons = da.mean(masked_data, axis=(0, 1))
            elif stat == 'median':
                stat_within_polygons = da.median(masked_data, axis=(0, 1))
            elif stat == 'max':
                stat_within_polygons = da.max(masked_data, axis=(0, 1))
            elif stat == 'min':
                stat_within_polygons = da.min(masked_data, axis=(0, 1))

            result = stat_within_polygons.compute()

        else:
            if stat == 'sum':
                stat_within_polygons = masked_data.sum(axis=(0, 1))
            elif stat == 'mean':
                stat_within_polygons = masked_data.mean(axis=(0, 1))
            elif stat == 'median':
                stat_within_polygons = masked_data.median(axis=(0, 1))
            elif stat == 'max':
                stat_within_polygons = masked_data.max(axis=(0, 1))
            elif stat == 'min':
                stat_within_polygons = masked_data.min(axis=(0, 1))

            result = stat_within_polygons.values

        del masked_data, stat_within_polygons; gc.collect()

        return result 



    def spatial_agg(
        self,
        geometries: Sequence[shapely.Geometry],,
        stat: str ='mean',
        chunk_size: int = 2,
        dask: bool = True,
        n_jobs: int = -1):
        
        """Aggregate the values from a dataset over a polygon geometry. 

        The CRS of the raster and that of points need to be in wgs84. Xvec does not verify
        their equality.

        Parameters
        ----------
        geometries : Sequence[shapely.Geometry]
            An arrray-like (1-D) of shapely geometries, like a numpy array or GeoPandas
            GeoSeries.

        stat : Hashable
            Spatial aggregation statistic method, by default "mean". It supports the 
            following statistcs: ['mean', 'median', 'min', 'max', 'sum']

        chunk_size : int
            Chunk size in case have a big set of geometries. 
            It is recommended to set this to small number for a big set of geometries or big datacube. 
                
        dask : bool, 
            If the input is dask array or not.

        n_jobs : int, optional
            Number of parallel threads to use.
            

        Returns
        -------
        Dataset
            A subset of the original object with N-1 dimensions indexed by
            the the GeometryIndex.
            
        """  
            
        
        try:
            import geopandas as gpd
            import gc
        except ImportError as err:
            raise ImportError(
                "The geopandas package is required for `xvec.spatial_agg()`. "
                "You can install it using 'conda install -c conda-forge geopandas' or "
                "'pip install geopandas'."
            ) from err

        try:
            import rioxarray # noqa
        except ImportError as err:
            raise ImportError(
                "The rioxarray package is required for `xvec.spatial_agg()`. "
                "You can install it using 'conda install -c conda-forge rioxarray' or "
                "'pip install rioxarray'."
            ) from err


        try:
            from joblib import Parallel, delayed
        except ImportError as err:
            raise ImportError(
                "The joblib package is required for `xvec.spatial_agg()`. "
                "You can install it using 'conda install -c conda-forge joblib' or "
                "'pip install joblib'."
            ) from err



        try:
            from tqdm import tqdm 
        except ImportError as err:
            raise ImportError(
                "The tqdm package is required for `xvec.spatial_agg()`. "
                "You can install it using 'conda install -c conda-forge tqdm' or "
                "'pip install tqdm'."
            ) from err


        try:
            import gc
        except ImportError as err:
            raise ImportError(
                "The gc package is required for `xvec.spatial_agg()`. "
                "Make sure 'gc' is included in the standard library"
                "Check your Python installation or environment for any issues as it is an integral part of Python's core functionality."
            ) from err




        transform = self._obj.rio.transform()
        geometry_chunks = [geometries[i:i + chunk_size] for i in range(0, len(geometries), chunk_size)]

        stats_dic = {}
        for var in self._obj.data_vars:
            stats_dic[var] = []

            computed_results = []
            for chunk in tqdm(geometry_chunks):
                # Create a list of delayed objects for the current chunk
                chunk_results =  Parallel(n_jobs=n_jobs)(
                    delayed(self.agg_geom)(geom, transform, var,  stat=stat, dask= dask) for geom in chunk
                )
                computed_results.extend(chunk_results)
            stats_dic[var] = computed_results

            # Clean the space
            gc.collect()

        # Unpack the results into VectorCube
        df = pd.DataFrame()
        keys_items = {}
        for k in stats_dic.keys():

            s = stats_dic[k]
            columns = []
            for t in range(len(self._obj.time)):
                columns.append(f'{k}{t+1}')
            keys_items[k] = columns    
            # Create a new DataFrame with the current data and columns
            df_k = pd.DataFrame(s, columns=columns)
            # Concatenate the new DataFrame with the existing DataFrame
            df = pd.concat([df, df_k], axis=1)


        df = gpd.GeoDataFrame(df, geometry=geometries)
        times = list(self._obj.time.values)

        data_vars = {}
        for key in keys_items.keys():
            data_vars[key] = (["geometry", "time"], df[keys_items[key]])

        ## Create VectorCube    
        vec_cube = xr.Dataset(data_vars=data_vars, coords=dict(geometry=df.geometry, time=times)
                         ).xvec.set_geom_indexes("geometry", crs=df.crs) 


        return vec_cube

In [None]:
    def zonal_stats(
        self,     
        polygons: Sequence[shapely.Geometry],
        stat: str = 'mean', 
        name: str = "geometry",
        dask: bool = True,
        chunk_size: int = 2,
        n_jobs: int = -1,
    ):
        
        """Extract the values from a dataset indexed by a set of geometries

        The CRS of the raster and that of points need to be in wgs84. Xvec does not verify
        their equality.

        Parameters
        ----------
        polygons : Sequence[shapely.Geometry]
            An arrray-like (1-D) of shapely geometries, like a numpy array or GeoPandas
            GeoSeries.
        stat : Hashable
            Spatial aggregation statistic method, by default "mean". It supports the 
            following statistcs: ['mean', 'median', 'min', 'max', 'sum']
        name : Hashable, optional
            Name of the dimension that will hold the ``polygons``, by default "geometry"
            
        dask : bool, 
            If the input is dask array or not.
            
        chunk_size : int
            Chunk size in case have a big set of geometries. 
            It is recommended to set this to small number for a big set of geometries or big datacube. 
            
        n_jobs : int, optional
            Number of parallel threads to use. 
            For better performance, it is recommended to set this to the number of physical cores in the CPU. 

        Returns
        -------
        Dataset
            A subset of the original object with N-1 dimensions indexed by
            the the GeometryIndex.
            
        """  
        
        vec_cube = self._obj.xvec.spatial_agg(polygons, stat='mean', chunk_size = 2, dask= dask, n_jobs = n_jobs)
        
        return vec_cube

### Pytest

In [10]:
import geopandas as gpd
import numpy as np
import pandas as pd
import pytest
import shapely
import xarray as xr
from geopandas.testing import assert_geodataframe_equal
from pandas.testing import assert_frame_equal

import xvec  # noqa
from xvec import GeometryIndex

In [15]:
from shapely.geometry import Polygon

In [32]:
def test_aggregate_raster_cubes():
    #### This test for spatial aggregation using list of geometries - using sum aggregation ####
    # Create the dataset
    da = xr.DataArray(
        np.zeros((10, 10, 5)),
        coords={
            "x": range(10),
            "y": range(20, 30),
            "time": pd.date_range("2023-01-01", periods=5),
        },
    )
    da = da.to_dataset(name="test")
    
    # Create the polygons
    polygon1 = Polygon([(1, 22), (4, 22), (4, 26), (1, 26)])  
    polygon2 = Polygon([(6, 22), (9, 22), (9, 26), (6, 26)])
    polygons = gpd.GeoSeries([polygon1, polygon2], crs="EPSG:4326")

    
    # Expected results
    expected = xr.DataArray(
        np.zeros((2, 5)),
        coords={
            "geometry": polygons,
            "time": pd.date_range("2023-01-01", periods=5),
        },
    ).xvec.set_geom_indexes("geometry", crs="EPSG:4326")

    expected = expected.to_dataset(name="test")
    expected = expected.set_coords("geometry")
    
    # Actual results
    actual = da.xvec.agg_polys(polygons, stat="sum")
    
    # Testing
    xr.testing.assert_identical(actual, expected)