# Generate consolidated metadata reference - PREFIRE

Generate a consolidated metadata reference file (directly from the netCDF files) for an entire PREFIRE Level 3 data collection.

- [ ] Can run on Openscapes’ JupyterHub
- [ ] May need to modify underlying kerchunk code to get each group individually, and then combine
- [ ] Or could use VirtualiZarr to open the DMR++ files and consolidate the metadata information from them

Notes: 
- This was originally adapted from [James’ gist](https://gist.github.com/jrbourbeau/ab4b07f753d67eafd09a1580baea2b4a).
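- **In the latest VirtualiZarr version, `open_virtual_dataset` needs an `HDFParser` plus an object-store registry (`ObjectStoreRegistry` wrapping a `Store`); the HDF group is selected by passing `group=` to the parser.**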

## 1. Setup

In [1]:
import warnings
from urllib.parse import urlparse

import earthaccess
import obstore
import virtualizarr as vz
import xarray as xr

from tqdm import tqdm
from obstore.store import LocalStore, S3Store
from virtualizarr import open_virtual_dataset
from virtualizarr.parsers import HDFParser
from virtualizarr.registry import ObjectStoreRegistry

**This requires `virtualizarr>=2.0.1`**

In [5]:
# Display the installed VirtualiZarr version; this notebook relies on the
# parser/registry API, which requires virtualizarr>=2.0.1.
vz.__version__

'2.0.1'

In [3]:
# Authenticate this session with NASA Earthdata via `earthaccess`.
# The S3-credentials, search and download cells below depend on this login.
earthaccess.login()

<earthaccess.auth.Auth at 0x7f875503b350>

In [4]:
# Temporary AWS credentials for the ASDC DAAC buckets.
# They expire after about an hour: if you stay in the notebook longer, re-run
# this cell and the S3Store cell that consumes `creds`.
creds = earthaccess.get_s3_credentials(daac="ASDC")

## 2. Generate virtual datacube

### 2.1. Get links to data of interest

In [6]:
# Query CMR for every cloud-hosted granule of the chosen collection/version.
# Uncomment "temporal" to restrict the search to a time window.
search_params = {
    "short_name": "PREFIRE_SAT2_3-SFC-SORTED-ALLSKY",
    "version": "R01",
    "cloud_hosted": True,
    # "temporal": ("2025-06-30 12:00", "2025-07-01 12:00"),
}
results = earthaccess.search_data(**search_params)

In [7]:
# Get S3 endpoints for all files:
data_s3links = [g.data_links(access="direct")[0] for g in results]
print("Number of granules found =", len(data_s3links))
print("First few granules:")
data_s3links[0:3]

Number of granules found = 12
First few granules:


['s3://asdc-prod-protected/PREFIRE/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01/2024.07.01/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01_P00_20240701000000_20240731235959.nc',
 's3://asdc-prod-protected/PREFIRE/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01/2024.08.01/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01_P00_20240801000000_20240831235959.nc',
 's3://asdc-prod-protected/PREFIRE/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01/2024.09.01/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01_P00_20240901000000_20240930235959.nc']

In [8]:
# Split the first granule's URL into bucket (`netloc`) and object key (`path`);
# the bucket name is used to configure the S3 store below.
url = data_s3links[0]
parsed = urlparse(url)

print("url = {}".format(url))
print("\nparsed = {}".format(parsed))

url = s3://asdc-prod-protected/PREFIRE/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01/2024.07.01/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01_P00_20240701000000_20240731235959.nc

parsed = ParseResult(scheme='s3', netloc='asdc-prod-protected', path='/PREFIRE/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01/2024.07.01/PREFIRE_SAT2_3-SFC-SORTED-ALLSKY_R01_P00_20240701000000_20240731235959.nc', params='', query='', fragment='')


In [14]:
# obstore client for the granules' bucket, authenticated with the temporary
# earthaccess credentials (re-run the credentials cell first if they expired).
# NOTE(review): "allow_http" permits plain-HTTP endpoints; it is likely
# unnecessary for AWS S3 — confirm before removing.
s3_options = dict(
    region="us-west-2",
    access_key_id=creds["accessKeyId"],
    secret_access_key=creds["secretAccessKey"],
    token=creds["sessionToken"],
    virtual_hosted_style_request=False,
    client_options={"allow_http": True},
)
store = S3Store(bucket=parsed.netloc, **s3_options)

# Route every "s3://<bucket>/..." URL to that store when virtualizarr opens files.
reg = ObjectStoreRegistry({f"s3://{parsed.netloc}": store})

store

S3Store(bucket="asdc-prod-protected")

### 2.2. Test handling of a single file

In [None]:
# Only the first granule is inspected in this section (files_downloaded[0]),
# so download just that one instead of the whole collection.
files_downloaded = earthaccess.download(results[:1])

In [None]:
# Open the downloaded granule with plain xarray to inspect its group hierarchy
# (used to pick the group name passed to HDFParser below).
dtree = xr.open_datatree(files_downloaded[0])
dtree

In [None]:
# Smoke test: open only the "Sfc-Sorted" group of the first granule as a virtual dataset.
single_file_vds = open_virtual_dataset(
    url,
    parser=HDFParser(group="Sfc-Sorted"),
    registry=reg,
)
single_file_vds

### 2.3. **Generate virtual datacube, in series**

We create a virtual datacube for each file in each group, concatenate the files within each group, and then merge groups. (Currently only the `Sfc-Sorted` group is processed, so the merge step below is a pass-through.)

In [None]:
%%time
vds_groups = {}

def _open_and_append(file_path, group_name):
    vds_groups.setdefault(group_name, []).append(
        open_virtual_dataset(
            url, 
            registry=reg,
            parser=HDFParser(group=group_name),
        )
    )

warnings.filterwarnings(
  "ignore",
  message="Numcodecs codecs are not in the Zarr version 3 specification*",
  category=UserWarning
)

for p in tqdm(data_s3links[:]):
    # _open_and_append(p, group_name="")
    _open_and_append(p, group_name="Sfc-Sorted")

In [None]:
%%time
xr_concat_kwargs = {
    "coords": "minimal",
    "compat": "override",
    "combine_attrs": "override",
    "data_vars": "minimal"
}

# vds_root = xr.concat(vds_groups[""], dim="time", **xr_concat_kwargs)
vds_sfc_sorted = xr.concat(vds_groups["Sfc-Sorted"], dim="time", **xr_concat_kwargs)

In [None]:
# Combine the per-group cubes into one dataset. Only "Sfc-Sorted" is built above,
# so the merge is currently a pass-through; re-enable the xr.merge (and the root
# group lines in the previous cells) to include the root group.
# %%time
# vds_merged = xr.merge([
#     vds_root, 
#     vds_sfc_sorted, 
# ])
# vds_merged

# *** OR ***

vds_merged = vds_sfc_sorted

In [None]:
# Serialize the combined virtual dataset's references as a kerchunk JSON file
# in the current working directory (read back in section 3).
kerchunk_output_path = "prefire-monthly-kerchunk_created20250815.json"
vds_merged.vz.to_kerchunk(kerchunk_output_path, format="json")

## 3. Reading an existing kerchunk file

### 3.1. Setup

In [23]:
import json
import os

from pathlib import Path
from obstore.store import LocalStore
from virtualizarr.parsers import KerchunkJSONParser

warnings.filterwarnings(
  "ignore",
  message="Numcodecs codecs are not in the Zarr version 3 specification*",
  category=UserWarning
)

In [27]:
# Here is the JSON we will read.
# Kerchunk reference JSON written at the end of section 2 (relative to the working directory).
json_ref_file = "prefire-monthly-kerchunk_created20250815.json"

In [None]:
# Load the kerchunk references as a plain dict (used by the fsspec cells below).
metadata = json.loads(Path(json_ref_file).read_text())

In [52]:
# Here we create a LocalStore
project_directory = Path.cwd()
local_store = LocalStore(prefix=project_directory)

local_store

LocalStore("/home/jovyan/earthaccess-virtualizar")

In [53]:
# Registry handed to KerchunkJSONParser.
# - The local store serves the reference JSON itself.
# - The S3 store (built in section 2.1) covers the chunk URLs inside the
#   references, which point at s3://asdc-prod-protected/...
# NOTE(review): this assumes the returned ManifestStore resolves chunk reads
# through this same registry — confirm against the virtualizarr docs.
project_url = f"file://{project_directory / json_ref_file}"
local_registry = ObjectStoreRegistry({
    project_url: local_store,
    "s3://" + parsed.netloc: store,
})

local_registry

<virtualizarr.registry.ObjectStoreRegistry at 0x7f870a9d5810>

### 3.2. Read it

In [60]:
# Parse the kerchunk JSON (fetched through `local_registry`) into a
# virtualizarr ManifestStore that xarray's zarr backend can open.
json_parser = KerchunkJSONParser()

manifest_store = json_parser(
    url=project_url, 
    registry=local_registry
)

manifest_store

<virtualizarr.manifests.store.ManifestStore at 0x7f86f130a550>

In [61]:
# Open the ManifestStore as an xarray Dataset via the zarr engine;
# chunks={} requests dask-backed (lazy) arrays.
# NOTE(review): this currently raises GroupNotFoundError at path ''.
# The virtualizarr 2.x examples open a ManifestStore with zarr_format=3
# (e.g. xr.open_zarr(store, consolidated=False, zarr_format=3)), so that is
# worth trying here. It is unconfirmed whether that is the cause.
data = xr.open_dataset(
    manifest_store, 
    engine="zarr", 
    chunks={},
    backend_kwargs={"consolidated": False}
)

GroupNotFoundError: No group found in store <virtualizarr.manifests.store.ManifestStore object at 0x7f86f130a550> at path ''

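## Extra, unused code below

Scratch cells kept for reference. They are not part of the workflow above, and several depend on commented-out cells or on variables that are never defined, so they are not expected to run top to bottom.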

In [None]:
# data = xr.open_dataset(
#     metadata, 
#     engine="zarr"
# )

In [None]:
# vds = open_virtual_dataset(
#     json_ref_file,
#     registry=registry,
#     parser=KerchunkJSONParser()
#     # parser=HDFParser()
# )
# vds

In [None]:
import fsspec

# fsspec "reference" filesystem over the kerchunk JSON; the referenced byte
# ranges are read from S3 using the earthaccess filesystem's storage options.
fs = earthaccess.get_s3_filesystem(daac="ASDC")

storage_opts = dict(
    fo=metadata,
    remote_protocol="s3",
    remote_options=fs.storage_options,
)
fs_ref = fsspec.filesystem("reference", **storage_opts)
m = fs_ref.get_mapper("")

In [None]:
# Open the kerchunk references through the fsspec mapper `m` with lazy (dask) chunks.
# Note: this rebinds `data`, which is also the target name in section 3.2.
data = xr.open_dataset(
    m, 
    engine="zarr", 
    chunks={},
    backend_kwargs={"consolidated": False}
)

In [None]:
# Same references opened as a DataTree, letting xarray build the fsspec
# reference filesystem itself from the "reference://" URL + storage options.
reference_storage_options = {
    "fo": metadata,
    "remote_protocol": "s3",
    "remote_options": fs.storage_options,
}
post_kerchunk_datatree = xr.open_datatree(
    "reference://",
    engine="zarr",
    backend_kwargs=dict(consolidated=False, storage_options=reference_storage_options),
)

In [None]:
# Disabled: `vds` is never defined in this notebook; the only assignments to it
# are in commented-out cells. As written, this line raised NameError on
# Restart & Run All. Re-enable it together with whichever cell creates `vds`.
# vds.wavelength.values

In [None]:
# parser = HDFParser()
# vds = open_virtual_dataset(
#   url=file_url,
#   parser=HDFParser(),
#   registry=registry,
# )

In [None]:
# # Using https://github.com/zarr-developers/VirtualiZarr/pull/631/
# from obstore.store import MemoryStore
# from virtualizarr.parsers import KerchunkJSONParser

# memory_store = obstore.store.MemoryStore()
# memory_store.put("refs.json", json.dumps(metadata).encode())

# # registry = ObjectStoreRegistry({"memory://": memory_store})
# parser = KerchunkJSONParser()  # store_registry=registry)
# manifeststore = parser("refs.json", memory_store)

In [None]:
import json
from IPython.display import JSON

import earthaccess
import xarray as xr

In [None]:
# Recreate the earthaccess S3 filesystem (duplicates an earlier cell; presumably
# kept so this scratch block can run after only the import cell above).
fs = earthaccess.get_s3_filesystem(daac="ASDC")

In [None]:
# Load an older reference file (created 20250807), not the one written in section 2.
with open("prefire-monthly-kerchunk_created20250807.json") as f:
    metadata = json.load(f)

In [None]:
# Open the older references as a DataTree via fsspec's "reference://" filesystem,
# reading chunk bytes from S3 with the earthaccess credentials in `fs`.
# This duplicates an earlier open_datatree cell, but uses the 20250807 `metadata` loaded just above.
post_kerchunk_datatree = xr.open_datatree(
    "reference://", 
    engine="zarr", 
    backend_kwargs={
            "consolidated": False,
            "storage_options": {
                "fo": metadata,
                "remote_protocol": "s3",
                "remote_options": fs.storage_options,
            }
        }
)

#### Using dask delayed function

In [None]:
# # Create individual references:
# NOTE(review): this dask variant is not runnable as-is:
# - `dask` is never imported.
# - The compute cells below call `da.compute`, which presumably meant `dask.compute`.
# - The group names "product", "geolocation" and "support_data" do not match the
#   "Sfc-Sorted" group used in section 2; they may come from another product.
#   Verify before re-enabling.
# The helper also passed the global `url` instead of its `file_path` argument;
# that is corrected below.

# def _open_and_append(file_path, group_name):
#     with warnings.catch_warnings():
#         warnings.filterwarnings(
#           "ignore",
#           message="Numcodecs codecs are not in the Zarr version 3 specification*",
#           category=UserWarning
#         )  # Or other warning types
#
#         return open_virtual_dataset(
#             file_path, 
#             registry=reg,
#             parser=HDFParser(group=group_name)
#         )

# open_vds_partial = dask.delayed(_open_and_append)

# def generate_task_list(group=""):
#     return [
#         open_vds_partial(p, group)  #, loadable_variables=coord_vars) 
#         for p in data_s3links[:]
#     ]
# root_group_tasks = generate_task_list(group="")
# product_group_tasks = generate_task_list(group="product")
# geolocation_group_tasks = generate_task_list(group="geolocation")
# support_data_group_tasks = generate_task_list(group="support_data")

In [None]:
# %%time
# virtual_ds_list_root = list(da.compute(*root_group_tasks))

In [None]:
# %%time
# virtual_ds_list_product = list(da.compute(*product_group_tasks))

In [None]:
# %%time
# virtual_ds_list_geolocation = list(da.compute(*geolocation_group_tasks))

In [None]:
# %%time
# virtual_ds_list_support_data = list(da.compute(*support_data_group_tasks))

In [None]:
# virtual_ds_list_root

In [None]:
# merged_vds_list = []

# for i in tqdm(range(len(virtual_ds_list_root))):
#     merged_vds_list.append(
#         xr.merge([
#             virtual_ds_list_root[i], 
#             virtual_ds_list_product[i], 
#             virtual_ds_list_geolocation[i], 
#             virtual_ds_list_support_data[i]
#             ])
#     )
    
# # merged_vds_list

In [None]:
# xr_concat_kwargs = {
#     "coords": "minimal",
#     "compat": "override",
#     "combine_attrs": "override",
#     "data_vars": "minimal"
# }

# vds_merged_concated = xr.concat(merged_vds_list, dim="time", **xr_concat_kwargs)

In [None]:
# vds_merged_concated

In [None]:
# Concatenate each group's per-file virtual datasets along "time"
# (only meaningful once the dask cells above are re-enabled).
# xr_concat_kwargs = {
#     "coords": "minimal",
#     "compat": "override",
#     "combine_attrs": "override",
#     "data_vars": "minimal"
# }

# vds_root = xr.concat(virtual_ds_list_root, dim="time", **xr_concat_kwargs)
# vds_product = xr.concat(virtual_ds_list_product, dim="time", **xr_concat_kwargs)
# vds_geolocation = xr.concat(virtual_ds_list_geolocation, dim="time", **xr_concat_kwargs)
# vds_support_data = xr.concat(virtual_ds_list_support_data, dim="time", **xr_concat_kwargs)

In [None]:
# Merge the per-group virtual datasets produced by the dask workflow above.
# Disabled like its sibling cells: vds_root, vds_product, vds_geolocation and
# vds_support_data are only created by the commented-out concat cell. As
# written, this raised NameError on a fresh kernel. If those names did exist,
# it would silently overwrite the `vds_merged` written to kerchunk in section 2.
# vds_merged = xr.merge([
#     vds_root, 
#     vds_product, 
#     vds_geolocation, 
#     vds_support_data
# ])
# vds_merged