# (Distributed) areal interpolation

In this notebook, we compare the single-core version in `tobler.area_weighted.area_interpolate` with the distributed version in `tobler.area_weighted.area_interpolate_dask`. 

In [1]:
import os
os.environ['USE_PYGEOS'] = '1'

import pandas
import geopandas
import dask_geopandas
import tobler
from libpysal.examples import load_example
import numpy as np

from dask.distributed import Client, LocalCluster

## Setup

Load example data from `pysal`:

In [2]:
c1 = load_example('Charleston1')
c2 = load_example('Charleston2')

crs = 6569  # https://epsg.io/6569

tracts = geopandas.read_file(c1.get_path('sc_final_census2.shp')).to_crs(crs)
zip_codes = geopandas.read_file(c2.get_path('CharlestonMSA2.shp')).to_crs(crs)

We make up a categorical variable with four classes distributed randomly across the dataset:

In [3]:
rng = np.random.default_rng(seed=42)

tracts['rando'] = pandas.Series(
    rng.integers(0, 4, len(tracts)), dtype='category'
)

We will set up a local Dask cluster so you can follow the computations on the dashboard (`http://localhost:8787` by default):

In [2]:
client = Client(LocalCluster(n_workers=8))

Finally, for Dask, we need to provide `dask_geopandas.GeoDataFrame` objects with spatial partitions and categorical variables properly set up:

In [7]:
dtracts = (
    dask_geopandas.from_geopandas(tracts[['geometry', 'rando']], npartitions=16)
    .spatial_shuffle(by='hilbert', shuffle="tasks")
)

dzips = (
    dask_geopandas.from_geopandas(zip_codes[['ZIP', 'geometry']], npartitions=16)
    .spatial_shuffle(by='hilbert', shuffle="tasks")
)

---

**IMPORTANT** - At this point, only *categorical* variables are implemented, so those are what we will test.

---

## Correctness

- Single core

In [8]:
cat_sc = tobler.area_weighted.area_interpolate(
    tracts, zip_codes, categorical_variables=['rando']
)

- Dask

In [9]:
cat_dk = tobler.area_weighted.area_interpolate_dask(
    dtracts, dzips, 'ZIP', categorical_variables=['rando']
).compute()

And we can compare both results are the same:

In [8]:
a = (
    cat_dk
    .set_index('ZIP')
    .reindex(zip_codes['ZIP'].values)
    .drop(columns='geometry')
)

b = (
    cat_sc
    .drop(columns='geometry')
    [['rando_0', 'rando_1', 'rando_2', 'rando_3']]
)
b.index = a.index

(a - b).max()

rando_0    4.188295e-08
rando_1    5.328575e-08
rando_2    5.396667e-08
rando_3    2.935173e-08
dtype: float64

The differences in the estimates for the proportions of each area start at the 8th decimal, and thus likely rounding errors derived from the different approaches used to compute the interpolation (the single core does it in one-shot, while Dask computes parts and brings them together later with a sum).

## Performance

---

**NOTE** - Timings below do _not_ include computation time required for spatial shuffling and partitioning (which can be substantial with large datasets), or converting from `geopandas`. These are "sunk costs" that'll only make this approach preferable with large datasets, although they can be computed once and the result stored in disk efficiently (e.g., as Parquet files). Having said that, when "larger" is large enough is not very large in modern terms: from a handful of thousand observations the gains will be substantial if several cores/workers are available.

---

We can now time the example above:


In [12]:
%%timeit
cat_sc = tobler.area_weighted.area_interpolate(
    tracts, zip_codes, categorical_variables=['rando']
)

85 ms ± 1.51 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [13]:
%%timeit
cat_dk = tobler.area_weighted.area_interpolate_dask(
    dtracts, dzips, 'ZIP', categorical_variables=['rando']
).compute()

1.41 s ± 51.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


This is notably slower (about 5x!). For such a small dataset, the overhead in distributing computations and collecting them overcomes any gains in parallelism.

Now we can artificially increase the size of the datasets by concatenating them several times and re-computing (this time we only time one execution):

In [17]:
sizeup = 40
tracts_lrg = pandas.concat([tracts] * sizeup)
zips_lrg = pandas.concat([zip_codes] * sizeup)
print(
    f'{sizeup}x increase | N. tracts: {len(tracts_lrg)} | N. ZIPs: {len(zips_lrg)}'
)

dtracts_lrg = (
    dask_geopandas.from_geopandas(tracts_lrg[['geometry', 'rando']], chunksize=500)
    .spatial_shuffle(by='hilbert', shuffle="tasks")
)

dzips_lrg = (
    dask_geopandas.from_geopandas(zips_lrg[['ZIP', 'geometry']], chunksize=500)
    .spatial_shuffle(by='hilbert', shuffle="tasks")
)

40x increase | N. tracts: 4680 | N. ZIPs: 1680


This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


And re-compute the timings:

---

### 10x

In [14]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_sc = tobler.area_weighted.area_interpolate(
    tracts_lrg, zips_lrg, categorical_variables=['rando']
)

Computing for a sizeup of 10x
CPU times: user 7.21 s, sys: 11.3 ms, total: 7.23 s
Wall time: 6.95 s


In [16]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_dk = tobler.area_weighted.area_interpolate_dask(
    dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']
).compute()

Computing for a sizeup of 10x
CPU times: user 548 ms, sys: 18 ms, total: 566 ms
Wall time: 3.56 s


---

### 20x

In [18]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_sc = tobler.area_weighted.area_interpolate(
    tracts_lrg, zips_lrg, categorical_variables=['rando']
)

Computing for a sizeup of 20x
CPU times: user 28.6 s, sys: 26.1 ms, total: 28.7 s
Wall time: 27.6 s


In [24]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_dk = tobler.area_weighted.area_interpolate_dask(
    dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']
).compute()

Computing for a sizeup of 20x


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 1.32 s, sys: 65.3 ms, total: 1.38 s
Wall time: 9.86 s


---

### 30x

In [26]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_sc = tobler.area_weighted.area_interpolate(
    tracts_lrg, zips_lrg, categorical_variables=['rando']
)

Computing for a sizeup of 30x
CPU times: user 1min 4s, sys: 176 ms, total: 1min 4s
Wall time: 1min 1s


In [7]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_dk = tobler.area_weighted.area_interpolate_dask(
    dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']
).compute()

Computing for a sizeup of 30x


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 1.91 s, sys: 58.8 ms, total: 1.97 s
Wall time: 14.6 s


---

### 40x

In [17]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_sc = tobler.area_weighted.area_interpolate(
    tracts_lrg, zips_lrg, categorical_variables=['rando']
)

Computing for a sizeup of 40x
CPU times: user 2min 2s, sys: 1.71 s, total: 2min 3s
Wall time: 1min 53s


In [18]:
%%time
print(f'Computing for a sizeup of {sizeup}x')
cat_dk = tobler.area_weighted.area_interpolate_dask(
    dtracts_lrg, dzips_lrg, 'ZIP', categorical_variables=['rando']
).compute()

Computing for a sizeup of 40x


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 6.99 s, sys: 512 ms, total: 7.5 s
Wall time: 30.5 s
