# Rechunker Tutorial

This tutorial notebook explains how to use rechunker with real datasets. We also use xarray to make some steps easier and the output prettier, but note that xarray is not a dependency of rechunker.

## Toy Example

### Create Example Data

Here we load one of xarray's tutorial datasets and write it to Zarr. The dataset is small, so rechunker is not really needed here, but it makes a convenient example.

In [1]:
import xarray as xr
xr.set_options(display_style='text')
import zarr
import dask.array as dsa

ds = xr.tutorial.open_dataset("air_temperature")
# create initial chunk structure
ds = ds.chunk({'time': 100})
ds.air.encoding = {} # helps when writing to zarr
ds

We can examine the chunk structure of the data variable using Dask's pretty Array repr.

In [2]:
ds.air.data

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 15.48 MB       | 530.00 kB     |
| Shape | (2920, 25, 53) | (100, 25, 53) |
| Count | 31 Tasks       | 30 Chunks     |
| Type  | float32        | numpy.ndarray |
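The numbers in this repr follow directly from the array shape, the chunk shape, and the element size. As a rough cross-check, the sketch below recomputes them in plain Python; `chunk_summary` is a hypothetical helper written for this tutorial, not part of Dask or rechunker.

```python
from math import ceil, prod

def chunk_summary(shape, chunks, itemsize):
    """Return (total_bytes, bytes_per_full_chunk, number_of_chunks)."""
    total_bytes = prod(shape) * itemsize
    chunk_bytes = prod(chunks) * itemsize
    # Edge chunks may be partial, so round up along every axis.
    n_chunks = prod(ceil(s / c) for s, c in zip(shape, chunks))
    return total_bytes, chunk_bytes, n_chunks

# air_temperature: float32 (4 bytes per element), chunked along time only
total, per_chunk, n = chunk_summary((2920, 25, 53), (100, 25, 53), itemsize=4)
print(total, per_chunk, n)
# 15476000 530000 30
```

The last chunk along `time` holds only 20 steps (2920 = 29 × 100 + 20), which is why the count is rounded up to 30.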


In [3]:
! rm -rf *.zarr # clean up any existing temporary data
ds.to_zarr('air_temperature.zarr')

<xarray.backends.zarr.ZarrStore at 0x7f4eeb901230>

Now we open up a Zarr Group and Array that we will use as inputs to rechunker.

In [4]:
source_group = zarr.open('air_temperature.zarr')
print(source_group.tree())

/
 ├── air (2920, 25, 53) float32
 ├── lat (25,) float32
 ├── lon (53,) float32
 └── time (2920,) float32


In [5]:
source_array = source_group['air']
source_array.info

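| Attribute   | Value                                                      |
|-------------|------------------------------------------------------------|
| Name        | /air                                                       |
| Type        | zarr.core.Array                                            |
| Data type   | float32                                                    |
| Shape       | (2920, 25, 53)                                             |
| Chunk shape | (100, 25, 53)                                              |
| Order       | C                                                          |
| Read-only   | False                                                      |
| Compressor  | Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0) |
| Store type  | zarr.storage.DirectoryStore                                |
| No. bytes   | 15476000 (14.8M)                                           |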


### Rechunk a single Array

The original array has chunks of (100, 25, 53). Let's rechunk it to be contiguous in time, but chunked in space.
We specify a small value of `max_mem` in order to force rechunker to create an intermediate dataset. We also have to specify a place to store the final and intermediate data.
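
To see why a 1 MB limit forces an intermediate dataset, we can ask how large a block has to be before its edges line up with both the source and the target chunk grids. The sketch below is our own illustration, not rechunker's planning algorithm. The helper name `smallest_aligned_block` is hypothetical, and we assume `'1MB'` means 10**6 bytes.

```python
from math import gcd, prod

def smallest_aligned_block(shape, source_chunks, target_chunks):
    """Smallest block whose edges fall on both chunk grids (hypothetical helper)."""
    block = []
    for size, src, tgt in zip(shape, source_chunks, target_chunks):
        common = src * tgt // gcd(src, tgt)  # least common multiple
        block.append(min(size, common))      # the array edge is always a shared edge
    return tuple(block)

shape = (2920, 25, 53)
itemsize = 4                 # float32
max_mem_bytes = 1_000_000    # assumption: '1MB' is interpreted as 10**6 bytes

block = smallest_aligned_block(shape, (100, 25, 53), (2920, 25, 1))
block_bytes = prod(block) * itemsize
print(block, block_bytes, block_bytes <= max_mem_bytes)
# (2920, 25, 53) 15476000 False
```

Here the only block aligned with both layouts is the whole array, which is far larger than the memory limit, so the data cannot be copied from source chunks to target chunks in one pass of whole chunks.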

We use the [rechunk](api.rst#rechunker.rechunk) function, which returns a [Rechunked](api.rst#rechunker.Rechunked) object.

In [6]:
from rechunker import rechunk

target_chunks = (2920, 25, 1)
max_mem = '1MB'

target_store = 'air_rechunked.zarr'
temp_store = 'air_rechunked-tmp.zarr'

array_plan = rechunk(source_array, target_chunks, max_mem, target_store, temp_store=temp_store)
array_plan

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 15.48 MB       | 876.00 kB     |
| Shape | (2920, 25, 53) | (2920, 25, 3) |
| Count | 19 Tasks       | 18 Chunks     |
| Type  | float32        | numpy.ndarray |
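
The plan reports chunks of (2920, 25, 3) rather than the requested (2920, 25, 1). That shape is what we get by grouping as many target chunks along the last axis as fit within `max_mem`. The sketch below only reproduces this arithmetic; whether rechunker arrives at the shape in the same way is an assumption on our part, and `group_along_last_axis` is a hypothetical helper.

```python
from math import prod

def group_along_last_axis(shape, target_chunks, itemsize, max_mem_bytes):
    """Grow the last chunk dimension by whole target chunks while staying under the limit."""
    one_chunk_bytes = prod(target_chunks) * itemsize
    k = max(1, max_mem_bytes // one_chunk_bytes)      # whole target chunks that fit
    last = min(shape[-1], target_chunks[-1] * k)      # never exceed the array itself
    return target_chunks[:-1] + (last,)

# assumption: '1MB' is interpreted as 10**6 bytes; float32 has 4 bytes per element
grouped = group_along_last_axis((2920, 25, 53), (2920, 25, 1), 4, 1_000_000)
print(grouped, prod(grouped) * 4)
# (2920, 25, 3) 876000
```

One target chunk is 292 kB, so three fit under 1 MB and a fourth would not.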


Since this array has named dimensions, we can also specify the chunks with a dictionary.

In [7]:
target_chunks_dict = {'time': 2920, 'lat': 25, 'lon': 1}

# need to remove the existing stores or it won't work
!rm -rf air_rechunked.zarr air_rechunked-tmp.zarr
array_plan = rechunk(source_array, target_chunks_dict, max_mem, target_store, temp_store=temp_store)
array_plan

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 15.48 MB       | 876.00 kB     |
| Shape | (2920, 25, 53) | (2920, 25, 3) |
| Count | 19 Tasks       | 18 Chunks     |
| Type  | float32        | numpy.ndarray |
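
The dictionary form and the tuple form describe the same chunks; the dictionary is simply ordered by the array's dimension names. xarray records those names in the `_ARRAY_DIMENSIONS` attribute of each Zarr array it writes. The sketch below shows the translation in plain Python, with `chunks_dict_to_tuple` as a hypothetical helper and the dimension order taken from the `air` variable above.

```python
def chunks_dict_to_tuple(dims, chunks_by_name):
    """Order a {dimension name: chunk size} mapping by the array's dimensions."""
    missing = [d for d in dims if d not in chunks_by_name]
    if missing:
        raise KeyError(f"no chunk size given for dimensions: {missing}")
    return tuple(chunks_by_name[d] for d in dims)

air_dims = ('time', 'lat', 'lon')  # dimension order of the `air` variable
print(chunks_dict_to_tuple(air_dims, {'lon': 1, 'time': 2920, 'lat': 25}))
# (2920, 25, 1)
```

The key order in the dictionary does not matter, which makes this form less error-prone than a positional tuple.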


The `array_plan` is a `Rechunked` object.
It has not actually performed the rechunking yet.
To do this, we need to call the `execute` method.
This will use Dask to perform the rechunking.

In [8]:
result = array_plan.execute()
result.chunks

(2920, 25, 1)

By default, Dask will use the multi-threaded scheduler.
Since rechunking can take a long time, we might want to use a progress bar.

In [9]:
from dask.diagnostics import ProgressBar

with ProgressBar():
    array_plan.execute()

[########################################] | 100% Completed |  6.2s


If we create a distributed cluster, then rechunker will use that when it executes.

In [10]:
from dask.distributed import Client, LocalCluster, progress

cluster = LocalCluster()
client = Client(cluster)

In [11]:
future = array_plan.persist()
progress(future)

VBox()

Now that it is written to disk, we can open the rechunked array however we please. Using Zarr...

In [12]:
target_array = zarr.open('air_rechunked.zarr')
target_array

<zarr.core.Array (2920, 25, 53) float32>

...or Dask

In [13]:
target_array_dask = dsa.from_zarr('air_rechunked.zarr')
target_array_dask

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 15.48 MB       | 292.00 kB     |
| Shape | (2920, 25, 53) | (2920, 25, 1) |
| Count | 54 Tasks       | 53 Chunks     |
| Type  | float32        | numpy.ndarray |


### Rechunk a Group

In the example above, we only rechunked a single array.
We can open it with Dask, but not Xarray, because it doesn't contain any coordinates or metadata.

Rechunker also supports rechunking entire groups.
In this case, `target_chunks` must be a dictionary.

In [14]:
target_chunks = {
    'air': {'time': 2920, 'lat': 25, 'lon': 1},
    'time': None, # don't rechunk this array
    'lon': None,
    'lat': None,
}
max_mem = '1MB'

target_store = 'group_rechunked.zarr'
temp_store = 'group_rechunked-tmp.zarr'

array_plan = rechunk(source_group, target_chunks, max_mem, target_store, temp_store=temp_store)
array_plan

In [15]:
array_plan.execute()

<zarr.hierarchy.Group '/'>

Now that we have written a group, we can open it back up with Xarray.

In [16]:
xr.open_zarr('group_rechunked.zarr')
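
The `None` entries in the group's `target_chunks` can be read as "keep whatever chunks this array already has". The sketch below spells out that reading in plain Python. It is an illustration only: `resolve_group_chunks` is a hypothetical helper, and the source chunk shapes listed for the coordinate arrays are assumed values, not read from the store.

```python
def resolve_group_chunks(source_chunks, target_chunks):
    """Replace None entries with the chunks the source array already uses."""
    resolved = {}
    for name, requested in target_chunks.items():
        resolved[name] = source_chunks[name] if requested is None else requested
    return resolved

source_chunks = {
    'air': (100, 25, 53),
    'time': (2920,),  # assumed: coordinate stored as a single chunk
    'lon': (53,),     # assumed
    'lat': (25,),     # assumed
}
target_chunks = {'air': (2920, 25, 1), 'time': None, 'lon': None, 'lat': None}

for name, chunks in resolve_group_chunks(source_chunks, target_chunks).items():
    print(name, chunks)
```

Only `air` changes layout; the small coordinate arrays are carried over unchanged, which is what lets xarray open the rechunked group.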

## Cloud Example

In this example we use real data from Pangeo's [Cloud Data Catalog](http://catalog.pangeo.io/).
This dataset is stored in Google Cloud Storage.
We also use a [Dask Gateway](https://gateway.dask.org/) distributed cluster to scale up our processing.
This part of the tutorial will only work if you are running in a [Pangeo Cloud](http://pangeo.io/cloud.html) environment or on Binder.

In [23]:
from dask_gateway import GatewayCluster
cluster = GatewayCluster()
cluster.scale(20)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [24]:
from dask.distributed import Client
client = Client(cluster)
client

| Client | Cluster |
|--------|---------|
| Scheduler: gateway://traefik-gcp-uscentral1b-prod-dask-gateway.prod:80/prod.8460d93791a9479090798dba8650ac36 | Workers: 0 |
| Dashboard: https://us-central1-b.gcp.pangeo.io/services/dask-gateway/clusters/prod.8460d93791a9479090798dba8650ac36/status | Cores: 0 |
|        | Memory: 0 B |


In [25]:
import gcsfs
# a zarr group lives here
url = 'gs://pangeo-cmems-duacs'
gcs = gcsfs.GCSFileSystem(requester_pays=True)
source_store = gcs.get_mapper(url)

### Open Zarr Array

In [26]:
group = zarr.open_consolidated(source_store, mode='r')
source_array = group['sla']
source_array

<zarr.core.Array '/sla' (8901, 720, 1440) float64 read-only>

In [27]:
source_array.chunks

(5, 720, 1440)
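
With chunks of (5, 720, 1440), each chunk holds five complete global maps, so reading a full time series at a single grid point touches a chunk for every five time steps. The sketch below counts the chunks overlapped by such a read for this layout and for a time-contiguous layout of (8901, 72, 72). `chunks_touched` is a hypothetical helper, and the selection is written as one `(start, stop)` pair per axis.

```python
from math import prod

def chunks_touched(selection, chunks):
    """Number of chunks overlapped by a selection of (start, stop) ranges, one per axis."""
    return prod(
        (stop - 1) // c - start // c + 1
        for (start, stop), c in zip(selection, chunks)
    )

# full time series at a single grid point of the (8901, 720, 1440) array
point_series = ((0, 8901), (0, 1), (0, 1))

print(chunks_touched(point_series, (5, 720, 1440)))   # map-oriented source layout
# 1781
print(chunks_touched(point_series, (8901, 72, 72)))   # time-contiguous layout
# 1
```

In the source layout, those 1781 chunks are every chunk in the array, so a single-point time series requires reading the entire dataset.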

### Make a Rechunking Plan

In [30]:
max_mem = '1GB'
target_chunks = (8901, 72, 72)
# you must have write access to this location
store_tmp = gcs.get_mapper('pangeo-scratch/rabernat/rechunker_demo/temp.zarr')
store_target = gcs.get_mapper('pangeo-scratch/rabernat/rechunker_demo/target.zarr')
r = rechunk(source_array, target_chunks, max_mem,
            store_target, temp_store=store_tmp)
r

|       | Array             | Chunk           |
|-------|-------------------|-----------------|
| Bytes | 73.83 GB          | 738.28 MB       |
| Shape | (8901, 720, 1440) | (8901, 72, 144) |
| Count | 101 Tasks         | 100 Chunks      |
| Type  | float64           | numpy.ndarray   |


### Execute the Plan

In [33]:
result = r.execute()
result

<zarr.core.Array (8901, 720, 1440) float64>

In [35]:
dsa.from_zarr(result)

|       | Array             | Chunk          |
|-------|-------------------|----------------|
| Bytes | 73.83 GB          | 369.14 MB      |
| Shape | (8901, 720, 1440) | (8901, 72, 72) |
| Count | 201 Tasks         | 200 Chunks     |
| Type  | float64           | numpy.ndarray  |
