In [None]:
import os
os.environ['USE_PYGEOS'] = '0'

# Dask-GeoPandas Demo

**Martin Fleischmann**

<sup>1</sup>Urban and Regional Laboratory, Charles University, CZ<br>
<sup>2</sup>Geographic Data Science Lab, University of Liverpool, UK<br>

15/06/2023, Dask Demo Day

## What is GeoPandas

GeoPandas, as the name suggests, extends the popular data science library [pandas](https://pandas.pydata.org) by adding support for geospatial data.

The core data structure in GeoPandas is the `geopandas.GeoDataFrame`, a subclass of `pandas.DataFrame`, that can store geometry columns and perform spatial operations. The `geopandas.GeoSeries`, a subclass of `pandas.Series`, handles the geometries. Therefore, your `GeoDataFrame` is a combination of `pandas.Series`, with traditional data (numerical, boolean, text etc.), and `geopandas.GeoSeries`, with geometries (points, polygons etc.). You can have as many columns with geometries as you wish; there's no limit typical for desktop GIS software.

![geodataframe schema](fig/dataframe.svg)

Each `GeoSeries` can contain any geometry type (you can even mix them within a single array) and has a `GeoSeries.crs` attribute, which stores information about the projection (CRS stands for Coordinate Reference System). Therefore, each `GeoSeries` in a `GeoDataFrame` can be in a different projection, allowing you to have, for example, multiple versions (different projections) of the same geometry.

Only one `GeoSeries` in a `GeoDataFrame` is considered the _active_ geometry, which means that all geometric operations applied to a `GeoDataFrame` operate on this _active_ column.

But when we talk about GeoPandas, we first have to talk about pandas.

## The pieces we bring together

Any software that has an ambition to properly deal with geospatial data needs to cover four aspects:

- *data*
    - We just learned that this part is covered by `pandas`.
- *geometry*
    - Handling of the actual geometries and operations on them. In FOSS, there is one leading engine, `GEOS`, but given it is written in C++, we need a Python wrapper. That is called `shapely`.
- *projections*
    - We need to know which place on the Earth (or other planet) each coordinates refer to. That is what projections, or Coordinate Reference Systems (CRS), take care of. The main FOSS engine here is `PROJ`, again written in C++ and in Python wrapped by `pyproj`.
- *reading and writing files*
    - Geospatial world knows a lot of file formats to store the data, from Shapefile and GeoJSON to GeoParquet. The best way to read and write into them is to use `GDAL/OGR`, another C++ library, which is avaliable either via `fiona` or recently via `pyogrio`.

![ecosystem diagram](fig/ecosystem.png)

Let's see all of them in action within GeoPandas.

In [None]:
import geopandas

GeoPandas includes some built-in data, we can use them as an illustration.

In [None]:
path = geopandas.datasets.get_path("naturalearth_lowres")
path

In [None]:
world = geopandas.read_file(path)
world.head()

For the sake of simplicity here, we can remove Antarctica and re-project the data to the EPSG 3857, which will not complain about measuring the area (but never use EPSG 3857 to measure the actual area as it is extremely skewed).

In [None]:
world = world.query("continent != 'Antarctica'").to_crs(3857)

GeoPandas GeoDataFrame can carry one or more geometry columns and brings the support of geospatial operations on these columns. Like a creation of a convex hull.

In [None]:
world['convex_hull'] = world.convex_hull

This is equal to the code above as GeoPandas exposes geometry methods of the active geometry column to the GeoDataFrame level:

In [None]:
world['convex_hull'] = world.geometry.convex_hull  

Now you can see that we have two geometry columns stored in our `world` GeoDataFrame but only the original one is treated as an _active_ geometry (that is the one accessible directly, without getting the column first).

In [None]:
world.head()

We can also plot the results. Both Russia and Fiji are a bit weird as they cross the anti-meridian.

In [None]:
ax = world.plot(figsize=(12, 12))
world.convex_hull.plot(ax=ax, facecolor='none', edgecolor='black')

## Dask-GeoPandas

Dask-GeoPandas follows exactly the same model as `dask.dataframe` adopted for scaling `pandas.DataFrame`. We have a single `dask_geopandas.GeoDataFrame`, composed of individual partitions where each is a `geopandas.GeoDataFrame`.

## Create dask GeoDataFrame

We have a plenty of options how to build a `dask_geopandas.GeoDataFrame`. From in-memory `geopandas.GeoDataFrame`, reading the GIS file (using pyogrio under the hood), reading GeoParquet or Feather, or from dask.dataframe.

In [None]:
import dask_geopandas

In [None]:
world_ddf = dask_geopandas.from_geopandas(world, npartitions=4)
world_ddf

In [None]:
world_ddf_file = dask_geopandas.read_file(path, npartitions=4)
world_ddf_file

### Partitioned IO

Since we are working with individual partitions, it is useful to save the dataframe already partitioned. The ideal file format for that is a **GeoParquet**.

In [None]:
world_ddf.to_parquet("data/world/")
world_ddf.to_crs(4326).to_parquet("data/world_4326/")  # later we will need the dataset in EPSG:4326 so we can already prepare it.

For more complex tasks, we recommend using Parquet IO as an intermediate step to avoid large task graphs.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world/")
world_ddf

## Embarrassingly parallel computation

The first type of operations where you can benefit from parallelisation is so-called embarrassingly parallel computation. That is a computation where we treat individual partitions or individual rows indenpendently of the other, meaning there is no inter-worker communication and no data need to be sent elsewhere.

One example of that is a calculation of area.

In [None]:
area = world_ddf.area
area.visualize()

Similar one, this time returing a `dask_geopandas.GeoSeries` instead of a `dask.dataframe.Series` would be a `convex_hull` method.

In [None]:
convex_hull = world_ddf.convex_hull
convex_hull.visualize()

Since both are creating a series, we can assing both as individual columns. Let's see how that changes the task graph.

In [None]:
world_ddf['area'] = world_ddf.area
world_ddf['convex_hull'] = world_ddf.convex_hull
world_ddf.visualize()

Finally, we can call `compute()` and get all the results.

In [None]:
r = world_ddf.compute()

## Spatial join

If you have to deal with a large dataframes and need a spatial join, dask-geopandas can help. Let's try to illustrate the logic of spatial join on the partitioned data using the locations of airports from around the world.

In [None]:
import pandas as pd

In [None]:
airports = pd.read_csv("data/airports.csv")
airports.head()

The data comes as a CSV, so we first need to create a GeoDataFrame.

In [None]:
airports = geopandas.GeoDataFrame(
    airports,
    geometry=geopandas.GeoSeries.from_xy(
        airports["longitude_deg"],
        airports["latitude_deg"],
        crs=4326,
    )
)

And from that, we can create a partitioned `dask_geopandas.GeoDataFrame`. Note that we could also read the CSV with dask.dataframe and create a GeoDataFrame from that using the `dask_geopandas.from_dask_dataframe` function and `dask_geopandas.points_from_xy` to create the geometry. But since it all comfortably fits in memory, we can pick whichever option we like.

In [None]:
airports_ddf = dask_geopandas.from_geopandas(
    airports,
    npartitions=12
)

We will join the point data of airports with the `naturalearth_lowres` dataset we have stored as an already partitioned parquet.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world_4326/")
world_ddf

The API of the `sjoin` is exactly the same as you know it from geopandas. Just in this case, it currently only creates a task graph.

In [None]:
joined = airports_ddf.sjoin(world_ddf, predicate="within")

We started from 12 partitions of `airports_ddf` and 4 partitions of `world_ddf`. Since we haven't told Dask how are these partitions spatially distributed, it just plans to do the join of each partition from one dataframe to each partition form the other one. 12x4 = 48 partitions in the end. We can easily check that with the `npartitions` attribute.

In [None]:
joined.npartitions

The whole logic can also be represented by a task graph that illusrates the inneficiency of such an approach.

In [None]:
joined.visualize()

## Spatial partitioning

Luckily, dask-geopandas supports spatial partitioning. It means that it can calculate the spatial extent of each partition (as the overall convex hull of the partition) and use it internally to do smarter decisions when creating the task graph. 

But first, we need to calculate these paritions. This operation is done eagerly and involves immediate reading of all geometries.

In [None]:
airports_ddf.calculate_spatial_partitions()

The resulting `spatial_partitions` attribute is a `geopandas.GeoSeries`.

In [None]:
airports_ddf.spatial_partitions

In [None]:
airports_ddf.spatial_partitions.explore()

As you can see from the plot above, our partitions are not very homogenous in terms of their spatial distribution. Each contains points from nearly whole world. And that does not help with simplification of a task graph.

### The goal

We need our partitions to be spatially coherent to minimise the amount of inter-worker communication. So we have to find a way of reshuffling the data in between workers.

### Hilbert curve

One way of doing so is to follow the Hilbert space-filling curve, which is a 2-dimensional curve like the one below along which we can map our geometries (usually points). The distance along the Hilbert curve then represents a spatial proximity. Two points with a similar Hilbert distance are therefore ensured to be close in space.

![Hilbert](fig/Hilbert-curve_rounded-gradient-animated.gif)


(Animation by Tim Sauder, https://en.wikipedia.org/wiki/Hilbert_curve#/media/File:Hilbert-curve_rounded-gradient-animated.gif)

`dask-geopandas` (as of 0.1.0) implements Hilbert curve and two other methods based on a similar concept of space-filling (Morton curve and Geohash). You can either compute them directly or let `dask-geopandas` use them under the hood in a `spatial_shuffle` method that computes the distance along the curve and uses it to reshuffle the dataframe into spatially homogenous chunks. (Note that geometries are abstracted to the midpoint of their bounding box for the purpose of measuring the distance along the curve.)

In [None]:
hilbert_distance = airports_ddf.hilbert_distance()
hilbert_distance

In [None]:
hilbert_distance.compute()

`spatial_shuffle` uses by default `hilbert_distance` and partitions the dataframe based on this Series.

In [None]:
airports_ddf = airports_ddf.spatial_shuffle()

We can now check how the new partitions look like in space.

In [None]:
airports_ddf.spatial_partitions.explore()

When we are reading parquet file, its metadata already contain the information on the extent of each partition and we therefore don't have to calculate them by reading all the geometries. We can quickly check that.

In [None]:
world_ddf.spatial_partitions is not None

The world dataset has known partitions but is not spatially shuffled.

In [None]:
world_ddf.spatial_partitions.explore()

Even without doing that, we can already see that the resulting number of partitions is now 33, instead of 48 as some of the joins that would result in an empty dataframe are simply filtered out.

In [None]:
joined = airports_ddf.sjoin(world_ddf, predicate="within")

In [None]:
joined.npartitions

In [None]:
%%time
joined.compute()

## Aggregations with dissolve

Dissolve is a typical operation when you usually need to do some shuffling of data between workers to ensure that all observations within the same category (specified using the `by` keyword) end up in the same partition so they can actually be dissolved into a single polygon. As you can imagine, proper spatial partitions may help as well but in this case, they help only in the computation, not in the task graph.

Let's have a look at an example.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world/")

continents = world_ddf.dissolve('continent', split_out=6, sort=False)
continents

Above, we are using the API we know from GeoPandas with one new keyword - `split_out`. That specifies into how many partitions should we send the dissolved result. The whole method is based on the `groupby`, exactly as the original one, which returns a single partition by default. We rarely want that to happen.

In [None]:
continents.visualize()

The task graph shows exactly what happens. Since Dask doesn't know which categories are where, it designs the task graph to move potentially shuffle data from every original partition to every new one. In reality, some of these will be empty. And the better spatial partitions we have, the more of them will be empty, hence our operation will be cheaper.

## Limits and caveats

Truth to be told, we are now playing with the version 0.1 of dask-geopandas and not everything is as polished as we would like it to be. So there are some things which are not yet fully supported.

- **Overlapping computation** - With `dask.dataframe` and `dask.array` you can use `map_overlap` to do some overlapping computations for which you need observations from neighbouring partitions. With dask-geopandas, we would need this overlap to be spatial and that is not yet supported. That means that whatever depends on topology or similar operation is currently not very easy to parallelise.
- **Spatial indexing** - While you can use spatial indexing over spatial partitions and then within individual partitions as we use it under the hood in `sjoin`, it requires a bit of low-level dask code to make it correctly run. We hope to make that easier at some point in the future. We also want to expand the use of the spatial partitioning information to more methods (currently only `sjoin` makes use of it).
- **Memory management** - Even though Dask can work out-of-core and you may seem dask-geopandas behaving that way sometimes, we still have some unresolved memory issues due to geometries being stored as C objects, hence their actual size is not directly visible to Dask.

There are also the same, or at least very similar, rules when not to use dask-geopandas as they apply to vanilla dask.dataframe (from [Dask documentation](https://docs.dask.org/en/stable/dataframe-best-practices.html)):

- For data that fits into RAM, geopandas can often be faster and easier to use than Dask. While “Big Data” tools can be exciting, they are almost always worse than normal data tools while those remain appropriate. But for embarrasinlgy parallel computation, it will often bring speedup with minimal overhead.
- Similar to above, even if you have a large dataset there may be a point in your computation where you’ve reduced things to a more manageable level. You may want to switch to (geo)pandas at this point.

```
df = dd.read_parquet('my-giant-file.parquet')
df = df[df.name == 'Alice']              # Select a subsection
result = df.groupby('id').value.mean()   # Reduce to a smaller size
result = result.compute()                # Convert to pandas dataframe
result...                                # Continue working with pandas
```

- Usual pandas and GeoPandas performance tips like avoiding apply, using vectorized operations, using categoricals, etc., all apply equally to Dask DataFrame and dask-geopandas.

See more best practices in the [Dask documentation](https://docs.dask.org/en/stable/dataframe-best-practices.html).