# GeoPython 2022 - Introduction to `dask-geopandas`

**Martin Fleischmann, Joris van den Bossche**

22/06/2022, Basel

## Setup

Follow the Readme to set-up the environment correctly. You should have these packages installed:

```
- geopandas
- dask-geopandas
- pyogrio
- pyarrow
- python-graphviz
- esda
- dask-labextension # optionally, if using JupyterLab
```

## GeoPandas refresh

Let's start with a quick refresh of GeoPandas.

### What is GeoPandas?

**Easy, fast and scalable geospatial analysis in Python**

From the docs:

> The goal of GeoPandas is to make working with geospatial data in python easier. It combines the capabilities of pandas and shapely, providing geospatial operations in pandas and a high-level interface to multiple geometries to shapely. GeoPandas enables you to easily do operations in python that would otherwise require a spatial database such as PostGIS.

A quick demo:

In [None]:
import geopandas

GeoPandas includes some built-in data, we can use them as an illustration.

In [None]:
path = geopandas.datasets.get_path("naturalearth_lowres")
path

In [None]:
world = geopandas.read_file(path)
world.head()

For the sake of simplicity here, we can remove Antarctica and re-project the data to the EPSG 3857, which will not complain about measuring the area (but never use EPSG 3857 to measure the actual area as it is extremely skewed).

In [None]:
world = world.query("continent != 'Antarctica'").to_crs(3857)

GeoPandas GeoDataFrame can carry one or more geometry columns and brings the support of geospatial operations on these columns. Like a creation of a convex hull.

In [None]:
world['convex_hull'] = world.convex_hull

This is equal to the code above as GeoPandas exposes geometry methods of the active geometry column to the GeoDataFrame level:

In [None]:
world['convex_hull'] = world.geometry.convex_hull  

Now you can see that we have two geometry columns stored in our `world` GeoDataFrame but only the original one is treated as an _active_ geometry (that is the one accessible directly, without getting the column first).

In [None]:
world.head()

We can also plot the results. Both Russia and Fiji are a bit weird as they cross the anti-meridian.

In [None]:
ax = world.plot(figsize=(12, 12))
world.convex_hull.plot(ax=ax, facecolor='none', edgecolor='black')

## What is Dask

From the docs:

> Dask provides advanced parallelism and distributed out-of-core computation with a dask.dataframe module designed to scale pandas. Since GeoPandas is an extension to the pandas DataFrame, the same way Dask scales pandas can also be applied to GeoPandas.

We will cover the high-level API of Dask. For more, see the [Dask tutorial](https://tutorial.dask.org).

Let's import `numpy` and `pandas` for a comparison and three high-level Dask modules - `bag`, `array`, and `dataframe`.

In [None]:
import numpy as np
import pandas as pd

import dask.dataframe as dd
import dask.array as da
import dask.bag as db

Before we explore those, let's introduce the dask `Client` as it will allow us to see how dask manages all its tasks.

Here we create a Client on top of a local (automatically created) cluster with 4 workers (the laptop we use has 4 performance cores).

In [None]:
from dask.distributed import Client

client = Client(n_workers=4)

In [None]:
client

We can open the Dask dashboard to watch what is happenning in real-time using the link above, in the Client details. But if you have a [JupyterLab extension for Dask](https://github.com/dask/dask-labextension), you can watch different components directly from the JupyterLab interface.

### dask.bag

With the Client and cluster in place, we can properly explore Dask. Let's start with a `dask.bag`, the simplest of the objects. You can imagine it as a distributed list.

In [None]:
b = db.from_sequence([1, 2, 3, 4, 5, 6, 2, 1], npartitions=2)
b

Now, note that when we try to call `b`, we don't see its contents. 

Let's check what happens with `sum`.

In [None]:
b.sum()

Again, we don't see the answer, but some abstract `Item` object instead. That is because Dask usually runs all the operations lazily and waits for a `compute` call before it does the actual computation.

Instead, it plans what it should do and create a task graph. That looks like this:

In [None]:
b.sum().visualize()

We can see individual partitions (rectangles), operations (circles), and movement of data between partitions.

When we call `compute`, this task graph is executed and Dask returns the expected value.

In [None]:
b.sum().compute()

### dask.array

Let's move onto an array. Where bag is partitioned along 1 dimension (the sequence is essentially cut into pieces), array is like a numpy array split along both dimension. In practice, each of the partitions is a numpy array and dask array combines them together. Each partition can be then processed separately. 

In [None]:
data = np.arange(100_000).reshape(200, 500)
a = da.from_array(data, chunks=(100, 100))
a

We see some dimensions, dtypes and sizes here but not the data. Beacause again, all is done lazily.

In [None]:
a[:50, 200]

Even indexing requires `compute` to return values, otherwised it still give a dask array.

In [None]:
a[:50, 200].compute()

Simlarly for `mean`.

In [None]:
a.mean().compute()

Since the mean is not super straightforward to parallelise (you can't just do mean in partitions and then mean of that), we can check how dask implements its logic.

In [None]:
a.mean().visualize()

Quite complex, right? Let's compare it to the indexing we did before.

In [None]:
a[:50, 200].visualize()

You can see that dask efficiently accesses only that one partition it needs at this moment.

### dask.dataframe

Finally, we move to the parallelised DataFrame. It mirrors the logic of the array implementation, with a difference that individual partitions are pandas.DataFrames and partitioning happens along a single axis (rows).

Dask.dataframe tries to mirror the pandas API. The same approach as we will later see with dask-geopandas.

In [None]:
df = pd.read_csv("data/airports.csv")
df.head()

In this specific case (`head()`), dask actually reads those 5 rows and shows them but that tends to be an exception, likely because it is a very cheap operation.

In [None]:
df = dd.read_csv("data/airports.csv")
df.head()

If you try to show the whole DataFrame, you get a placeholder that tells you how many partitions you have, which columns and what are their dtypes.

In [None]:
df

Since the `airports.csv` is a single file on disk, dask gives us a single partion. To create a partitioned data frame, we can repartition it and even save to a partitioned CSV.

In [None]:
df.repartition(4).to_csv("data/airports_csv/*.csv", index=False)

When we have more of CSV files in a folder, typically one per month or a country, we can read each as a partition. 

In [None]:
df = dd.read_csv("data/airports_csv/*.csv")
df

As before, all the computation is done lazily. Take a look at the computation of mean elevation.

In [None]:
elevation = df.elevation_ft.mean()
elevation

We get a dask Scalar object here but as before, we don't get the value until we call `compute()`.

In [None]:
elevation.compute()

You can probably notice the similarity of the graph with the one calculating mean over an array.

In [None]:
elevation.visualize()

We can also quickly compare it to a task graph for something easier, like a sum.

In [None]:
df.elevation_ft.sum().visualize()

Not that it makes much sense to compute a sum of elevations, but we can do that and if you're checking the dashboard, you'll notice very little communication as the task is easy to parallelise and we need to gather the results only in the final step.

In [None]:
df.elevation_ft.sum().compute()

## Dask-GeoPandas

Dask-GeoPandas follows exactly the same model as `dask.dataframe` adopted for scaling `pandas.DataFrame`. We have a single `dask_geopandas.GeoDataFrame`, composed of individual partitions where each is a `geopandas.GeoDataFrame`.

In [None]:
import dask_geopandas

## Create dask GeoDataFrame

We have a plenty of options how to build a `dask_geopandas.GeoDataFrame`. From in-memory `geopandas.GeoDataFrame`, reading the GIS file (using pyogrio under the hood), reading GeoParquet or Feather, or from dask.dataframe.

In [None]:
world_ddf = dask_geopandas.from_geopandas(world, npartitions=4)
world_ddf

In [None]:
world_ddf_file = dask_geopandas.read_file(path, npartitions=4)
world_ddf_file

### Partitioned IO

Since we are working with individual partitions, it is useful to save the dataframe already partitioned. The ideal file format for that is a GeoParquet.

In [None]:
world_ddf.to_parquet("data/world/")
world_ddf.to_crs(4326).to_parquet("data/world_4326/")  # later we will need the dataset in EPSG:4326 so we can already prepare it.

For more complex tasks, we recommend using Parquet IO as an intermediate step to avoid large task graphs.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world/")
world_ddf

## Embarrassingly parallel computation

The first type of operations where you can benefit from parallelisation is so-called embarrassingly parallel computation. That is a computation where we treat individual partitions or individual rows indenpendently of the other, meaning there is no inter-worker communication and no data need to be sent elsewhere.

One example of that is a calculation of area.

In [None]:
area = world_ddf.area
area.visualize()

Similar one, this time returing a `dask_geopandas.GeoSeries` instead of a `dask.dataframe.Series` would be a `convex_hull` method.

In [None]:
convex_hull = world_ddf.convex_hull
convex_hull.visualize()

Since both are creating a series, we can assing both as individual columns. Let's see how that changes the task graph.

In [None]:
world_ddf['area'] = world_ddf.area
world_ddf['convex_hull'] = world_ddf.convex_hull
world_ddf.visualize()

Finally, we can call `compute()` and get all the results.

In [None]:
r = world_ddf.compute()

## Spatial join

If you have to deal with a large dataframes and need a spatial join, dask-geopandas can help. Let's try to illustrate the logic of spatial join on the partitioned data using the locations of airports from around the world.

In [None]:
airports = pd.read_csv("data/airports.csv")
airports.head()

The data comes as a CSV, so we first need to create a GeoDataFrame.

In [None]:
airports = geopandas.GeoDataFrame(
    airports,
    geometry=geopandas.GeoSeries.from_xy(
        airports["longitude_deg"],
        airports["latitude_deg"],
        crs=4326,
    )
)

And from that, we can create a partitioned `dask_geopandas.GeoDataFrame`. Note that we could also read the CSV with dask.dataframe and create a GeoDataFrame from that using the `dask_geopandas.from_dask_dataframe` function and `dask_geopandas.points_from_xy` to create the geometry. But since it all comfortably fits in memory, we can pick whichever option we like.

In [None]:
airports_ddf = dask_geopandas.from_geopandas(
    airports,
    npartitions=12
)

We will join the point data of airports with the `naturalearth_lowres` dataset we have stored as an already partitioned parquet.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world_4326/")
world_ddf

The API of the `sjoin` is exactly the same as you know it from geopandas. Just in this case, it currently only creates a task graph.

In [None]:
joined = airports_ddf.sjoin(world_ddf, predicate="within")

We started from 12 partitions of `airports_ddf` and 4 partitions of `world_ddf`. Since we haven't told Dask how are these partitions spatially distributed, it just plans to do the join of each partition from one dataframe to each partition form the other one. 12x4 = 48 partitions in the end. We can easily check that with the `npartitions` attribute.

In [None]:
joined.npartitions

The whole logic can also be represented by a task graph that illustates the inneficiency of such an approach.

In [None]:
joined.visualize()

## Spatial partitioning

Luckily, dask-geopandas supports spatial partitioning. It means that it can calculate the spatial extent of each partition (as the overall convex hull of the partition) and use it internally to do smarter decisions when creating the task graph. 

But first, we need to calculate these paritions. This operation is done eagerly and involves immediate reading of all geometries.

In [None]:
airports_ddf.calculate_spatial_partitions()

The resulting `spatial_partitions` attribute is a `geopandas.GeoSeries`.

In [None]:
airports_ddf.spatial_partitions

In [None]:
airports_ddf.spatial_partitions.explore()

As you can see from the plot above, our partitions are not very homogenous in terms of their spatial distribution. Each contains points from nearly whole world. And that does not help with simplification of a task graph.

### The goal

We need our partitions to be spatially coherent to minimise the amount of inter-worker communication. So we have to find a way of reshuffling the data in between workers.

### Hilbert curve

One way of doing so is to follow the Hilbert space-filling curve, which is a 2-dimensional curve like the one below along which we can map our geometries (usually points). The distance along the Hilbert curve then represents a spatial proximity. Two points with a similar Hilbert distance are therefore ensured to be close in space.

![Hilbert](fig/Hilbert-curve_rounded-gradient-animated.gif)

(Animation by Tim Sauder, https://en.wikipedia.org/wiki/Hilbert_curve#/media/File:Hilbert-curve_rounded-gradient-animated.gif)

`dask-geopandas` (as of 0.1.0) implements Hilbert curve and two other methods based on a similar concept of space-filling (Morton curve and Geohash). You can either compute them directly or let `dask-geopandas` use them under the hood in a `spatial_shuffle` method that computes the distance along the curve and uses it to reshuffle the dataframe into spatially homogenous chunks. (Note that geometries are abstracted to the midpoint of their bounding box for the purpose of measuring the distance along the curve.)

In [None]:
hilbert_distance = airports_ddf.hilbert_distance()
hilbert_distance

In [None]:
hilbert_distance.compute()

`spatial_shuffle` uses by default `hilbert_distance` and partitions the dataframe based on this Series.

In [None]:
airports_ddf = airports_ddf.spatial_shuffle()

We can now check how the new partitions look like in space.

In [None]:
airports_ddf.spatial_partitions.explore()

When we are reading parquet file, its metadata already contain the information on the extent of each partition and we therefore don't have to calculate them by reading all the geometries. We can quickly check that.

In [None]:
world_ddf.spatial_partitions is not None

The world dataset has known partitions but is not spatially shuffled.

In [None]:
world_ddf.spatial_partitions.explore()

Even without doing that, we can already see that the resulting number of partitions is now 33, instead of 48 as some of the joins that would result in an empty dataframe are simply filtered out.

In [None]:
joined = airports_ddf.sjoin(world_ddf, predicate="within")

In [None]:
joined.npartitions

In [None]:
%%time
joined.compute()

### What about a larger problem?

Dropping down from 48 to 33 partitions doesn't sound like a big deal. But you usually want to use dask-geopandas to tackle a bit larger problems. To simulate one, We can load a GADM dataset containd detailed administrative boundaries of the whole world (around 2GB GPKG) and join our airport data to that.

_Note that this dataset is not part of the repository so you will not be able to run these two cells._

_If you want to run the code you need to download the `gadm404.gpkg` from [GADM.org](https://gadm.org/download_world.html) and unzip it to the `data` folder._

_To create the `gadm_spatial` used below, you need to read the GPKG, spatially shuffle it and save it as a GeoParquet:_

```
gadm_ddf = dask_geopandas.read_file('data/gadm404.gpkg', npartitions=64)
gadm_ddf.spatial_shuffle().to_parquet("data/gadm_spatial/")
```

In [None]:
gadm_ddf = dask_geopandas.read_file('data/gadm404.gpkg', npartitions=64)
joined = airports_ddf.sjoin(gadm_ddf, predicate="within")
joined.npartitions

Without any spatial sorting of the GADM dataset, we have to do 12*64 joins.

But we can load the same dataset that has been spatially sorted.

In [None]:
gadm_sorted = dask_geopandas.read_parquet("data/gadm_spatial/")
joined = airports_ddf.sjoin(gadm_sorted, predicate="within")
joined.npartitions

The resulting number of partitions is 151, filtering out more than 80% of spatial joins that no longer need to be done.

## Aggregations with dissolve

Dissolve is a typical operation when you usually need to do some shuffling of data between workers to ensure that all observations within the same category (specified using the `by` keyword) end up in the same partition so they can actually be dissolved into a single polygon. As you can imagine, proper spatial partitions may help as well but in this case, they help only in the computation, not in the task graph.

Let's have a look at an example.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world/")

continents = world_ddf.dissolve('continent', split_out=6)
continents

Above, we are using the API we know from GeoPandas with one new keyword - `split_out`. That specifies into how many partitions should we send the dissolved result. The whole method is based on the `groupby`, exactly as the original one, which returns a single partition by default. We rarely want that to happen.

In [None]:
continents.visualize()

The task graph shows exactly what happens. Since Dask doesn't know which categories are where, it designs the task graph to move potentially shuffle data from every original partition to every new one. In reality, some of these will be empty. And the better spatial partitions we have, the more of them will be empty, hence our operation will be cheaper.

## Custom functions with `map_partitions`

Not every function you may need is built-in and you often need to apply a custom one to the partitioned dataframe. The most common way of doing that is the `map_partitions` method.

The typical use case is below. We want to describe the shape of each polygon using the `shape_index` from the `esda` package. With geopandas, it would look like this.

In [None]:
from esda.shape import shape_index

world['shape_idx'] = shape_index(world)
world.explore('shape_idx')

But when you try to use the same code with a `dask_geopandas.GeoDataFrame`, it will not work.

In [None]:
# THIS WILL FAIL
world_ddf = dask_geopandas.read_parquet("data/world/")
world_ddf['shape_idx'] = shape_index(world_ddf)

In fact, it doesn't even give us a meaningful error message. Simply, because `esda` does not expect `dask_geopandas.GeoDataFrame` here, but expects `geopandas.GeoDataFrame` or some form of an in-memory array of geometries. Then it returns an array of floats.

In this case, we can use `map_partitions` to _map_ the `shape_index()` function across individual partitions. A dummy code would look something like this:

```py
results = []

for partition in ddf:
    results.append(shape_index(partition))
```

The actual code is of course not a loop like this but principle is the same. We take each partition and apply the function.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world/")
shape_idx = world_ddf.map_partitions(shape_index)

The resulting task graph is the same as we have seen before in simple cases of embarrasingly parralel computation. `map_partitions` will always be embarrasingly parralel.

In [None]:
shape_idx.visualize()

In [None]:
r = shape_idx.compute()

### Custom function

We can also write our custom functions to be used with `map_partitions`. The only rule is that everything needs to happen within a single GeoDataFrame, i.e. within a single partition independently of the other. But we don't need to return a new column but we can also return a single value for each partition. Like a sum of area covered by polygons in each partition. 

In [None]:
def my_fn(gdf):
    """get a sum of area covered by polygons in a gdf
    
    Parameters
    ----------
    gdf : GeoDataFrame
    
    Returns
    -------
    float
    
    """
    area = gdf.geometry.area
    return sum(area)

We cannot assign this as a new column as we did above, but we also don't need it.

In [None]:
world_ddf.map_partitions(my_fn).compute()

The task graph is the same as before, even though we return only a single value per partition.

In [None]:
world_ddf.map_partitions(my_fn).visualize()

If you want to assign the result as a new column of your dataframe, you need to ensure you return an array or a Series of the correct length.

In [None]:
def get_hull_area(gdf):
    """Get area of each convex hull and return pandas.Series
    
    Parameters
    ----------
    gdf : GeoDataFrame
    
    Returns
    -------
    pandas.Series
    """
    
    hulls = gdf.convex_hull
    return hulls.area

In [None]:
world_ddf['hull_area'] = world_ddf.map_partitions(get_hull_area)

In [None]:
world_ddf.compute()

The task graph now includes the `assign` step, assigning the new column to the dataframe.

In [None]:
world_ddf.visualize()

### Specifying meta-data

To build a task graph, Dask doesn't need to see the data. But it needs to understand their general structure and dtypes. With simple `map_partitions` cases, you don't need to worry about that as Dask is often able to figure that out itself. But sometimes it struggles. 

Let's look a bit under the hood here.

In [None]:
world_ddf = dask_geopandas.read_parquet("data/world_4326/")

Each object contains its `_meta` data. For GeoDataFrame, that is usually an empty frame with columns and their dtypes set. Like this one:

In [None]:
world_ddf._meta

You can see that all dtypes are set.

In [None]:
world_ddf._meta.dtypes

Now, we can try to implement our own version of `dissolve` that works well when all data fit in memory, to make the exmaple a bit more complicated. We need two steps:

1. Shuffle the data into partitions based on the `continent` column. That ensures that all observations from the same continent are within a single partition.
2. Use `map_partitions` to apply `dissolve` from geopandas.

We can use the `shuffle` method to do the first step. Note that this is also a form of spatial partitioning and it may be useful to follow the attribute if you have one and the final partitions are of a roughly the same size.

In [None]:
shuffled = world_ddf.shuffle(
    "continent", npartitions=7, ignore_index=True
)

In [None]:
shuffled.calculate_spatial_partitions()
shuffled.spatial_partitions.explore()

Sometimes the inference of the meta DataFrame just fails, mostly because it is empty. If that happens, you can manually specify the meta data data frame and pass it to the `map_partitions`. Even if it does not fail, like in this case, it is often better to directly pass it as it can be chaper and you avoid potential issues that may come as a result of a wrong inference.

In [None]:
meta = world_ddf._meta.dissolve(by="continent", as_index=False)
meta

With the `meta` defined, we can take the `geopandas.GeoDataFrame.dissolve` method and pass it to `map_partitions`. All `**kwargs` are passed as attributes to the method/function you are applying.

In [None]:
dissolved = shuffled.map_partitions(
    geopandas.GeoDataFrame.dissolve, by="continent", as_index=False, meta=meta
)

In [None]:
dissolved.visualize()

In [None]:
dissolved.compute()

When you are done, you can shut down the Dask client. If you want to do the exercises below, do not do that yet.

In [None]:
client.shutdown()

## Limits and caveats

Truth to be told, we are now playing with the version 0.1 of dask-geopandas and not everything is as polished as we would like it to be. So there are some things which are not yet fully supported.

- **Overlapping computation** - With `dask.dataframe` and `dask.array` you can use `map_overlap` to do some overlapping computations for which you need observations from neighbouring partitions. With dask-geopandas, we would need this overlap to be spatial and that is not yet supported. That means that whatever depends on topology or similar operation is currently not very easy to parallelise.
- **Spatial indexing** - While you can use spatial indexing over spatial partitions and then within individual partitions as we use it under the hood in `sjoin`, it requires a bit of low-level dask code to make it correctly run. We hope to make that easier at some point in the future. We also want to expand the use of the spatial partitioning information to more methods (currently only `sjoin` makes use of it).
- **Memory management** - Even though Dask can work out-of-core and you may seem dask-geopadnas behaving that way sometimes, we still have some unresolved memory issues due to geometries being stored as C objects, hence their actual size is not directly visible to Dask.

There are also the same, or at least very similar, rules when not to use dask-geopandas as they apply to vanilla dask.dataframe (from [Dask documentation](https://docs.dask.org/en/stable/dataframe-best-practices.html)):

- For data that fits into RAM, geopandas can often be faster and easier to use than Dask. While “Big Data” tools can be exciting, they are almost always worse than normal data tools while those remain appropriate. But for embarrasinlgy parallel computation, it will often bring speedup with minimal overhead.
- Similar to above, even if you have a large dataset there may be a point in your computation where you’ve reduced things to a more manageable level. You may want to switch to (geo)pandas at this point.

```
df = dd.read_parquet('my-giant-file.parquet')
df = df[df.name == 'Alice']              # Select a subsection
result = df.groupby('id').value.mean()   # Reduce to a smaller size
result = result.compute()                # Convert to pandas dataframe
result...                                # Continue working with pandas
```

- Usual pandas and GeoPandas performance tips like avoiding apply, using vectorized operations, using categoricals, etc., all apply equally to Dask DataFrame and dask-geopandas.

See more best practices in the [Dask documentation](https://docs.dask.org/en/stable/dataframe-best-practices.html).

## Exercises

Use the `data/airports_csv` folder and try to do the following using Dask:

- Read the contents as a `dask.dataframe`
- Create a valid `dask_geopandas.GeoDataFrame`
- Calculate and explore spatial partitions. If you think there's a need to spatially shuffle the data, do so.
    - Try comparing different sorting methods (check the docs!). Which one is the best an why?
- How many airports are there per continent? And how many per country?
- Are there any points not falling onto ground? How many?
- Where would be the ideal single airport in each country if you had to build only one?