## Distributed geospatial

In this section, we'll learn how to create a cloud-free composite from satellite imagery. Because of the amount of data involved, we'll distribute the workload across a cluster of machines.


As a reminder, Dask works by using high-level APIs (like xarray) to build task graphs. These task graphs are then executed by a scheduler:

![](https://docs.dask.org/en/latest/_images/dask-overview.svg)
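
To make the idea of a task graph concrete, here is a minimal sketch using only the standard library. The dictionary follows the style Dask uses for graphs (keys name results, tuples of `(function, *arguments)` describe tasks), but `toy_get` is a hypothetical single-threaded executor written for illustration. It is not Dask's scheduler, which also handles parallelism, memory, and data locality.

```python
from operator import add, mul

# A task graph in the dict style Dask uses: each key names a result, and a
# tuple (func, *args) is a task whose string args may refer to other keys.
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),
    "result": (mul, "sum", 10),
}


def toy_get(graph, key, cache=None):
    """Hypothetical executor: resolve dependencies recursively, then call."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Arguments that name another key are computed first.
        resolved = [
            toy_get(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        value = func(*resolved)
    else:
        value = task  # a literal value, nothing to compute
    cache[key] = value
    return value


result = toy_get(graph, "result")
print(result)  # (1 + 2) * 10
```

The high-level APIs build graphs like this for us, just with many more tasks, and a scheduler decides where and when each task runs.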

Ideally, we don't need to change our use of high-level APIs like xarray or Dask DataFrame at all. We just replace the single-machine scheduler with a distributed scheduler running on a cluster.

That said, it's worth remembering that going distributed makes many things more challenging. You need to think more about how to start workers, communication between machines, distributed filesystems, software environments, and so on. If you don't *need* a cluster, then you should avoid it. But for large enough problems, it's nice to have the option.

## Create a cluster

This JupyterHub is configured with [Dask Gateway](https://gateway.dask.org/), a convenient way to create Dask clusters.

In [None]:
import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.scale(16)

cluster

Since we created a Client connected to that cluster, any operations using Dask will use that cluster. 

## Access Earth systems science data

We'll work with [monthly Daymet for North America](https://planetarycomputer.microsoft.com/dataset/daymet-monthly-na) data, providing summaries of temperature, precipitation, and other variables.

In [None]:
import pystac_client

stac = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1/")
c = stac.get_collection("daymet-monthly-na")
c

The data is provided in Zarr format.

In [None]:
asset = c.assets["zarr-https"]
asset

The Zarr store can be loaded into xarray.

In [None]:
import xarray as xr

ds = xr.open_zarr(asset.href, **asset.extra_fields["xarray:open_kwargs"])
ds

This is a pretty large Dataset. We can access subsets of the data and aggregate it, for example by taking the mean over time.

Since we connected a client to our cluster earlier, the computation will happen on our Dask cluster.

In [None]:
import hvplot.xarray  # noqa

x = ds["prcp"].sel(time=slice("1985", None)).mean(dim="time").compute()
x.hvplot.image(x="x", y="y", rasterize=True, width=900, height=500)

Or we can aggregate over space to get a timeseries.

In [None]:
ts = ds["tmax"].mean(dim=("y", "x")).compute()
ts.plot(figsize=(12, 6));
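
The two aggregations above differ only in which dimensions are reduced. As a rough illustration, the sketch below applies both to a tiny made-up cube stored as nested lists. The values and the `cube[time][y][x]` layout are assumptions for the example. xarray performs the same reductions lazily and in parallel on the real data.

```python
from statistics import fmean

# Tiny stand-in cube indexed as cube[time][y][x]; the values are made up.
cube = [
    [[1.0, 2.0], [3.0, 4.0]],  # time step 0
    [[3.0, 4.0], [5.0, 6.0]],  # time step 1
    [[5.0, 6.0], [7.0, 8.0]],  # time step 2
]
n_time, n_y, n_x = len(cube), len(cube[0]), len(cube[0][0])

# Reduce over time: one value per (y, x) pixel, which gives a map.
time_mean = [
    [fmean(cube[t][y][x] for t in range(n_time)) for x in range(n_x)]
    for y in range(n_y)
]

# Reduce over space: one value per time step, which gives a timeseries.
space_mean = [fmean(v for row in cube[t] for v in row) for t in range(n_time)]

print(time_mean)
print(space_mean)
```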

In [None]:
ts.groupby("time.year").mean().plot();
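
Grouping by `time.year` is a split-apply-combine step: split the monthly values by calendar year, average each group, and combine the results. A minimal pure-Python sketch of that pattern, using made-up `(timestamp, value)` pairs rather than the Daymet series, might look like this:

```python
from collections import defaultdict
from statistics import fmean

# Made-up (timestamp, value) pairs standing in for the monthly series.
monthly = [
    ("1980-01", 10.0),
    ("1980-07", 30.0),
    ("1981-01", 12.0),
    ("1981-07", 32.0),
]

# Split: collect the values belonging to each year.
by_year = defaultdict(list)
for stamp, value in monthly:
    by_year[int(stamp[:4])].append(value)

# Apply and combine: one mean per year, in chronological order.
annual_mean = {year: fmean(values) for year, values in sorted(by_year.items())}
print(annual_mean)
```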

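Overall we see a (bumpy) increase in temperature since 1980. But not all spots in North America have warmed equally. We can get a sense for which areas have warmed the most by comparing average summer maximum temperatures between an early period and a late period.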

In [None]:
import dask

summer_months = [6, 7, 8]
summer = ds.tmax.where(ds.time.dt.month.isin(summer_months), drop=True)

early_period = slice(None, "1988-12-31")
# Start the late period after the early period ends so the two do not overlap.
late_period = slice("1989-01-01", "2018-12-31")

early, late = dask.compute(
    summer.sel(time=early_period).mean(dim="time"),
    summer.sel(time=late_period).mean(dim="time"),
)

In [None]:
increase = (late - early).coarsen(y=8, x=8, boundary="trim").mean()
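
The `coarsen(...).mean()` call replaces each 8x8 block of pixels with its average, and `boundary="trim"` drops leftover rows and columns that do not fill a whole block. The sketch below shows the same idea in pure Python on a made-up 5x5 grid with a block size of 2. `coarsen_mean` is a hypothetical helper, and unlike xarray it does no special handling of missing values.

```python
from statistics import fmean


def coarsen_mean(grid, factor):
    """Block-average a 2D list of lists, trimming incomplete edge blocks."""
    n_rows = len(grid) // factor * factor  # rows that fit whole blocks
    n_cols = len(grid[0]) // factor * factor  # columns that fit whole blocks
    return [
        [
            fmean(
                grid[r + dr][c + dc]
                for dr in range(factor)
                for dc in range(factor)
            )
            for c in range(0, n_cols, factor)
        ]
        for r in range(0, n_rows, factor)
    ]


# Made-up 5x5 grid; with factor=2 the last row and column are trimmed.
grid = [[float(r * 5 + c) for c in range(5)] for r in range(5)]
coarse = coarsen_mean(grid, 2)
print(coarse)
```

Coarsening before plotting reduces the number of pixels to draw and smooths out pixel-level noise in the difference map.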

In [None]:
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(16, 12))

# Reversed colormap so that warming (positive values) is drawn in red.
increase.plot(ax=ax, vmin=-6, vmax=6, cmap="RdBu_r")
ax.set_axis_off()

Let's free up some memory by restarting the workers.

In [None]:
client.restart();

## Access remote sensing data

Now we'll make a cloudless composite from a collection of satellite imagery.

In [None]:
import pystac_client

bbox = [-59.69, -34.74, -58.24, -33.65]
stac = pystac_client.Client.open("https://planetarycomputer-staging.microsoft.com/api/stac/v1")

search = stac.search(
    bbox=bbox,
    datetime="2018-01-01/2020-12-31",
    collections=["sentinel-2-l2a"],
    limit=500,  # fetch items in batches of 500
    query={"eo:cloud_cover": {"lt": 25}},
)

In [None]:
%%time
items = list(search.get_all_items())
print(len(items))

In [None]:
import planetary_computer

signed_items = [planetary_computer.sign(item).to_dict() for item in items]

In [None]:
import stackstac
import rasterio.enums

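ds = stackstac.stack(
    signed_items,
    assets=["B04", "B03", "B02"],  # red, green, and blue bands
    epsg=32721,  # reproject every scene to a common UTM grid
    bounds_latlon=bbox,
    resolution=100,  # output pixel size, in meters for this CRS
    resampling=rasterio.enums.Resampling.bilinear,
).where(lambda x: x > 0)  # mask zero-valued pixels as NaN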
ds
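
The value `epsg=32721` is presumably chosen because the bounding box falls in UTM zone 21 in the southern hemisphere. As a rough check, the sketch below derives the WGS 84 / UTM EPSG code from the center of the bounding box. `utm_epsg` is a hypothetical helper that ignores the special zones around Norway and Svalbard and the edge case at exactly 180 degrees longitude.

```python
import math


def utm_epsg(lon, lat):
    """EPSG code of the WGS 84 / UTM zone containing (lon, lat)."""
    zone = int(math.floor((lon + 180) / 6)) + 1  # zones are 6 degrees wide
    base = 32600 if lat >= 0 else 32700  # 326xx north, 327xx south
    return base + zone


bbox = [-59.69, -34.74, -58.24, -33.65]  # min lon, min lat, max lon, max lat
center_lon = (bbox[0] + bbox[2]) / 2
center_lat = (bbox[1] + bbox[3]) / 2
epsg = utm_epsg(center_lon, center_lat)
print(epsg)
```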

In [None]:
monthly_rgb = ds.resample(time="MS").median(dim="time")
monthly_rgb
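
The median is what makes the composite (mostly) cloud-free: clouds are bright outliers, and as long as a pixel is clear in more than half of the scenes in a month, the median picks a clear value. The pure-Python sketch below illustrates this on three made-up single-band scenes. `median_composite` is a hypothetical helper, and skipping NaN here is meant to mirror how masked pixels are left out of the reduction.

```python
import math
from statistics import median


def median_composite(stack):
    """Per-pixel median over time, skipping NaN (masked) observations."""
    n_y, n_x = len(stack[0]), len(stack[0][0])
    composite = []
    for y in range(n_y):
        row = []
        for x in range(n_x):
            valid = [img[y][x] for img in stack if not math.isnan(img[y][x])]
            # A pixel with no valid observation stays missing.
            row.append(median(valid) if valid else math.nan)
        composite.append(row)
    return composite


nan = math.nan
# Three made-up 1x3 scenes; 9000 mimics a bright cloud, nan a masked pixel.
stack = [
    [[100.0, 9000.0, nan]],
    [[110.0, 200.0, 300.0]],
    [[9000.0, 210.0, 310.0]],
]
composite = median_composite(stack)
print(composite)
```

A mean over the same scenes would be pulled far upward by the cloudy values, which is why the median is the more robust choice here.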

In [None]:
rgb = monthly_rgb.compute()

In [None]:
rgb.plot.imshow(row="time", rgb="band", robust=True, size=6);