### Create zarr stores

This notebook creates Zarr stores from large collections of NetCDF files.
Raw NetCDF outputs use a lot of memory (CROCO outputs can easily reach 1 GB per month), so I've found that converting the whole dataset to a Zarr store as the first postprocessing step greatly speeds up later operations, especially when using Dask for parallel computing. This notebook is heavily based on Raphael Dussin's article about modern Python tools for the ROMS ocean model: https://raphaeldussin.medium.com/modern-python-tools-for-the-roms-ocean-model-bfca8642db01

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
import xarray as xr
import pandas as pd
import numpy as np
import datetime

from glob import glob
import sys
sys.path.append('../')
from utils import *
from load import *
from numerics import *

from dask.distributed import LocalCluster, Client
import gc
cluster = LocalCluster(n_workers=20, threads_per_worker=1, memory_limit='auto')
client  = Client(cluster)  # synchronous client: client.run(...) returns results directly instead of awaitables
client

Client
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
Status: running
Using processes: True
Comm: tcp://127.0.0.1:43611
Workers: 20
Total threads: 20
Total memory: 52.43 GiB
Per worker: 1 thread, 2.62 GiB memory, local directory under /tmp/dask-worker-space/

---
### RUND0

In [27]:
#Load crocod0 grid
gridd0 = xr.open_dataset('data/DESALADORAS_RUND0/crocod0_grd.nc')
gridd0 = gridd0[['x_rho','y_rho','lon_rho','lat_rho','h','hraw','f','mask_rho']].squeeze()
gridd0

In [26]:
gridd0.to_zarr('data/ZARR/simsequia_rund0_control', consolidated=True, mode='w')
gridd0.to_zarr('data/ZARR/simsequia_rund0_rivers', consolidated=True, mode='w')

<xarray.backends.zarr.ZarrStore at 0x7f30f5487e40>

In [21]:
#Load croco simulations
paths_control   = sorted(glob('data/DESALADORAS_RUND0/*avg*'))[24:]
paths_rivers    = sorted(glob('data/DESALADORAS_RUND0_RIOS/*avg*'))

for v in ['zeta']:
    print('Processing variable: ',v)
    control = []
    rivers  = []
    for pc,pr in zip(paths_control, paths_rivers):
        print('control: '+pc.split('_')[-1],'rivers: '+pr.split('_')[-1], sep='\t')
        datac = load_croco(pc, YORIG='1949-12-31 00:00:00', chunks={'time':1}) #load_croco -> load.py
        datar = load_croco(pr, YORIG='1949-12-31 00:00:00', chunks={'time':1})
        
        datac = datac[v]
        datar = datar[v]
    
        control.append(datac)
        rivers.append(datar)
        del datac,datar
        client.run(trim_memory) # Check utils.py
    print('Joining...')
    control = xr.concat(control, 'time').sortby('time').chunk({'time':1, 'eta_rho':-1, 'xi_rho':-1})
    rivers  = xr.concat(rivers, 'time').sortby('time').chunk({'time':1, 'eta_rho':-1, 'xi_rho':-1})
    print('Saving to disk...')
    print('control')
    control.to_dataset(name=v).to_zarr('data/ZARR/simsequia_rund0_control', consolidated=True, mode='a')
    client.run(trim_memory)
    print('rivers')
    rivers.to_dataset(name=v).to_zarr('data/ZARR/simsequia_rund0_rivers', consolidated=True, mode='a')
    client.run(trim_memory)
    del control,rivers
    print('Done')
    

Processing variable:  zeta
control: Y2001M1.nc	rivers: Y2001M1.nc
control: Y2001M10.nc	rivers: Y2001M10.nc
control: Y2001M11.nc	rivers: Y2001M11.nc
control: Y2001M12.nc	rivers: Y2001M12.nc
control: Y2001M2.nc	rivers: Y2001M2.nc
control: Y2001M3.nc	rivers: Y2001M3.nc
control: Y2001M4.nc	rivers: Y2001M4.nc
control: Y2001M5.nc	rivers: Y2001M5.nc
control: Y2001M6.nc	rivers: Y2001M6.nc
control: Y2001M7.nc	rivers: Y2001M7.nc
control: Y2001M8.nc	rivers: Y2001M8.nc
control: Y2001M9.nc	rivers: Y2001M9.nc
control: Y2002M1.nc	rivers: Y2002M1.nc
control: Y2002M10.nc	rivers: Y2002M10.nc
control: Y2002M11.nc	rivers: Y2002M11.nc
control: Y2002M12.nc	rivers: Y2002M12.nc
control: Y2002M2.nc	rivers: Y2002M2.nc
control: Y2002M3.nc	rivers: Y2002M3.nc
control: Y2002M4.nc	rivers: Y2002M4.nc
control: Y2002M5.nc	rivers: Y2002M5.nc
control: Y2002M6.nc	rivers: Y2002M6.nc
control: Y2002M7.nc	rivers: Y2002M7.nc
control: Y2002M8.nc	rivers: Y2002M8.nc
control: Y2002M9.nc	rivers: Y2002M9.nc
control: Y2003M1.nc	river
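
The listing above shows that `sorted(glob(...))` orders the files lexicographically (`M10` comes before `M2`); the later `sortby('time')` call is what restores chronological order. If chronological file order is wanted from the start, a natural sort key is one option. A minimal sketch, assuming file names end in `Y<year>M<month>.nc` as in the output above; `croco_time_key` is a hypothetical helper, not part of `utils.py` or `load.py`:

```python
import re


def croco_time_key(path):
    """Return (year, month) parsed from names such as crocod0_avg_Y2001M10.nc."""
    match = re.search(r"Y(\d{4})M(\d{1,2})\.nc$", path)
    if match is None:
        raise ValueError(f"Unexpected file name: {path}")
    return int(match.group(1)), int(match.group(2))


# Illustrative file names in the lexicographic order that sorted() alone produces.
paths = [
    "crocod0_avg_Y2001M1.nc",
    "crocod0_avg_Y2001M10.nc",
    "crocod0_avg_Y2001M2.nc",
    "crocod0_avg_Y2002M1.nc",
]
ordered = sorted(paths, key=croco_time_key)
```

With this key, a slice such as `[24:]` would skip the first 24 months in time order, rather than relying on each year contributing exactly 12 files to the lexicographic listing.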

In [22]:
# Read the vertical stretching parameters from a single output file (opened once)
params = xr.open_dataset('data/DESALADORAS_RUND0/crocod0_avg_Y1999M1.nc')
Cs_r = params.Cs_r
hc   = params.hc

In [30]:
control = xr.open_zarr('data/ZARR/simsequia_rund0_control/')
rivers  = xr.open_zarr('data/ZARR/simsequia_rund0_rivers/')

zrho_control = rhopoints_depths(gridd0.h.expand_dims(dim={'s_rho':control.s_rho}), control.zeta, control.s_rho, Cs_r, hc)
zrho_rivers  = rhopoints_depths(gridd0.h.expand_dims(dim={'s_rho':rivers.s_rho}), rivers.zeta, rivers.s_rho, Cs_r, hc)

In [31]:
zrho_control.to_dataset(name='z_rho').to_zarr('data/ZARR/simsequia_rund0_control', consolidated=True, mode='a')
zrho_rivers.to_dataset(name='z_rho').to_zarr('data/ZARR/simsequia_rund0_rivers', consolidated=True, mode='a')

<xarray.backends.zarr.ZarrStore at 0x7f5b87e55740>
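
`rhopoints_depths` is imported from the project's own modules and is not shown in this notebook. As a rough sketch of what such a function typically computes, the code below assumes the ROMS/CROCO "new" vertical transform (Vtransform = 2), where z = zeta + (zeta + h) * (hc * s + h * Cs) / (hc + h). The name `z_rho_sketch` and the sample values are illustrative only, and the actual helper may differ:

```python
import numpy as np


def z_rho_sketch(h, zeta, s_rho, Cs_r, hc):
    """Depth of rho points (negative downward), assuming Vtransform = 2."""
    h = np.asarray(h, dtype=float)
    zeta = np.asarray(zeta, dtype=float)
    s = np.asarray(s_rho, dtype=float)
    cs = np.asarray(Cs_r, dtype=float)
    # Nondimensional stretched coordinate combining s-levels and stretching curve.
    z0 = (hc * s + h * cs) / (hc + h)
    return zeta + (zeta + h) * z0


# One water column: 100 m deep, 0.5 m free surface, three illustrative levels.
s_rho = np.array([-1.0, -0.5, 0.0])
Cs_r = np.array([-1.0, -0.3, 0.0])
z = z_rho_sketch(h=100.0, zeta=0.5, s_rho=s_rho, Cs_r=Cs_r, hc=10.0)
```

In this sketch the deepest level sits at the bottom (`-h`) and the shallowest at the free surface (`zeta`). Real `s_rho` values are cell centres, so they never reach exactly -1 or 0.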

---

### RUND1

In [3]:
#Load crocod1 grid
gridd1 = xr.open_dataset('data/DESALADORAS_RUND1/crocod1_grd.nc')
gridd1 = gridd1[['x_rho','y_rho','lon_rho','lat_rho','h','hraw','f','mask_rho']].squeeze()
gridd1

In [4]:
gridd1.to_zarr('data/ZARR/simsequia_rund1_control', consolidated=True, mode='w')
gridd1.to_zarr('data/ZARR/simsequia_rund1_rivers', consolidated=True, mode='w')

<xarray.backends.zarr.ZarrStore at 0x7f4a9044eeb0>

In [21]:
#Load croco simulations
paths_control   = sorted(glob('data/DESALADORAS_RUND1/*avg*'))[12:]
paths_rivers    = sorted(glob('data/DESALADORAS_RUND1_RIOS/*avg*'))

for v in ['hbl','zeta','u','v','temp','salt']:
    print('Processing variable: ',v)
    control = []
    rivers  = []
    for pc,pr in zip(paths_control, paths_rivers):
        print('control: '+pc.split('_')[-1],'rivers: '+pr.split('_')[-1], sep='\t')
        datac = load_croco(pc, YORIG='1949-12-31 00:00:00', chunks={'time':1}) #load_croco -> load.py
        datar = load_croco(pr, YORIG='1949-12-31 00:00:00', chunks={'time':1})
        
        datac = datac[v]
        datar = datar[v]
    
        control.append(datac)
        rivers.append(datar)
        del datac,datar
        client.run(trim_memory) # Check utils.py
    print('Joining...')
    control = xr.concat(control, 'time').sortby('time').chunk({'time':1, 'eta_rho':-1, 'xi_rho':-1})
    rivers  = xr.concat(rivers, 'time').sortby('time').chunk({'time':1, 'eta_rho':-1, 'xi_rho':-1})
    print('Saving to disk...')
    print('control')
    control.to_dataset(name=v).to_zarr('data/ZARR/simsequia_rund1_control', consolidated=True, mode='a')
    client.run(trim_memory)
    print('rivers')
    rivers.to_dataset(name=v).to_zarr('data/ZARR/simsequia_rund1_rivers', consolidated=True, mode='a')
    client.run(trim_memory)
    del control,rivers
    print('Done')
    

Processing variable:  hbl
control: Y2001M1.nc	rivers: Y2001M1.nc
control: Y2001M10.nc	rivers: Y2001M10.nc
control: Y2001M11.nc	rivers: Y2001M11.nc
control: Y2001M12.nc	rivers: Y2001M12.nc
control: Y2001M2.nc	rivers: Y2001M2.nc
control: Y2001M3.nc	rivers: Y2001M3.nc
control: Y2001M4.nc	rivers: Y2001M4.nc
control: Y2001M5.nc	rivers: Y2001M5.nc
control: Y2001M6.nc	rivers: Y2001M6.nc
control: Y2001M7.nc	rivers: Y2001M7.nc
control: Y2001M8.nc	rivers: Y2001M8.nc
control: Y2001M9.nc	rivers: Y2001M9.nc
Joining...
Saving to disk...
control
rivers
Done
Processing variable:  zeta
control: Y2001M1.nc	rivers: Y2001M1.nc
control: Y2001M10.nc	rivers: Y2001M10.nc
control: Y2001M11.nc	rivers: Y2001M11.nc
control: Y2001M12.nc	rivers: Y2001M12.nc
control: Y2001M2.nc	rivers: Y2001M2.nc
control: Y2001M3.nc	rivers: Y2001M3.nc
control: Y2001M4.nc	rivers: Y2001M4.nc
control: Y2001M5.nc	rivers: Y2001M5.nc
control: Y2001M6.nc	rivers: Y2001M6.nc
control: Y2001M7.nc	rivers: Y2001M7.nc
control: Y2001M8.nc	rivers: 
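
The loop calls `client.run(trim_memory)` after each file and after each write; `trim_memory` comes from `utils.py` and is not shown here. A common way to write such a helper, described in the Dask documentation on worker memory, asks glibc to return freed memory to the operating system. A minimal sketch under that assumption (Linux with glibc only; the fallback to `0` on other platforms is an addition for portability, and the real helper may differ):

```python
import ctypes


def trim_memory() -> int:
    """Ask glibc to release free heap memory; return 1 if memory was released, else 0."""
    try:
        libc = ctypes.CDLL("libc.so.6")
        return int(libc.malloc_trim(0))
    except (OSError, AttributeError):
        # Not a glibc system (for example macOS, Windows or musl): do nothing.
        return 0


# Local call for illustration; on a cluster it would be client.run(trim_memory),
# which runs the function once on every worker.
result = trim_memory()
```

With a synchronous client, `client.run(trim_memory)` returns a dictionary that maps each worker address to the function's return value.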

In [22]:
# Read the vertical stretching parameters from a single output file (opened once)
params = xr.open_dataset('data/DESALADORAS_RUND1/crocod1_avg_Y2001M1.nc')
Cs_r = params.Cs_r
hc   = params.hc

In [23]:
control = xr.open_zarr('data/ZARR/simsequia_rund1_control/')
rivers  = xr.open_zarr('data/ZARR/simsequia_rund1_rivers/')

zrho_control = rhopoints_depths(gridd1.h.expand_dims(dim={'s_rho':control.s_rho}), control.zeta, control.s_rho, Cs_r, hc)
zrho_rivers  = rhopoints_depths(gridd1.h.expand_dims(dim={'s_rho':rivers.s_rho}), rivers.zeta, rivers.s_rho, Cs_r, hc)

In [31]:
zrho_control.chunk({'time':1, 'xi_rho':-1, 'eta_rho':-1, 's_rho':-1}).to_dataset(name='z_rho').to_zarr('data/ZARR/simsequia_rund1_control', consolidated=True, mode='a')
zrho_rivers.chunk({'time':1, 'xi_rho':-1, 'eta_rho':-1, 's_rho':-1}).to_dataset(name='z_rho').to_zarr('data/ZARR/simsequia_rund1_rivers', consolidated=True, mode='a')

<xarray.backends.zarr.ZarrStore at 0x7f48a3c3bac0>

---

In [32]:
client.shutdown()

2023-03-27 08:55:20,266 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
