In [9]:
import dask
from dask.distributed import Client, progress, LocalCluster, performance_report
import numpy as np
import pathlib
import pandas as pd
from rechunker import rechunk
import shutil
import fsspec
import xarray as xr
import zarr
from datetime import datetime

In [10]:
n_workers = 10
scheduler = LocalCluster(n_workers=n_workers, threads_per_worker=2, processes=False)
client = Client(scheduler)
client.dashboard_link

Perhaps you already have a cluster running?
Hosting the HTTP server on port 59495 instead


'http://10.104.4.82:59495/status'

In [11]:
nwm_url = 's3://noaa-nwm-retrospective-2-1-zarr-pds/chrtout.zarr'
start_date = datetime.fromisoformat("2000-01-01")
end_date = datetime.fromisoformat("2002-12-31")
variables = ["streamflow", "velocity"]
# comids = [2043493, 20873280, 7590453]
comids = [2043493]

start_string = f"{start_date.year}-{start_date.month}-{start_date.day}"
end_string = f"{end_date.year}-{end_date.month}-{end_date.day}"

start_n = np.datetime64(start_date)
end_n = np.datetime64(end_date)

time_steps = int((end_date - start_date).total_seconds() / 3600)
time_steps

26280

In [12]:
%%time
ds0 = xr.open_zarr(fsspec.get_mapper(nwm_url, anon=True), consolidated=True, chunks='auto', decode_times=True)
ds0

CPU times: total: 17.8 s
Wall time: 52.6 s


Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,39.72 MiB,39.72 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,|S15,numpy.ndarray
"Array Chunk Bytes 39.72 MiB 39.72 MiB Shape (2776738,) (2776738,) Count 2 Tasks 1 Chunks Type |S15 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,39.72 MiB,39.72 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,|S15,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 10.59 MiB 10.59 MiB Shape (2776738,) (2776738,) Count 2 Tasks 1 Chunks Type int32 numpy.ndarray",2776738  1,

Unnamed: 0,Array,Chunk
Bytes,10.59 MiB,10.59 MiB
Shape,"(2776738,)","(2776738,)"
Count,2 Tasks,1 Chunks
Type,int32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Count,50872 Tasks,50871 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.42 TiB 153.81 MiB Shape (367439, 2776738) (672, 30000) Count 50872 Tasks 50871 Chunks Type float64 numpy.ndarray",2776738  367439,

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Count,50872 Tasks,50871 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Count,50872 Tasks,50871 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.42 TiB 153.81 MiB Shape (367439, 2776738) (672, 30000) Count 50872 Tasks 50871 Chunks Type float64 numpy.ndarray",2776738  367439,

Unnamed: 0,Array,Chunk
Bytes,7.42 TiB,153.81 MiB
Shape,"(367439, 2776738)","(672, 30000)"
Count,50872 Tasks,50871 Chunks
Type,float64,numpy.ndarray


In [5]:
%%time
import copy
ds1 = (ds0.where(ds0.feature_id.isin(comids), drop=True).where(ds0.time>=start_n, drop=True).where(ds0.time<=end_n, drop=True))
# ds1 = copy.copy(ds0)
ds1 = ds1.drop('crs')
ds1

CPU times: total: 594 ms
Wall time: 767 ms


Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4 B 4 B Shape (1,) (1,) Count 6 Tasks 1 Chunks Type float32 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,15 B,15 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,|S15,numpy.ndarray
"Array Chunk Bytes 15 B 15 B Shape (1,) (1,) Count 6 Tasks 1 Chunks Type |S15 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,15 B,15 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,|S15,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4 B 4 B Shape (1,) (1,) Count 6 Tasks 1 Chunks Type float32 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 4 B 4 B Shape (1,) (1,) Count 6 Tasks 1 Chunks Type float32 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 4 B 4 B Shape (1,) (1,) Count 6 Tasks 1 Chunks Type int32 numpy.ndarray",1  1,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,"(1,)","(1,)"
Count,6 Tasks,1 Chunks
Type,int32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,205.32 kiB,5.25 kiB
Shape,"(26281, 1)","(672, 1)"
Count,53229 Tasks,40 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 205.32 kiB 5.25 kiB Shape (26281, 1) (672, 1) Count 53229 Tasks 40 Chunks Type float64 numpy.ndarray",1  26281,

Unnamed: 0,Array,Chunk
Bytes,205.32 kiB,5.25 kiB
Shape,"(26281, 1)","(672, 1)"
Count,53229 Tasks,40 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,205.32 kiB,5.25 kiB
Shape,"(26281, 1)","(672, 1)"
Count,53229 Tasks,40 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 205.32 kiB 5.25 kiB Shape (26281, 1) (672, 1) Count 53229 Tasks 40 Chunks Type float64 numpy.ndarray",1  26281,

Unnamed: 0,Array,Chunk
Bytes,205.32 kiB,5.25 kiB
Shape,"(26281, 1)","(672, 1)"
Count,53229 Tasks,40 Chunks
Type,float64,numpy.ndarray


In [6]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ds_data = ds1[variables].load(optimize_graph=True, traverse=False)
ds_data

CPU times: total: 2min 43s
Wall time: 3min 35s


In [7]:
%%time
ds_final = ds_data.squeeze('feature_id').to_dataframe()
ds_final

CPU times: total: 0 ns
Wall time: 6 ms


Unnamed: 0_level_0,streamflow,velocity,elevation,feature_id,gage_id,latitude,longitude,order
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2000-01-01 00:00:00,0.44,0.39,347.089996,2043493,b' ',34.431168,-83.742455,2
2000-01-01 01:00:00,0.44,0.39,347.089996,2043493,b' ',34.431168,-83.742455,2
2000-01-01 02:00:00,0.44,0.39,347.089996,2043493,b' ',34.431168,-83.742455,2
2000-01-01 03:00:00,0.44,0.39,347.089996,2043493,b' ',34.431168,-83.742455,2
2000-01-01 04:00:00,0.44,0.39,347.089996,2043493,b' ',34.431168,-83.742455,2
...,...,...,...,...,...,...,...,...
2002-12-30 20:00:00,1.07,0.53,347.089996,2043493,b' ',34.431168,-83.742455,2
2002-12-30 21:00:00,1.07,0.53,347.089996,2043493,b' ',34.431168,-83.742455,2
2002-12-30 22:00:00,1.02,0.52,347.089996,2043493,b' ',34.431168,-83.742455,2
2002-12-30 23:00:00,1.02,0.52,347.089996,2043493,b' ',34.431168,-83.742455,2


In [8]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ds_data = ds1.sel(time=slice(start_string, end_string)).compute(optimize_graph=True, traverse=False)
ds_data

CPU times: total: 0 ns
Wall time: 10 ms


In [None]:
%%time
ds_data2 = (ds1.where(ds1.time.isin(slice(start_date, end_date))).compute(optimize_graph=True, traverse=False))
ds_data2

In [None]:
dim_chunk_sizes = {'feature_id': 1, 'time': time_steps}
ds2 = ds1.chunk(chunks=dim_chunk_sizes)

chunk_plan = {}
for vv in ds2.variables:
    if vv in ['streamflow', 'velocity']:
        chunk_plan[vv] = tuple((dim_chunk_sizes[tt] for tt in ds2[vv].dims))
    else: 
        chunk_plan[vv] = ds2[vv].shape
    ds2[vv].encoding['chunks'] = None  # seems redundant, with ds.chunk() ?
    

In [None]:
%%time
z_store = zarr.storage.MemoryStore()
temp_store = zarr.storage.MemoryStore()
ds2_rechunked = rechunk(source=ds2, target_chunks=chunk_plan, max_mem="16Gb", target_store=z_store, temp_store=temp_store, executor='dask')
ds2_rechunked.execute()

In [None]:
%%time
ds_data = ds2_rechunked.isel(feature_id=comids).load(optimize_graph=False, traverse=False)
ds_data

In [None]:
%%time
ds_final = ds_data.squeeze('feature_id').to_dataframe()
ds_final

In [None]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ds_data = ds[variables].sel(feature_id=comids).sel(time=slice(start_string, end_string)).load(optimize_graph=False, traverse=False)
ds_data

In [None]:
%%time
ds_data = ds[variables].sel(feature_id=comids).sel(time=slice(start_string, end_string)).load(optimize_graph=True, traverse=False)
ds_data

In [None]:
%%time
with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ds_data = ds[variables].sel(feature_id=comids).sel(time=slice(start_string, end_string)).load(optimize_graph=True, traverse=False)
ds_data