# VirtualiZarr Useful Recipes with NASA Earthdata

#### *Author: Dean Henze, PO.DAAC*

*Reference herein to any specific commercial product, process, or service by trade name, trademark, manufacturer, or otherwise, does not constitute or imply its endorsement by the United States Government or the Jet Propulsion Laboratory, California Institute of Technology.*

## Summary

This notebook goes through several functionalities of the VirtualiZarr package to create virtual reference files, specifically using it with NASA Earthdata and utilizing the `earthaccess` package. It is meant to be a quick-start reference that introduces some key capabilities / characteristics of the package once a user has a high-level understanding of virtual data sets and the cloud-computing challenges they address (see references in the *Prerequisite knowledge* section below). In short, VirtualiZarr is a Python package to create "reference files", which can be thought of as road maps for the computer to efficiently navigate through large arrays in a single data file, or across many files. Once a reference file for a data set is created, utilizing it to open the data can speed up several processes including lazy loading, accessing subsets, and in some cases performing computations. Importantly, one can create a combined reference for all the files in a dataset and use it to lazy load / access the entire record at once.

The functionalities of VirtualiZarr (with earthaccess) covered in this notebook are:

1. **Getting Data File endpoints in Earthdata Cloud** which are needed for virtualizarr to create reference files.
2. **Generating reference files for 1 day, 1 year, and the entire record of a ~750 GB data set**. The data set used is the Level 4 global gridded 6-hourly wind product from the Cross-Calibrated Multi-Platform project (https://doi.org/10.5067/CCMP-6HW10M-L4V31), available on PO.DAAC. This section also covers speeding up the reference creation using parallel computing. Reference files are saved in both JSON and PARQUET formats. The latter is an important format as it reduces the reference file size by ~30x in our tests. *Saving in ice chunk formats will be tested / covered in the coming months.*
3. **Combining reference files (in progress)**. The ability to combine reference files together is valuable, for example to upate reference files for forward-streaming datasets when new data are available, without re-creating the entire record from scratch. However, with the current workflows and version of VirtualiZarr, this is not possible due to our use of a specific kwarg when creating the reference files. The workflow is still included here (with errors) because it is anticipated that this will be fixed in upcoming versions. Alternately, the use of ice chunk will also likely solve this issue (ice chunk functionality to be tested soon). 

## Requirements, prerequisite knowledge, learning outcomes

#### Requirements to run this notebook

* Earthdata login account: An Earthdata Login account is required to access data from the NASA Earthdata system. Please visit https://urs.earthdata.nasa.gov to register and manage your Earthdata Login account.

* Compute environment: This notebook is meant to be run in the cloud (AWS instance running in us-west-2). We used an `m6i.4xlarge` EC2 instance (16 CPU's, 64 GiB memory) for the parallel computing sections. At minimum we recommend a VM with 10 CPU's to make the parallel computations in Section 2.2.1 faster.

* Optional Coiled account: To run the section on distributed clusters, Create a coiled account (free to sign up), and connect it to an AWS account. For more information on Coiled, setting up an account, and connecting it to an AWS account, see their website [https://www.coiled.io](https://www.coiled.io). 

#### Prerequisite knowledge

* This notebook covers virtualizarr functionality but does not present the high-level ideas behind it. For an understanding of reference files and how they are meant to enhance in-cloud access to file formats that are not cloud optimized (such netCDF, HDF), please see e.g. this [kerchunk page](https://fsspec.github.io/kerchunk/), or [this page on virtualizarr](https://virtualizarr.readthedocs.io/en/latest/).

* Familiarity with the `earthaccess` and `Xarray` packages. Familiarity with directly accessing NASA Earthdata in the cloud. 

* The Cookbook notebook on [Dask basics](https://podaac.github.io/tutorials/notebooks/Advanced_cloud/basic_dask.html) is handy for those new to parallel computating.

#### Learning Outcomes

This notebook serves both as a pedagogical resource for learning several key workflows as well as a quick reference guide. Readers will gain the understanding to combine the virtualizarr and earthaccess packages to create virtual dataset reference files for NASA Earthdata.

## Import Packages
#### ***Note Zarr Version***
***Zarr version 2 is needed for the current implementation of this notebook, due to (as of February 2025) Zarr version 3 not accepting `FSMap` objects.***

We ran this notebook in a Python 3.12 environment. The minimal working environment we used to run this notebook was:
```
zarr==2.18.4
fastparquet==2024.5.0
xarray==2025.1.2
earthaccess==0.11.0
fsspec==2024.10.0
dask==2024.5.2 ("dask[complete]"==2024.5.2 if using pip)
h5netcdf==1.3.0
matplotlib==3.9.2
jupyterlab
jupyter-server-proxy
virtualizarr==1.3.0
kerchunk==0.2.7
```
And optionally:
```
coiled==1.58.0
```

In [1]:
# Built-in packages
import os
import sys

# Filesystem management 
import fsspec
import earthaccess

# Data handling
import xarray as xr
from virtualizarr import open_virtual_dataset

# Parallel computing 
import multiprocessing
from dask import delayed
import dask.array as da
from dask.distributed import Client, print

import boto3
import time
import logging
import urllib3


In [23]:
collection = "ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4"
loadable_coord_vars = "latitude,longitude,time"
start_date = "1-1-2017" # like 1-1-2022
end_date = "1-3-2017"# None # like 1-1-2025
bucket = "podaac-thredds-sit"
debug = False 

In [24]:
loadable_coord_vars = loadable_coord_vars.split(",")

In [25]:
print("Collection: {}".format(collection))
print("Vars: {} ({})".format(loadable_coord_vars, str(type(loadable_coord_vars))))
print("Bucket: {}".format(bucket))
print("start_date: {}".format(start_date))
print("end_date: {}".format(end_date))

Collection: ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4
Vars: ['latitude', 'longitude', 'time'] (<class 'list'>)
Bucket: podaac-thredds-sit
start_date: 1-1-2017
end_date: 1-3-2017


## Other Setup

In [26]:
xr.set_options( # display options for xarray objects
    display_expand_attrs=False,
    display_expand_coords=True,
    display_expand_data=True,
)

<xarray.core.options.set_options at 0x314884cb0>

## 1. Get Data File https endpoints


In [27]:
# Get Earthdata creds
earthaccess.login()

<earthaccess.auth.Auth at 0x1101f2960>

In [28]:
# Get AWS creds. Note that if you spend more than 1 hour in the notebook, you may have to re-run this line!!!
# fs = earthaccess.get_s3_filesystem(daac="PODAAC")
fs = earthaccess.get_fsspec_https_session() 

In [29]:
if debug:
    # Get the urllib3 logger
    log = logging.getLogger('urllib3')
    
    # Set the logging level to DEBUG
    log.setLevel(logging.DEBUG)
    
    # Create a stream handler to output logs to the console
    # ch = logging.StreamHandler()
    # ch.setLevel(logging.DEBUG)
    
    # Add the handler to the logger
    # log.addHandler(ch)
    from http.client import HTTPConnection
    
    # Set the debug level for HTTPConnection
    HTTPConnection.debuglevel = 1

In [30]:
# Locate CCMP file information / metadata:
if start_date != None or end_date != None:
    granule_info = earthaccess.search_data(
        short_name=collection,
        temporal=(start_date, end_date)
    )
else:
    granule_info = earthaccess.search_data(
        short_name=collection,
    )

In [31]:
# Get S3 endpoints for all files:
data_s3links = [g.data_links(access="https")[0] for g in granule_info]
data_s3links[0:3]
# You can also access specific fields of the umm like this: 
# granule_info[0]['umm']['TemporalExtent']['RangeDateTime']['BeginningDateTime']

['https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4/OCEAN_TEMPERATURE_SALINITY_day_mean_2016-12-31_ECCO_V4r4_latlon_0p50deg.nc',
 'https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4/OCEAN_TEMPERATURE_SALINITY_day_mean_2017-01-01_ECCO_V4r4_latlon_0p50deg.nc',
 'https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/ECCO_L4_TEMP_SALINITY_05DEG_DAILY_V4R4/OCEAN_TEMPERATURE_SALINITY_day_mean_2017-01-02_ECCO_V4r4_latlon_0p50deg.nc']

## 2. Generate reference files for 1 day, 1 year, and entire record

### 2.1 First day
The virtualizarr function to generate reference information is compact. We use it on one file for demonstration.

***Important***

The kwarg `loadable_variables` is not mandatory to create a viable reference file, but will become important for rapid lazy loading when working with large combined reference files. Assign to this at minimum the list of 1D coordinate variable names for the data set (additional 1D or scalar vars can also be added). This functionality will be the default in future releases of virtualizarr.

In [32]:
# This will be assigned to 'loadable_variables' and needs to be modified per the specific 
# coord names of the data set:
coord_vars = loadable_coord_vars

In [33]:
reader_opts = {"storage_options": fs.storage_options} # S3 filesystem creds from previous section.


In [34]:
%%time

# Create reference for the first data file:
virtual_ds_example = open_virtual_dataset(
    data_s3links[0], indexes={}, 
    reader_options=reader_opts, loadable_variables=coord_vars
    )
print(virtual_ds_example)

<xarray.Dataset> Size: 104MB
Dimensions:         (time: 1, Z: 50, latitude: 360, longitude: 720, nv: 2)
Coordinates:
    Z               (Z) float32 200B ManifestArray<shape=(50,), dtype=float32...
    time_bnds       (time, nv) int32 8B ManifestArray<shape=(1, 2), dtype=int...
    latitude_bnds   (latitude, nv) float32 3kB ManifestArray<shape=(360, 2), ...
    longitude_bnds  (longitude, nv) float32 6kB ManifestArray<shape=(720, 2),...
    Z_bnds          (Z, nv) float32 400B ManifestArray<shape=(50, 2), dtype=f...
  * time            (time) datetime64[ns] 8B 2016-12-31T12:00:00
  * latitude        (latitude) float32 1kB -89.75 -89.25 -88.75 ... 89.25 89.75
  * longitude       (longitude) float32 3kB -179.8 -179.2 -178.8 ... 179.2 179.8
Dimensions without coordinates: nv
Data variables:
    THETA           (time, Z, latitude, longitude) float32 52MB ManifestArray...
    SALT            (time, Z, latitude, longitude) float32 52MB ManifestArray...
Attributes: (62)
CPU times: user 2.25 s

The reference can be saved to file and used to open the corresponding CCMP data file with Xarray:

In [35]:
virtual_ds_example.virtualize.to_kerchunk('virtual_ds_example.json', format='json')

In [36]:
# Open data using the reference file, using a small wrapper function around xarray's open_dataset. 
# This will shorten code blocks in other sections. 
def opends_withref(ref, fs_data):
    """
    "ref" is a reference file or object. "fs_data" is a filesystem with credentials to
    access the actual data files. 
    """
    storage_opts = {"fo": ref, "remote_protocol": "https", "remote_options": fs_data.storage_options}
    fs_ref = fsspec.filesystem('reference', **storage_opts)
    m = fs_ref.get_mapper('')
    data = xr.open_dataset(
        m, engine="zarr", chunks={},
        backend_kwargs={"consolidated": False}
    )
    return data

In [37]:
data_example = opends_withref('virtual_ds_example.json', fs)
data_example

Unnamed: 0,Array,Chunk
Bytes,400 B,400 B
Shape,"(50, 2)","(50, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 400 B 400 B Shape (50, 2) (50, 2) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",2  50,

Unnamed: 0,Array,Chunk
Bytes,400 B,400 B
Shape,"(50, 2)","(50, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.81 kiB,2.81 kiB
Shape,"(360, 2)","(360, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.81 kiB 2.81 kiB Shape (360, 2) (360, 2) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",2  360,

Unnamed: 0,Array,Chunk
Bytes,2.81 kiB,2.81 kiB
Shape,"(360, 2)","(360, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.62 kiB,5.62 kiB
Shape,"(720, 2)","(720, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.62 kiB 5.62 kiB Shape (720, 2) (720, 2) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",2  720,

Unnamed: 0,Array,Chunk
Bytes,5.62 kiB,5.62 kiB
Shape,"(720, 2)","(720, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(1, 2)","(1, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray
"Array Chunk Bytes 16 B 16 B Shape (1, 2) (1, 2) Dask graph 1 chunks in 2 graph layers Data type datetime64[ns] numpy.ndarray",2  1,

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(1, 2)","(1, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,49.44 MiB,6.18 MiB
Shape,"(1, 50, 360, 720)","(1, 25, 180, 360)"
Dask graph,8 chunks in 2 graph layers,8 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 49.44 MiB 6.18 MiB Shape (1, 50, 360, 720) (1, 25, 180, 360) Dask graph 8 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  720  360  50,

Unnamed: 0,Array,Chunk
Bytes,49.44 MiB,6.18 MiB
Shape,"(1, 50, 360, 720)","(1, 25, 180, 360)"
Dask graph,8 chunks in 2 graph layers,8 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,49.44 MiB,6.18 MiB
Shape,"(1, 50, 360, 720)","(1, 25, 180, 360)"
Dask graph,8 chunks in 2 graph layers,8 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 49.44 MiB 6.18 MiB Shape (1, 50, 360, 720) (1, 25, 180, 360) Dask graph 8 chunks in 2 graph layers Data type float32 numpy.ndarray",1  1  720  360  50,

Unnamed: 0,Array,Chunk
Bytes,49.44 MiB,6.18 MiB
Shape,"(1, 50, 360, 720)","(1, 25, 180, 360)"
Dask graph,8 chunks in 2 graph layers,8 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [38]:
# Also useful to note, these reference objects don't take much memory:
print(sys.getsizeof(virtual_ds_example), "bytes")

120 bytes


### 2.2 First year
Reference information for each data file in the year is created individually, and then the combined reference file for the year can be created.

For us, reference file creation for a single file takes about 0.7 seconds, so processing a year of files would take about 4.25 minuts. One can easly accomplish this with a for-loop:

```
virtual_ds_list = [
    open_virtual_dataset(
        p, indexes={},
        reader_options={"storage_options": fs.storage_options},
        loadable_variables=coord_vars
        )
    for p in data_s3links
    ]
```

However, we speed things up using basic parallel computing. 

### 2.2.1 Method 1: parallelize using Dask local cluster
If using an `m6i.4xlarge` AWS EC2 instance, there are 16 CPUs available and each should have enough memory to utilize all at once. If working on a different VM-type, change the `n_workers` in the call to `Client()` below as needed.

In [39]:
# Check how many cpu's are on this VM:
print("CPU count =", multiprocessing.cpu_count())

CPU count = 12


In [40]:
# Start up cluster and print some information about it:
client = Client(n_workers=multiprocessing.cpu_count(), threads_per_worker=1)
print(client.cluster)
print("View any work being done on the cluster here", client.dashboard_link)

LocalCluster(8a406368, 'tcp://127.0.0.1:50147', workers=12, threads=12, memory=64.00 GiB)
View any work being done on the cluster here http://127.0.0.1:8787/status


In [41]:
# from unittest.mock import Mock

# # Raise an exception class
# open_virtual_dataset = Mock(side_effect=ValueError)

In [42]:
@delayed
def open_vds_par(datalink, reader_options=None, loadable_variables=None):
    for cnt  in range(1,5):
        try:
            if cnt != 1:
                print("Retrying ({}) {} ".format(cnt, datalink))
            return open_virtual_dataset(datalink, indexes={}, reader_options=reader_options,loadable_variables=loadable_variables
 )
        except Exception as e:
            print(e)
            logging.debug(e)
            time.sleep(cnt**2)
    raise Exception("Could not process file " + datalink)
            

In [43]:
%%time
# Create individual references:
print(coord_vars)
#open_vds_par = delayed(open_virtual_dataset)
tasks = [
    open_vds_par(p, reader_options=reader_opts, loadable_variables=coord_vars) 
    for p in data_s3links # all files
    ]
virtual_ds_list = list(da.compute(*tasks)) # The xr.combine_nested() function below needs a list rather than a tuple.

['latitude', 'longitude', 'time']
CPU times: user 3.54 s, sys: 1.2 s, total: 4.75 s
Wall time: 34.7 s


Using the individual references to create the combined reference is fast and does not requre parallel computing.

In [44]:
%%time
# Create the combined reference
virtual_ds_combined = xr.combine_nested(virtual_ds_list, concat_dim='time', coords="minimal", compat="override",)
# virtual_ds_combined = xr.combine_by_coords(
#     virtual_ds_list, coords="minimal", compat="override", combine_attrs="drop"
# )
virtual_ds_combined

CPU times: user 5.63 ms, sys: 1.25 ms, total: 6.88 ms
Wall time: 6.18 ms


In [49]:
temporal = ""
if start_date != None or end_date != None:
    if start_date != None:
        start = start_date
    else:
        start = "beginning"
        
    if end_date != None:
        end = end_date
    else:
        end = "present"
    
    temporal = f'{start}_to_{end}_'
    
    

In [50]:
# Save in JSON or PARQUET format:
fname_combined_json = f'{collection}_{temporal}virtual.json'
virtual_ds_combined.virtualize.to_kerchunk(fname_combined_json, format='json')

#fname_combined_parq = f'{}_virtual.parq'.format(collection)
#virtual_ds_combined.virtualize.to_kerchunk(fname_combined_parq, format='parquet')

In [51]:
%%time
# Test lazy loading of the combine reference file JSON:
data_json = opends_withref(fname_combined_json, fs)
print(data_json)

<xarray.Dataset> Size: 415MB
Dimensions:         (time: 4, Z: 50, latitude: 360, longitude: 720, nv: 2)
Coordinates:
  * Z               (Z) float32 200B -5.0 -15.0 -25.0 ... -5.461e+03 -5.906e+03
    Z_bnds          (Z, nv) float32 400B dask.array<chunksize=(50, 2), meta=np.ndarray>
  * latitude        (latitude) float32 1kB -89.75 -89.25 -88.75 ... 89.25 89.75
    latitude_bnds   (latitude, nv) float32 3kB dask.array<chunksize=(360, 2), meta=np.ndarray>
  * longitude       (longitude) float32 3kB -179.8 -179.2 -178.8 ... 179.2 179.8
    longitude_bnds  (longitude, nv) float32 6kB dask.array<chunksize=(720, 2), meta=np.ndarray>
  * time            (time) datetime64[ns] 32B 2016-12-31T12:00:00 ... 2017-01...
    time_bnds       (time, nv) float64 64B dask.array<chunksize=(1, 2), meta=np.ndarray>
Dimensions without coordinates: nv
Data variables:
    SALT            (time, Z, latitude, longitude) float32 207MB dask.array<chunksize=(1, 25, 180, 360), meta=np.ndarray>
    THETA           

In [31]:
client.close()