# Create references for [C]Worthy OAE dataset using VirtualiZarr

Tom Nicholas

17th December 2024

Running this in a 32GB Coiled notebook in the same aws region as the data (us-west-2)

## Problem:

- Create a virtual zarr store pointing to ~500,000 files on S3, representing ~50TB altogether.
- Each file has the same structure, but the files need to be concatenated along 3 independent ensemble dimensions to make a logical datacube.
- The large data variables need to be saved as virtual references, but the small variables should just be loaded and inlined directly in the store.

In [1]:
!pip install -U s3fs fsspec aiobotocore boto3

Collecting boto3
  Using cached boto3-1.37.37-py3-none-any.whl.metadata (6.7 kB)
INFO: pip is looking at multiple versions of boto3 to determine which version is compatible with other requirements. This could take a while.
  Using cached boto3-1.37.36-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.35-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.34-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.33-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.32-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.31-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.30-py3-none-any.whl.metadata (6.7 kB)
INFO: pip is still looking at multiple versions of boto3 to determine which version is compatible with other requirements. This could take a while.
  Using cached boto3-1.37.29-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.28-py3-none-any.whl.metadata (6.7 kB)
  Using cached boto3-1.37.27-py3-none-any.whl.metadat

In [2]:
import xarray as xr
xr.__version__

'2025.3.1'

In [3]:
import s3fs

In [4]:
import glob

In [5]:
import virtualizarr as vz
vz.__version__

'1.3.3.dev40+ge92c0ef'

In [6]:
import zarr
zarr.__version__

'3.0.6'

## Open example file using xarray

Let's just look at what's inside one of these files first.

The data is stored at [https://source.coop/repositories/cworthy/oae-efficiency-atlas/](https://source.coop/repositories/cworthy/oae-efficiency-atlas/)

In [7]:
bucket_url = 's3://cworthy/oae-efficiency-atlas/'

In [8]:
single_file_url = bucket_url + 'data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-01.nc'

In [9]:
s3fs_kwargs = {'anon': True, 'endpoint_url': 'https://data.source.coop'}

In [10]:
fs = s3fs.S3FileSystem(**s3fs_kwargs)

In [13]:
%%time
ds = xr.open_dataset(fs.open(single_file_url), engine='h5netcdf', decode_times=False)
ds

CPU times: user 6.95 s, sys: 2.51 s, total: 9.46 s
Wall time: 45.5 s


Whilst we can access each file like this, the issues with this approach are:
- The files comprise a logical datacube, and it would be much clearer for users to open that up as a lazily representation of the entire dataset immediately
- We could ask users to run `xr.open_mfdataset` with a specific incantation, but to open and combine 500,000 files with xarray would take hours, even if parallelized, because of
  - the need to scan the entirety of all the all the non-cloud-optimized netCDF files
  - the need to perform alignment checks when concatenating the contents of all the files together
- This expensive opening and combining operation would be needed every single time a user wants to access the dataset.

Instead we're going to open and combine only once, generating pointers to the chunks inside the files in the form of virtual references which we then cache to persistent storage.

### Specify which variables we want to load and which to generate virtual references for

In [11]:
# needed to do combine_by_coords, and each has only one value on each dataset
ENSEMBLE_DIMENSION_COORDS = [
    'elapsed_time',
    'polygon_id',
    'injection_date',
]
OTHER_DIMENSION_COORDS = [
    'time_bounds',
    'z_t',
    'z_w',
    'z_t_150m',
    'z_w_bot',
    'z_w_top',
]
# so tiny it takes less memory to store their values than to store references to them
SCALARS = [
    'T0_Kelvin',
    'cp_air',
    'cp_sw',
    'days_in_norm_year',
    'fwflux_factor',
    'grav',
    'heat_to_PW',
    'hflux_factor',
    'latent_heat_fusion',
    'latent_heat_fusion_mks',
    'latent_heat_vapor',
    'mass_to_Sv',
    'momentum_factor',
    'nsurface_t',
    'nsurface_u',
    'ocn_ref_salinity',
    'omega',
    'ppt_to_salt',
    'radius',
    'rho_air',
    'rho_fw',
    'salinity_factor',
    'salt_to_Svppt',
    'salt_to_mmday',
    'salt_to_ppt',
    'sea_ice_salinity',
    'sflux_factor',
    'sound',
    'vonkar',
    'stefan_boltzmann'
]
LOW_DIMENSIONAL_VARS = ENSEMBLE_DIMENSION_COORDS + OTHER_DIMENSION_COORDS+ SCALARS

## Open example file as virtual refs

In [17]:
%%time
vds = vz.open_virtual_dataset(
    single_file_url, 
    loadable_variables=LOW_DIMENSIONAL_VARS,
    decode_times=False,
    reader_options={'storage_options': s3fs_kwargs}
)
vds

  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(*

CPU times: user 14 s, sys: 4.87 s, total: 18.9 s
Wall time: 1min 30s


(Note: this is approximately twice as slow as lazily opening the xarray dataset with `xr.open_dataset` because the current implementation of `open_virtual_dataset` scans the entire file to generate the virtual references, then calls `xr.open_dataset` all over again to read the `loadable_variables`, so the whole file is being scanned twice. Implementing [virtualizarr issue #542](https://github.com/zarr-developers/VirtualiZarr/issues/542) would therefore likely cut this scanning time in half.)

### Check correctness

Let's double check that serializing the references then opening the data via the references gives the same result as just opening with xarray directly.

In [18]:
refs_dict = vds.virtualize.to_kerchunk(format='dict')



TODO use an in-memory icechunk store instead

In [19]:
result_ds = xr.open_dataset(refs_dict, engine="kerchunk")

ValueError: unrecognized engine 'kerchunk' must be one of your download engines: ['h5netcdf', 'store', 'zarr']. To install additional dependencies, see:
https://docs.xarray.dev/en/stable/user-guide/io.html 
https://docs.xarray.dev/en/stable/getting-started-guide/installing.html

### Estimate RAM use to virtualize entire dataset

How much RAM does it take to store this virtual dataset on the client?

In [20]:
vds.virtualize.nbytes / 1e3  # in kB

3.204

In [21]:
import sys

sys.getsizeof(vds)

120

Only 24kB. We need to do this for ~500,000 files, send all the results back over the network, then store them all in RAM in this notebook process. So if we would expect this Coiled notebook's machine to need about 500,000 * 24kB ~= 11.8GB of RAM to hold the virtual references to our entire 50TB dataset in memory at once.

## Paths to many files

Each directory contains 180 files, comprising one simulation. These need to be concatenated along time, before they can be concatenated with files in other directories.

In [12]:
single_simulation_path = bucket_url + 'data/experiments/000/01/'

In [13]:
single_simulation_paths = fs.ls(single_simulation_path)
single_simulation_urls = ['s3://' + path for path in single_simulation_paths]
single_simulation_urls[0:10]

['s3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-01.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-02.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-03.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-04.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-05.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-06.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-07.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-08.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-09.nc',
 's3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.0

In [20]:
from xarray.backends.common import _find_absolute_paths

In [21]:
single_simulation_urls[0:1]

['s3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-01.nc']

In [22]:
_find_absolute_paths(single_simulation_urls[0:1])

['s3://cworthy/oae-efficiency-atlas/data/experiments/000/01/alk-forcing.000-1999-01.pop.h.0347-01.nc']

In [23]:
_find_absolute_paths(single_simulation_urls[0])

[<fsspec.mapping.FSMap at 0x7f7dec0e9c90>]

In [68]:
def is_globbable(path: str) -> bool:
    return any(el in path for el in ['*', '**', '[', ']'])
    

def absolute_paths(paths: str) -> list[str]:
    import fsspec
    
    fs, _, _ = fsspec.core.get_fs_token_paths(
        paths,
        mode="rb",
        storage_options={}.get("backend_kwargs", {}).get(
            "storage_options", {}
        ),
        expand=False,
    )
    
    if is_globbable(paths):
        print(fs._strip_protocol(paths))
        tmp_paths = fs.glob(fs._strip_protocol(paths))  # finds directories
        print(tmp_paths)
    else:
        tmp_paths = [paths]  # single string representing remote non-http uri that is absolute rather than a glob
    return [fs.get_mapper(path) for path in tmp_paths]

In [69]:
absolute_paths(single_simulation_urls[0])

[<fsspec.mapping.FSMap at 0x7f575690fd50>]

In [24]:
%%time
combined_vds_single_simulation = open_virtual_mfdataset(
    single_simulation_urls[0:1],
    combine="by_coords",
    coords="minimal", 
    compat="override",
    loadable_variables=LOW_DIMENSIONAL_VARS,
    decode_times=False,
    reader_options={'storage_options': s3fs_kwargs}
)
combined_vds_single_simulation

  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(*

CPU times: user 13.7 s, sys: 4.78 s, total: 18.5 s
Wall time: 1min 36s


In [25]:
%%time
combined_vds_single_simulation = open_virtual_mfdataset(
    single_simulation_urls[0:2],
    combine="by_coords",
    coords="minimal", compat="override",
    loadable_variables=LOW_DIMENSIONAL_VARS,
    decode_times=False,
    reader_options={'storage_options': s3fs_kwargs}
)
combined_vds_single_simulation

  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(*

CPU times: user 27.3 s, sys: 10 s, total: 37.3 s
Wall time: 3min 15s


  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(**codec_config)
  super().__init__(*

In [27]:
combined_vds_single_simulation.virtualize.nbytes / 1e3  # in kB

4.364

(Good - the virtual dataset grows in size < proportionally to the number of references)

### Open all virtual references for one simulation in parallel

Let's try parallelizing the generation of virtual references for each file.

We're going to use Lithops to do this using serverless AWS Lambda functions, so we need to have a `.lithops_config` file in the root of the directory we are running the notebook from.

(Or we can just enter temporary credentials as environment variables)

In [21]:
!export AWS_ACCESS_KEY_ID="ASIAS6J7QP4KNIRXGXUH"
!export AWS_SECRET_ACCESS_KEY="Glh+W/MC2eN1/iYz8BfGgBZDq9L6NvnQOD/pUkeO"
!export AWS_SESSION_TOKEN="IQoJb3JpZ2luX2VjEDgaCXVzLWVhc3QtMSJIMEYCIQD/7ilvW5S8ePBdDA7S5yBknkk6lFYbWTBGFtGwj/Z9YQIhAMfPbq1noRSifolXGds/PNDLi1jUx+Fx5ALPT6upJMTUKokDCMH//////////wEQABoMMjAyNTMzNTM1NTA4IgwZB2H87s7Dx9Js34Iq3QILlumXGH0aOsiOvbe5mrEx41nKlma48XrfUifNSaXCpgrYYHqNK7Xtz6hCCnVIWAkwNHa0bmYW8gmOA3dVxVj5LDJ84TvvyiJ23Rfl6BxDFc66I28zSnxgffXUiqTJ3XGn3e3Kn6CjUkYdaE3LC0XWYn3SDZTi0ycwWyZpiPmngtluLQV0+LEufq/Nl4R9t9ech492cnO7UvtH+bp8n0A75DZtKweGX7FPOveZRG//uLQMZX/HFW4wcBaxPp/lBBIBJW1kMJuUthg6VY25Xbs5NdlRePagxoma7QEUcpeOuGowK9k3wEwnPsDrHsZGg+wt+8Ai5sad4gdXza7RD+5GueWAIIoLM+WEyow5QWH+UcA/NRzoeADt7Lt9t3kPA99vNYGqd3M8648lq2GnaZCgHRPUqQO9PQT1QWe6iov9BHu1wlfu0uiKfcPAoFgHOVZzJcQlM9K1srmNCcjsMJnLmcAGOqUBGVqLwNqzMzsSF9Q71NU9hkT7ugE1CCJ63K8yJHH20Sgv2gGvQQCi5uQyl7tllD6r1HbWhP9ySTrPc+ViLyjY8oiKSucb37Jn/TNMVQtcfO/vhiR2w4JJeMLPgfyi0MlhMcyWM849Jbp+lgWNLOX3vRAKNMNnEXG0ZySIh9yHrPyp/iyPgz2GCesiDCKosZWEHtmpoFYIffrwZgjCvxuzYuHuSBq8"

We also need to build a lithops runtime in order to package up the dependencies that each serverless function will need (i.e. virtualizarr and its dependencies). But we did this earlier.

Now we can run reference generation in parallel

Try it for just two files first

In [14]:
%%time
combined_vds_single_simulation = vz.open_virtual_mfdataset(
    single_simulation_urls[0:2],
    combine="by_coords",
    coords="minimal", compat="override",
    loadable_variables=LOW_DIMENSIONAL_VARS,
    decode_times=False,
    reader_options={'storage_options': s3fs_kwargs},
    parallel='lithops',
)
combined_vds_single_simulation

2025-04-21 11:41:20,558 [INFO] config.py:139 -- Lithops v3.6.1.dev0 - Python3.12
2025-04-21 11:41:20,722 [INFO] aws_s3.py:59 -- S3 client created - Region: us-west-2
2025-04-21 11:41:21,489 [INFO] aws_lambda.py:97 -- AWS Lambda client created - Region: us-west-2
2025-04-21 11:41:21,492 [INFO] config.py:139 -- Lithops v3.6.1.dev0 - Python3.12
2025-04-21 11:41:21,578 [INFO] aws_s3.py:59 -- S3 client created - Region: us-west-2
2025-04-21 11:41:22,096 [INFO] aws_lambda.py:97 -- AWS Lambda client created - Region: us-west-2
2025-04-21 11:41:22,099 [INFO] invokers.py:119 -- ExecutorID a73d22-1 | JobID M000 - Selected Runtime: virtualizarr-runtime - 1000MB


Exception: The indicated runtime 'virtualizarr-runtime' is running Python 3.10 and it is not compatible with the local Python version 3.12

### The whole dataset

Now in theory we could create virtual references for the entire dataset just using a single function call!:

```python
combined_vds = open_virtual_mfdataset(
    's3://cworthy/oae-efficiency-atlas/data/experiments/**',
    combine="by_coords",
    coords="minimal", compat="override",
    loadable_variables=LOW_DIMENSIONAL_VARS,
    decode_times=True,
    reader_options={'storage_options': s3fs_kwargs}
    parallel='lithops',
)
```

This would automatically use 500,000 AWS Lambda functions to create references for each of our files, send them all back to the client (this notebook) and then combine them here.

However in practice that would be a little silly, because:
1. AWS accounts are only allowed to run a maximum of 1000 Lambdas concurrently by default, and I don't know anyone at AWS to ask nicely to up that default limit. That means our 500,000 functions would actually run in 500 batches of 1000.
2. 500 batches x 2 minutes each = 16 hours, which is long enough that if something unexpected goes wrong I would like not to have to start all over again.

Instead let's generate references for part of the dataset at a time, and commit those references once generated, so that if something goes wrong we can pick up from the last commit. We will still use `open_virtual_mfdataset`, but on 4 simulations at once (so 180 * 4 = 720 files at a time), because that's <1000 so each set of references will still be maximally parallelized.

We will write each batch of references out into an Icechunk store, making an immutable commit before we start to generate the next batch.

In [None]:
import icechunk

storage_config = icechunk.StorageConfig.s3_from_env(
    bucket="icechunk-test",
    prefix="quickstart-demo-1"
)
repo = icechunk.Repository.create(storage_config)

In [None]:
session = repo.writable_session("main")
store = session.store()

In [None]:
polygon_ids = ['000', '001']  # etc...

In [None]:
%%time
# loop over the entire dataset
for polygon_id in polygon_ids:
    
    # generate references for 720 files in parallel and combine them
    polygon_vds = open_virtual_mfdataset(
        f's3://cworthy/oae-efficiency-atlas/data/experiments/{polygon_id}/**',
        combine="by_coords",
        coords="minimal", compat="override",
        loadable_variables=LOW_DIMENSIONAL_VARS,
        decode_times=True,
        reader_options={'storage_options': s3fs_kwargs}
        parallel='lithops',
    )

    # have to treat the first commit differently as there are no arrays to append to in the store yet
    if polygon_id == '000':
        virtual_ds.virtualize.to_icechunk(store)
    else:
        virtual_ds.virtualize.to_icechunk(store, append_dim='polygon_id')

    store.commit(f"wrote virtual references for polygon={polygon_id}")

Once that's run, we're done!

### Accessing the data

To access the data, users first start a read-only session with the icechunk store

In [None]:
import icechunk

storage_config = icechunk.StorageConfig.s3_from_env(
    bucket="icechunk-test",
    prefix="quickstart-demo-1"
)
repo = icechunk.Repository.open(storage_config)
store = repo.readonly_session()

Then they only need to call `xarray.open_zarr` to get at the entire dataset!

In [None]:
%%time
ds = xr.open_zarr(repo.store(), consolidated=False)
ds

Notice how quick that was to open - that's because now instead of the user having to touch hundreds of thousands of netCDF files to see whats inside, they only need to read the lightweight metadata that we extracted from those files and committed to icechunk for them.

This access pattern is entirely serverless - there is no icechunk server between the user and the data, they are just directly reading files on S3.

Icechunk supports an arbitrary number of concurrent users trying to read the same data, and we the data provider can even make updates to the data as it is being read without causing any consistency problems!