# Extra dependencies

## Find Files of interest

In [2]:
import fsspec

data_dir = "s3://nasa-waterinsight/NLDAS3/forcing/daily/"

# Use fsspec to list files in the S3 bucket
fs = fsspec.filesystem("s3", anon=True)
files = fs.glob(data_dir + "**/*.nc")

## Produce a virtual dataset from the list of files

In [4]:
from virtualizarr import open_virtual_dataset, open_virtual_mfdataset
from virtualizarr.parsers import HDFParser
from virtualizarr.registry import ObjectStoreRegistry

import obstore

bucket = "s3://nasa-waterinsight"
store = obstore.store.from_url(bucket, region="us-west-2", skip_signature=True)

registry = ObjectStoreRegistry({bucket:store})

parser = HDFParser()

urls = ['s3://'+file for file in files]
vds = open_virtual_mfdataset(urls, parser=parser, registry=registry, parallel='lithops')

2025-08-01 20:32:26,002 [INFO] config.py:139 -- Lithops v3.6.1 - Python3.11
2025-08-01 20:32:26,004 [INFO] localhost.py:39 -- Localhost storage client created
2025-08-01 20:32:26,004 [INFO] localhost.py:78 -- Localhost compute v2 client created
2025-08-01 20:32:26,051 [INFO] config.py:139 -- Lithops v3.6.1 - Python3.11
2025-08-01 20:32:26,052 [INFO] localhost.py:39 -- Localhost storage client created
2025-08-01 20:32:26,052 [INFO] localhost.py:78 -- Localhost compute v2 client created
2025-08-01 20:32:26,105 [INFO] invokers.py:119 -- ExecutorID d62bfe-1 | JobID M000 - Selected Runtime: python 
2025-08-01 20:32:27,621 [INFO] invokers.py:186 -- ExecutorID d62bfe-1 | JobID M000 - Starting function invocation: _open() - Total: 8399 activations
2025-08-01 20:34:45,252 [INFO] invokers.py:225 -- ExecutorID d62bfe-1 | JobID M000 - View execution logs at /tmp/lithops-root/logs/d62bfe-1-M000.log
2025-08-01 20:34:45,500 [INFO] executors.py:494 -- ExecutorID d62bfe-1 - Getting results from 8399 fu

    0%|          | 0/8399  

2025-08-01 22:07:17,678 [INFO] executors.py:618 -- ExecutorID d62bfe-1 - Cleaning temporary data
  super().__init__(**codec_config)


In [5]:
vds

## Write (commit) the virtual dataset into icechunk

In [9]:
import icechunk

In [10]:
storage = icechunk.s3_storage(
    bucket='nasa-veda-scratch',
    prefix=f"jbusecke/NLDAS3/",
    anonymous=False,
    from_env=True,
)

config = icechunk.RepositoryConfig.default()
config.set_virtual_chunk_container(
    icechunk.VirtualChunkContainer(
        "s3://nasa-waterinsight/NLDAS3/forcing/daily/202201/",
        icechunk.s3_store(region='us-west-2')
    )
)

virtual_credentials = icechunk.containers_credentials(
    {
        "s3://nasa-waterinsight/NLDAS3/forcing/daily/202201/": icechunk.s3_anonymous_credentials()
    }
)
    
repo = icechunk.Repository.open_or_create(
    storage=storage,
    config=config,
    authorize_virtual_chunk_access=virtual_credentials,
)

session = repo.writable_session('main')
vds.vz.to_icechunk(session.store)
session.commit('First Try full dataset')

'GXR61CSGXB589T1JPGN0'

## Read the data back into xarray

In [1]:
%time

# self contained read that works on the hub

import icechunk
import xarray as xr
import zarr
# zarr.config.set({'async.concurrency':128}) # does this improve compute speed for the timeseries


storage = icechunk.s3_storage(
    bucket='nasa-veda-scratch',
    prefix=f"jbusecke/nasa-waterinsight-test-full/",
    anonymous=False,
    from_env=True,
)

config = icechunk.RepositoryConfig.default()
config.set_virtual_chunk_container(
    icechunk.VirtualChunkContainer(
        "s3://nasa-waterinsight/NLDAS3/forcing/daily/",
        icechunk.s3_store(region='us-west-2')
    )
)

virtual_credentials = icechunk.containers_credentials(
    {
        "s3://nasa-waterinsight/NLDAS3/forcing/daily/202201/": icechunk.s3_anonymous_credentials()
    }
)
    
repo = icechunk.Repository.open(
    storage=storage,
    config=config,
    authorize_virtual_chunk_access=virtual_credentials,
)

session = repo.readonly_session('main')
ds = xr.open_zarr(session.store, consolidated=False, zarr_version=3)
ds

CPU times: user 0 ns, sys: 3 μs, total: 3 μs
Wall time: 5.01 μs


  ds = xr.open_zarr(session.store, consolidated=False, zarr_version=3)
  super().__init__(**codec_config)


Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.65 TiB 3.43 MiB Shape (8399, 6500, 11700) (1, 500, 900) Dask graph 1419431 chunks in 2 graph layers Data type float64 numpy.ndarray",11700  6500  8399,

Unnamed: 0,Array,Chunk
Bytes,4.65 TiB,3.43 MiB
Shape,"(8399, 6500, 11700)","(1, 500, 900)"
Dask graph,1419431 chunks in 2 graph layers,1419431 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


## Testing a timeseries at a single location

In [22]:
from dask.diagnostics import ProgressBar
with ProgressBar():
    timeseries = ds.sel(lon=-74.00, lat=40.71, method='nearest').load() # load timeseries close to NYC into memory

[########################################] | 100% Completed | 145.31 s


## Failed tests
### Dask distributed

Fails with a deserialization error. 

In [2]:
from distributed import Client
client = Client()
client

timeseries = ds.sel(lon=-74.00, lat=40.71, method='nearest').load() # load timeseries close to NYC into memory

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /user/jbusecke/proxy/8787/status,

0,1
Dashboard: /user/jbusecke/proxy/8787/status,Workers: 4
Total threads: 16,Total memory: 60.62 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42319,Workers: 0
Dashboard: /user/jbusecke/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:43203,Total threads: 4
Dashboard: /user/jbusecke/proxy/35139/status,Memory: 15.16 GiB
Nanny: tcp://127.0.0.1:33509,
Local directory: /tmp/dask-scratch-space/worker-boon2ose,Local directory: /tmp/dask-scratch-space/worker-boon2ose

0,1
Comm: tcp://127.0.0.1:39587,Total threads: 4
Dashboard: /user/jbusecke/proxy/43501/status,Memory: 15.16 GiB
Nanny: tcp://127.0.0.1:39241,
Local directory: /tmp/dask-scratch-space/worker-rrkziwy0,Local directory: /tmp/dask-scratch-space/worker-rrkziwy0

0,1
Comm: tcp://127.0.0.1:41091,Total threads: 4
Dashboard: /user/jbusecke/proxy/34417/status,Memory: 15.16 GiB
Nanny: tcp://127.0.0.1:35687,
Local directory: /tmp/dask-scratch-space/worker-x34xqqyd,Local directory: /tmp/dask-scratch-space/worker-x34xqqyd

0,1
Comm: tcp://127.0.0.1:42073,Total threads: 4
Dashboard: /user/jbusecke/proxy/40087/status,Memory: 15.16 GiB
Nanny: tcp://127.0.0.1:38163,
Local directory: /tmp/dask-scratch-space/worker-x7uhg1rw,Local directory: /tmp/dask-scratch-space/worker-x7uhg1rw


2025-08-01 22:33:15,140 - distributed.scheduler - ERROR - Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments



In [None]:
timeseries.plot()

### Testing performance impact of concurrency

I changed the `async.concurrency` from 10 to 128 with no noticeable difference
- 142.56 s before changing zarr config
- 145.31 s with concurrency 128