# ESGF Virtual Aggregation

The aim of this project is to create a ready-to-deploy TDS catalog including ALL available data in ESGF, using OPeNDAP endpoints to provide ESGF data analysis while avoiding the download of any data from remote repositories.

In [None]:
import time
import psutil

import xarray
import dask

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
dask.config.set(scheduler="processes")

## Baseline OPeNDAP performance test

This is a baseline performance test, targeting a data server unrelated to ESGF. The purpose is to get a baseline to compare performance of ESGF data nodes.

In [None]:
# NcML - 72 netCDF files inside and 9 variables (9 join existing aggregations of 8 files each, plus the union)
dataset = "https://hub.climate4r.ifca.es/thredds/dodsC/ncmls/ESGF/interp025/CORDEX/output/AFR-22/GERICS/MOHC-HadGEM2-ES/historical/r1i1p1/REMO2015/v1/day/CORDEX_output_AFR-22_GERICS_MOHC-HadGEM2-ES_historical_r1i1p1_REMO2015_v1_day_v20191015.ncml"

ds = xarray.open_dataset(dataset).chunk({"time": 100})
v = ds["tas"]
v

In [None]:
%time v.mean().compute(num_workers=4)

## ESGF data node testing

Let's start testing using a NcML with few netCDFs inside.

In [None]:
# NcML - 5 netCDF files inside and 1 variable (5 join existing aggregations of 1 file each)
dataset = "https://hub.ipcc.ifca.es/thredds/dodsC/esgeva/ensemble/CMIP6/ScenarioMIP/day/CMIP6_ScenarioMIP_CNRM-CERFACS_CNRM-CM6-1_ssp245_day_gr_v20190410/replicas/aims3.llnl.gov/CMIP6_ScenarioMIP_CNRM-CERFACS_CNRM-CM6-1_ssp245_day_tas_gr_v20190410_aims3.llnl.gov.ncml"

ds = xarray.open_dataset(dataset).chunk({"variant_label": 1, "time": 400})
v = ds["tas"]
v

In [None]:
%time a = v.mean(["lat", "lon"]).compute(num_workers=4)

In [None]:
a.resample({"time": "Y"}).mean().plot.line(x="time")

## ESGF with larger NcML

If we test larger NcMLs, specifically at lower temporal resolution, we start to see timeouts and different OPeNDAP errors.

This is caused by both bad chunking in the netCDF files in the time coordinate and multiple DDS and DAS requests from OPeNDAP. A chunk size of 1 makes the BTree of the variable unncessary large and kills performance.

The following is an example of this. An alternative NcML is provided in which the time coordinate is hardcoded into the NcML, thus avoiding reading the values from the server. We can observe that OPeNDAP errors dissapear.

In [None]:
# NcML - 2494 netCDF files inside, 29 variant_labels (29 join existing aggregations of 86 files each)
dataset = "https://hub.ipcc.ifca.es/thredds/dodsC/esgeva/ensemble/CMIP6/ScenarioMIP/3hr/CMIP6_ScenarioMIP_MIROC_MIROC-ES2L_ssp245_3hr_gn_v20210107/CMIP6_ScenarioMIP_MIROC_MIROC-ES2L_ssp245_3hr_tas_gn_v20210107_esgf-data02.diasjp.net.ncml"

# Substitute the URL for the one with hardcoded time values
dataset = "https://hub.ipcc.ifca.es/thredds/dodsC/esgeva/demo/CMIP6_ScenarioMIP_MIROC_MIROC-ES2L_ssp245_3hr_tas_gn_v20210107_esgf-data02.diasjp.net.ncml"

ds = xarray.open_dataset(dataset).chunk({"variant_label": 1, "time": 1600})
v = ds["tas"]
v

In [None]:
# Disable HTTP compression
!sed -i '/HTTP\.DEFLATE/{s|1|0|}' ~/.dodsrc

In [None]:
%time a = v.mean(["lat", "lon"]).compute(num_workers=4)

## ESGF with Kerchunk

OPeNDAP reads chunks from netCDF files, performs decompression on the server, and transmits the uncompressed data over the network. On the other hand, Zarr or netCDF+kerchunk both send the chunks compressed through the network.

netCDF clients support HTTP compression but the compression is applied by the HTTP component, OPeNDAP still performs decompression when reading.

In [None]:
nworkers = [2, 4, 8]
results = []

def measure(op, name, nworkers):
    start_net = psutil.net_io_counters()
    start_time = time.time()

    op.compute(num_workers=nworkers)

    end_time = time.time()
    end_net = psutil.net_io_counters()

    result = {
        "name": name,
        "time": end_time-start_time,
        "bytes_recv": end_net.bytes_recv-start_net.bytes_recv,
        "bytes_sent": end_net.bytes_sent-start_net.bytes_sent,
        "packets_recv": end_net.packets_recv-start_net.packets_recv,
        "packets_sent": end_net.packets_sent-start_net.packets_sent,
        "errin": end_net.errin-start_net.errin,
        "errout": end_net.errout-start_net.errout,
        "dropin": end_net.dropin-start_net.dropin,
        "dropout": end_net.dropout-start_net.dropout,
        "workers": nworkers
    }

    return result

### Kerchunk

In [None]:
ds = xarray.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {"fo": 'CMIP6_ScenarioMIP_CNRM-CERFACS_CNRM-CM6-1_ssp245_day_tas_gr_v20190410_aims3.llnl.gov.json', "remote_protocol": "https"}
    }).chunk({"variant_label": 1, "time": 400})
v = ds["tas"]

In [None]:
for n in nworkers:
    results.append(
        measure(v.mean(["lat", "lon"]), "Kerchunk", n))

### OPeNDAP

In [None]:
dataset = "https://hub.ipcc.ifca.es/thredds/dodsC/esgeva/demo/CMIP6_ScenarioMIP_CNRM-CERFACS_CNRM-CM6-1_ssp245_day_tas_gr_v20190410_aims3.llnl.gov.ncml"

ds = xarray.open_dataset(dataset).chunk({"variant_label": 1, "time": 400})
v = ds["tas"]

In [None]:
# Disable HTTP compression
!sed -i '/HTTP\.DEFLATE/{s|1|0|}' ~/.dodsrc

In [None]:
for n in nworkers:
    results.append(
        measure(v.mean(["lat", "lon"]), "OPeNDAP", n))

### OPeNDAP with HTTP compression

In [None]:
# Enable HTTP compression
!sed -i '/HTTP\.DEFLATE/{s|0|1|}' ~/.dodsrc

In [None]:
for n in nworkers:
    results.append(
        measure(v.mean(["lat", "lon"]), "OPeNDAP-deflate", n))

### Analyze the results

In [None]:
df = pd.DataFrame.from_records(results)
df

In [None]:
df.to_csv("kerchunk-results.csv")

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(16,5))

df["throughput"] = (df["bytes_recv"] / 2**20) / df["time"]

sns.barplot(data=df, y="time", x="name", hue="workers", ax=axes[0])
sns.barplot(data=df, y="bytes_recv", x="name", hue="workers", ax=axes[1])
sns.barplot(data=df, y="throughput", x="name", hue="workers", ax=axes[2])

for ax in axes:
    ax.set_xlabel("")
    
axes[0].set_ylabel("Time (seconds)")
axes[1].set_ylabel("Size (bytes)")
axes[2].set_ylabel("Throughput (MiB/s)")