# Creating a zarr store for the RACMO 2km snowmelt (2010-2022)
Ben Davison put many netcdfs containing 2km downscaled output from RACMO. They contain the snowmelt product as 2km from 2010-2022. 

This notebook steps through the process of getting all those data into one zarr store locally and in the bucket:
1. find the files
2. load each netcdf while unzipping
3. save each to a local zarr
4. load all the zarrs lazily and concatenate
5. write the whole dataset to disk and/or the bucket using rechunker

In [None]:
import xarray as xr
import os
import hvplot as hv
import fsspec
import pickle
import gzip
import tqdm
from rechunker import rechunk
from dask.diagnostics import ProgressBar
import json
import zarr

## find the netcdf files in the bucket
(dont bother searching if we have previously saved a list of them in a pickle file)

In [2]:
# if files.pkl exists, load it
if os.path.exists("files.pkl"):
    with open("files.pkl", "rb") as f:
        files = pickle.load(f)
else:

    fs = fsspec.filesystem("gcs")
    files = fs.glob("ldeo-glaciology/RACMO_2km/snowmelt/*.nc.gz")
    files = [f"gcs://{file}" for file in files]
    len(files)
    # save the list files to disk using pickle
    with open("files.pkl", "wb") as f:
        pickle.dump(files, f)

## Define some functions for opening netcdfs that are compressed with gz

In [3]:
def load_gcs_gz_dataset(gcs_url):
    """
    Load a gzipped NetCDF dataset from Google Cloud Storage (GCS) using xarray.
    
    Parameters:
    gcs_url (str): The GCS URL of the gzipped NetCDF file.
    
    Returns:
    xarray.Dataarray: The loaded dataset.
    """
    # Open the GCS file as binary
    with fsspec.open(gcs_url, mode='rb') as gcs_file:
        # Decompress with gzip
        with gzip.GzipFile(fileobj=gcs_file, mode='rb') as gz_file:
            # Load with xarray using netCDF4 engine
            return xr.open_dataset(gz_file, chunks={"x": -1, "y": -1, "time": 1})

def extract_snowmelt_DA(gcs_url):
    ds = load_gcs_gz_dataset(gcs_url)
    ds.snowmeltcorr.attrs = ds.snowmeltcorr.attrs | ds.attrs
    return ds


## load each netcdf and save a zarr locally

In [5]:
#!rm -rf temp_zarrs/*
for i, file in enumerate(tqdm.tqdm(files)):
    #print(f"Processing file {i+1}/{len(files)}: {file}")
    ds = extract_snowmelt_DA(file)
    
    ds.to_zarr(f"temp_zarrs/snowmelt{i}.zarr") # save each to a zarr on my laptop
    

100%|██████████| 52/52 [15:14<00:00, 17.58s/it]


## open each zarr lazily and put them in a list

In [None]:
ds_list = [xr.open_dataset(f"temp_zarrs/snowmelt{i}.zarr", chunks={"x": -1, "y": -1, "time": 1}, decode_cf=False) for i in range(len(files))]

## concatenate the datasets 

In [83]:
all = xr.concat(ds_list, dim="time")

## get a token for writing to the bucket

In [None]:
with open('/Users/jkingslake/Documents/science/ldeo-glaciology-bc97b12df06b.json') as token_file:
    token = json.load(token_file)

## start a local dask cluster

In [52]:
from dask.distributed import Client, LocalCluster
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 24.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:49290,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:49303,Total threads: 2
Dashboard: http://127.0.0.1:49311/status,Memory: 6.00 GiB
Nanny: tcp://127.0.0.1:49293,
Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-83luve8n,Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-83luve8n

0,1
Comm: tcp://127.0.0.1:49304,Total threads: 2
Dashboard: http://127.0.0.1:49305/status,Memory: 6.00 GiB
Nanny: tcp://127.0.0.1:49295,
Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-3t5q7paa,Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-3t5q7paa

0,1
Comm: tcp://127.0.0.1:49301,Total threads: 2
Dashboard: http://127.0.0.1:49307/status,Memory: 6.00 GiB
Nanny: tcp://127.0.0.1:49297,
Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-93j8po1_,Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-93j8po1_

0,1
Comm: tcp://127.0.0.1:49302,Total threads: 2
Dashboard: http://127.0.0.1:49306/status,Memory: 6.00 GiB
Nanny: tcp://127.0.0.1:49299,
Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-qjle37sf,Local directory: /var/folders/kl/3mt9f4qs1559xwy3mr60s7980000gp/T/dask-scratch-space/worker-qjle37sf


## Save to zarr with desired chunking to google cloud

In [None]:
mapper = fsspec.get_mapper('gs://ldeo-glaciology/RACMO_2km/snowmelt.zarr', mode='w', token=token)

array_plan = rechunk(source = all, 
                     target_chunks = {"x": -1, "y": -1, "time": 10}, 
                     max_mem = "500MB", 
                     target_store = mapper,
                     temp_store='rechunking_temp')
array_plan

Unnamed: 0,Array,Chunk
Bytes,113.16 GiB,24.41 MiB
Shape,"(4748, 2303, 2778)","(1, 2303, 2778)"
Dask graph,4748 chunks in 105 graph layers,4748 chunks in 105 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 113.16 GiB 24.41 MiB Shape (4748, 2303, 2778) (1, 2303, 2778) Dask graph 4748 chunks in 105 graph layers Data type float32 numpy.ndarray",2778  2303  4748,

Unnamed: 0,Array,Chunk
Bytes,113.16 GiB,24.41 MiB
Shape,"(4748, 2303, 2778)","(1, 2303, 2778)"
Dask graph,4748 chunks in 105 graph layers,4748 chunks in 105 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [100]:
with ProgressBar():
    array_plan.execute()
zarr.consolidate_metadata(mapper)

<zarr.hierarchy.Group '/'>

## check we can reload from the bucket

In [None]:
all_reloaded = xr.open_dataset(mapper, engine = 'zarr', chunks = {})
all_reloaded

Unnamed: 0,Array,Chunk
Bytes,113.16 GiB,244.05 MiB
Shape,"(4748, 2303, 2778)","(10, 2303, 2778)"
Dask graph,475 chunks in 2 graph layers,475 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 113.16 GiB 244.05 MiB Shape (4748, 2303, 2778) (10, 2303, 2778) Dask graph 475 chunks in 2 graph layers Data type float32 numpy.ndarray",2778  2303  4748,

Unnamed: 0,Array,Chunk
Bytes,113.16 GiB,244.05 MiB
Shape,"(4748, 2303, 2778)","(10, 2303, 2778)"
Dask graph,475 chunks in 2 graph layers,475 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


## save a local copy too

In [102]:
mapper = fsspec.get_mapper('full_2km_zarr/nowmelt.zarr', mode='w', token=token)

array_plan = rechunk(source = all, 
                     target_chunks = {"x": -1, "y": -1, "time": 10}, 
                     max_mem = "500MB", 
                     target_store = mapper,
                     temp_store='rechunking_temp')
array_plan.execute()
zarr.consolidate_metadata(mapper)

<zarr.hierarchy.Group '/'>

## plotting

In [96]:
all.isel(time=200).snowmeltcorr.hvplot(y='y', x='x', aspect='equal', cmap='viridis', rasterize=True)

BokehModel(combine_events=True, render_bundle={'docs_json': {'b7f1b662-7e3c-478b-8b8b-a32a2915ff74': {'version…