# More climatology reductions

This example is motivated by
[this Pangeo Discourse post](https://discourse.pangeo.io/t/dask-xarray-and-swap-memory-polution-on-local-linux-cluster/2453/5)
and follows
[this notebook](https://nbviewer.ipython.org/gist/fmaussion/95d1b9c9a3113db2f987b91e842cb8e0).

The task is to compute an hourly climatology from an hourly dataset with 744
hours in each chunk.

We choose the "map-reduce" strategy because:

1. all hours (groups) are present in each chunk;
2. a groupby reduction applied blockwise reduces arrays of shape (X, Y, 744)
   to (X, Y, 24), i.e. a 744/24 = 31x decrease in chunk size, so memory use
   should stay low.
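
The shape argument in point 2 can be checked on one small block with plain NumPy. This is only a sketch: the 2 x 3 grid and the helper name `blockwise_hourly_mean` are made up for illustration, and it is not how flox implements the reduction.

```python
import numpy as np


def blockwise_hourly_mean(block, hours, ngroups=24):
    """Reduce one (X, Y, T) block to (X, Y, ngroups) by averaging each hour label."""
    out = np.empty(block.shape[:-1] + (ngroups,), dtype=block.dtype)
    for h in range(ngroups):
        # Select all time steps in this block that fall on hour h.
        out[..., h] = block[..., hours == h].mean(axis=-1)
    return out


# Hypothetical small block: 744 hourly steps (a 31-day month) on a 2 x 3 grid.
hours = np.arange(744) % 24
block = np.ones((2, 3, 744), dtype=np.float32)

reduced = blockwise_hourly_mean(block, hours)
shrink_factor = block.nbytes // reduced.nbytes
print(block.shape, "->", reduced.shape, f"({shrink_factor}x smaller)")
```

Because every hour label occurs in every block, each block can be reduced independently before any data moves between workers.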


In [None]:
import dask.array
import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client
from distributed import performance_report

import flox.xarray

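# Set up a local cluster: 4 single-threaded workers, each limited to 2 GiB of memory.
# (Without these arguments, Client() chooses worker and thread counts from the available cores.)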
client = Client(memory_limit="2 GiB", threads_per_worker=1, n_workers=4)
client.cluster



In [None]:
%load_ext watermark


%watermark -iv

dask  : 2022.3.0
pandas: 1.3.5
numpy : 1.21.5
xarray: 0.20.3.dev137+g3f3a197c8



## Create data


In [None]:
ds = xr.Dataset(
    {
        "tp": (
            ("time", "latitude", "longitude"),
            dask.array.ones((8760, 721, 1440), chunks=(744, 50, 1440), dtype=np.float32),
        )
    },
    coords={"time": pd.date_range("2021-01-01", "2021-12-31 23:59", freq="H")},
)
ds

|       | Array             | Chunk           |
| ----- | ----------------- | --------------- |
| Bytes | 33.88 GiB         | 204.35 MiB      |
| Shape | (8760, 721, 1440) | (744, 50, 1440) |
| Count | 180 Tasks         | 180 Chunks      |
| Type  | float32           | numpy.ndarray   |
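
The sizes in this repr follow directly from the shape, the chunking, and the 4-byte `float32` item size. The sketch below redoes that arithmetic with the standard library only; the variable names are illustrative.

```python
import math

shape = (8760, 721, 1440)   # (time, latitude, longitude)
chunks = (744, 50, 1440)    # chunk shape used when creating the data
itemsize = 4                # bytes per float32 value

chunk_mib = math.prod(chunks) * itemsize / 2**20
array_gib = math.prod(shape) * itemsize / 2**30
# Partial chunks at the array edges still count as chunks, hence the ceil.
nchunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

print(f"{array_gib:.2f} GiB total, {chunk_mib:.2f} MiB per chunk, {nchunks} chunks")
```

With the 2 GiB per-worker memory limit set above, only about ten full input chunks fit in one worker's memory at a time, which is why the size of the intermediate results matters here.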


Here's plain xarray: about 10,600 tasks, and the output has one chunk per hour (360 chunks in total).


In [None]:
ds.tp.groupby("time.hour").mean()

|       | Array           | Chunk         |
| ----- | --------------- | ------------- |
| Bytes | 95.05 MiB       | 281.25 kiB    |
| Shape | (24, 721, 1440) | (1, 50, 1440) |
| Count | 10620 Tasks     | 360 Chunks    |
| Type  | float32         | numpy.ndarray |


And flox: about 640 tasks, with all 24 hours in a single chunk along the hour dimension (15 chunks in total).


In [None]:
hourly = flox.xarray.xarray_reduce(ds.tp, ds.time.dt.hour, func="mean")
hourly

|       | Array           | Chunk          |
| ----- | --------------- | -------------- |
| Bytes | 95.05 MiB       | 6.59 MiB       |
| Shape | (24, 721, 1440) | (24, 50, 1440) |
| Count | 642 Tasks       | 15 Chunks      |
| Type  | float32         | numpy.ndarray  |
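
A mean needs some care in a map-reduce setting, because blocks can hold different numbers of samples per group. Here the last time chunk has only 8760 - 11 * 744 = 576 hours, so averaging the per-block means would give the wrong answer. One common approach, sketched below in plain NumPy on a 1-D series, is to carry per-group sums and counts through the map step and divide only at the end. The helper `block_sum_count` is hypothetical, and this illustrates the general idea rather than flox's internals.

```python
import numpy as np


def block_sum_count(values, labels, ngroups=24):
    """Map step: per-group sum and count for one 1-D block."""
    sums = np.bincount(labels, weights=values, minlength=ngroups)
    counts = np.bincount(labels, minlength=ngroups)
    return sums, counts


rng = np.random.default_rng(0)
hours = np.arange(8760) % 24   # hour-of-day label for each time step
values = rng.random(8760)      # stand-in for one grid point's time series

# Split the time axis into blocks of 744 steps; the last block has only 576.
starts = range(0, 8760, 744)
partials = [block_sum_count(values[i:i + 744], hours[i:i + 744]) for i in starts]

# Reduce step: add up the partial sums and counts, then divide once.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
climatology = total_sum / total_count

# Reference result computed in one pass over the whole series.
expected = values.reshape(-1, 24).mean(axis=0)
print(np.allclose(climatology, expected))
```

The intermediate results have only 24 entries per block along time, which is the 31x reduction that keeps memory use low.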


In [None]:
with performance_report("hourly-climatology.html"):
    hourly.compute()

View the performance report
[here](https://rawcdn.githack.com/dcherian/flox/592c46ba0bb859f732968b68426b6332caebc213/docs/source/user-stories/hourly-climatology.html),
and a video of the dask dashboard
[here](https://drive.google.com/file/d/1uY36DiTbv1w7TefbrCEyBcOli5NiaNUP/view?usp=sharing).
