# M2HATS Field Campaign Data Pipeline

### Purpose
Use GDEX to read, standardize, and convert files relevant to the M2HATS field campaign to Zarr for more efficient data processing.

### Data
The two datasets used in this example are ERA5 reanalysis on pressure levels (stored on GDEX) and 30-minute 449MHz Wind Profiler data (from EOL's Field Data Archive and temporarily stored on GDEX). 

### Motivation
The old data comparison process involved numerous manual steps: downloading ERA5 reanalysis data from the Copernicus Climate Data Store, and manually downloading Wind Profiler data from EOL's FDA, then untarring and unzipping the dataset. The intent for this new process is to limit the number of steps required to perform analysis and use data formats more compatible with Python's processing tools.

### Audience
Any researcher or PI interested in performing analyses using EOL's in-situ data, who is looking to modernize their workflow.

---

## Import required packages

In [None]:
# For analysis code
import glob
import numpy as np
import xarray as xr
import pandas as pd
from scipy.interpolate import interp1d

# For Dask + cluster
from dask_jobqueue import PBSCluster
from distributed import Client
from dask import delayed

---

## Designate a scratch directory
Define the designated scratch directory to hold Zarr stores created from field campaign data and ERA5 model data.

In [None]:
lustre_scratch  = "/lustre/desc1/scratch/myasears"

---

## Spin up a cluster
Create a cluster and scale it to 5 workers to assist with the processing in this notebook.

In [None]:
cluster = PBSCluster(
        job_name = 'dask-eol-25',
        cores = 1,
        memory = '4GiB',
        processes = 1,
        local_directory = lustre_scratch + '/dask/spill',
        log_directory = lustre_scratch + '/dask/logs/',
        resource_spec = 'select=1:ncpus=1:mem=4GB',
        queue = 'casper',
        walltime = '3:00:00',
        interface = 'ext')

In [None]:
client = Client(cluster)

In [None]:
n_workers = 5
cluster.scale(n_workers)
client.wait_for_workers(n_workers = n_workers)

---

## Load 449 data
This dataset was initially downloaded from EOL's Field Data Archive (FDA), then placed into a directory on the GDEX designated for this pilot study. The 449 MHz profiler dataset is stored in daily netcdf files, wherein each data variable depends on time (every 30 minutes) and height (every 100 meters). 

We would typically open this grouping of netcdf files using xarray's `open_mfdataset`, but each day of profiler data has a slightly different maximum height dimension, which requires a special process to align the height dimension before concatenating the datasets. Once these issues are resolved and the dataset is standardized for enhanced understanding and workflow, the concatenated dataset is converted to a zarr store for ease of future use. 

In [None]:
# Define profiler file path and create a list of all netcdf files
prof449_path = "/gdex/data/special_projects/pythia_2025/eol-cookbook/m2hats_iss2_data/prof449Mhz_30min_winds"
files = sorted(glob.glob(f"{prof449_path}/*.nc"))

In [None]:
# All height values begin with a multiple of 100 and increase by 100, but have different max and min values.
# This code determines the global min and max of the entire field campaign's profiler data.  
def get_minmax_alt(f):
    with xr.open_dataset(f, decode_cf=False) as tmp:
        return float(tmp['height'].min()), float(tmp['height'].max())
        
min_heights, max_heights = zip(*[get_minmax_alt(f) for f in files])
min_height, max_height = min(min_heights), max(max_heights)

In [None]:
# Create common agl and msl height grids from the min/max height values.
# Altitude is taken from the fifth file to avoid the instrument's set up period (manually checked).
common_agl = np.arange(min_height, max_height + 100, 100)
common_msl = common_agl + xr.open_dataset(files[5]).alt.values

In [None]:
def open_and_regrid(f, common_agl, common_msl):
    """
    Open a dataset and regrid its height coordinates to common AGL/MSL grids.
    """

    # Open dataset with dask-style lazy loading
    ds = xr.open_dataset(f, chunks="auto")

    # Make height coordinate 1-dimensional
    height_1d = ds['height'].isel(time=0).values
    ds = ds.assign_coords(height=("height", height_1d))

    # Reindex height coords to span min + max from entire campaign
    ds = ds.reindex(height=common_agl)

    # Add corresponding AGL and MSL coordinates
    ds = ds.assign_coords(
        height_agl=("height", common_agl),
        height_msl=("height", common_msl)
    )

    # Swap to MSL as the primary height dimension
    ds = ds.swap_dims({"height": "height_msl"}).drop_vars("height")
    ds.height_msl.attrs.update({"long_name": "Height above mean sea level", "units": "m"})

    return ds

In [None]:
# Open and regrid all datasets, then concatenate into a single 449 MHz profiler dataset
datasets = [delayed(open_and_regrid)(f, common_agl, common_msl) for f in files[2:]]
datasets = [d.compute() for d in datasets]
combined_profiler = xr.concat(datasets, dim="time", combine_attrs="override")

In [None]:
# Collapse lat/lon/alt variables to a single value and assign them as coordinates.
combined_profiler = combined_profiler.assign_coords(
    latitude=combined_profiler["lat"].isel(time=0).item(),
    longitude=combined_profiler["lon"].isel(time=0).item(),
    altitude=combined_profiler["alt"].isel(time=0).item()
).drop_vars(["lat", "lon", "alt"])

# Standardize the naming conventions
name_mapping = {
    "u": "u_wind",
    "v": "v_wind",
    "wvert": "w_wind"
}
combined_profiler = combined_profiler.rename(name_mapping)

In [None]:
# Save the profiler data to a zarr store
combined_profiler = combined_profiler.chunk({"time": 48, "height_msl": -1})
combined_profiler.to_zarr(f"{lustre_scratch}/2023_M2HATS/prof449_M2HATS_ISS1_winds30.zarr")

---

## Load ERA5 data
The dataset for ERA5 reanalysis on pressure levels is stored on GDEX, so we bypass any necessity to download files from the CDS. ERA5 reanalysis data is stored in netcdf files separated by day and variable, wherein each data variable depends on time (every hour) and pressure (a standardized pressure grid). We would normally be able to read this data quite simply using 'intake', but this case study is unique in that we are interested in atmospheric profiles form a single lat/lon point for this analysis, and the ERA5 data are stored on pressure levels over an xy plane spanning the entire globe. 

To work with this information, we lazily load the all relevant monthly datasets for a single variable, then subset the Xarray Dataset by the lat/lon of the profiler and all times spanning the target field campaign. This process is repeated for all desired variables, then all resulting datasets are merged together to produce an all-inclusive dataset for the field campaign. 

Upon creating the concatenated dataset, we also implement code to interpolate the data variables onto a common msl height grid for direct comparison with the other ISS instruments.

In [None]:
# Enter the specifications from the M2HATS field campaign
target_lat = 38.0
target_lon = 243.0

start_date = pd.Timestamp("2023-07-11T00:00:00")
end_date = pd.Timestamp("2023-09-27T23:59:59")
yyyymm = ["202307", "202308", "202309"]

In [None]:
# Define the ERA5 file path 
era5_path = '/gdex/data/d633000/e5.oper.an.pl'

In [None]:
def open_variable(file_prefix, yyyymm):
    """Open, subset, and combine ERA5 files for a given variable and date range."""
    files = []
    for month in yyyymm:
        files.extend(sorted(glob.glob(f'{era5_path}/{month}/{file_prefix}*')))

    ds = xr.open_mfdataset(files, combine="by_coords", parallel=True)
    ds_point = ds.sel(latitude=target_lat, longitude=target_lon, time=slice(start_date, end_date))
    
    return ds_point

In [None]:
%%time
# Define the variables to be included in the final dataset
var_map = {"Z": "e5.oper.an.pl.128_129_z",
           "U": "e5.oper.an.pl.128_131_u",
           "V": "e5.oper.an.pl.128_132_v",
           "W": "e5.oper.an.pl.128_135_w"
           }

# Open, subset, and combine files from all variables
datasets = [open_variable(file_prefix, yyyymm) for file_prefix in var_map.values()]

# Merge them together into one xarray dataset
combined_era5 = xr.merge(datasets, compat="override", combine_attrs="override")

In [None]:
# Convert geopotential to MSL height
combined_era5["height_msl"] = (combined_era5["Z"] * 6371008.7714) / (9.80665 * 6371008.7714 - combined_era5["Z"])
combined_era5.height_msl.attrs.update({"long_name": "Height above mean sea level", "units": "meters"})

# Drop utc_date variable
combined_era5 = combined_era5.drop_vars("utc_date")

# Change variable names to standardize with other datasets
name_mapping = {"level": "pressure", "Z": "geopotential", "U": "u_wind", "V": "v_wind", "W": "w_wind"}
combined_era5 = combined_era5.rename(name_mapping)

In [None]:
def make_height_dependent(era5_data, altitude_levels):
    """
    Interpolate ERA5 pressure-level data to a common height grid.
    Adapted from: [Hamid Ali Syed](https://github.com/syedhamidali) (@syedhamidali)
    Referenced at: https://discourse.pangeo.io/t/how-to-convert-era5-pressure-coordinates-to-altitude/4071
    """

    interpolated_vars = {}
    for var in era5_data.data_vars:
        if var in ["height_msl", "geopotential"]:
            continue
    
        # Interpolate along altitude with apply_ufunc
        interp_data = xr.apply_ufunc(
            lambda x, y: interp1d(y, x, bounds_error=False, fill_value="extrapolate")(
                altitude_levels
            ),
            era5_data[var],
            era5_data["height_msl"],
            input_core_dims=[["pressure"], ["pressure"]],
            output_core_dims=[["height_msl"]],
            dask_gufunc_kwargs={"output_sizes": {"height_msl": len(altitude_levels)}},
            vectorize=True,
            dask="parallelized",
            output_dtypes=[era5_data[var].dtype],
        )
    
        # Store with attributes and add to dictionary
        interp_data.attrs = era5_data[var].attrs
        interpolated_vars[var] = interp_data
    
    # Create dataset with interpolated variables and coordinates
    coords = {
        "time": era5_data.time,
        "height_msl": altitude_levels
    }
    
    # Initialize interpolated dataset and set correct order of coordinates and data_vars
    ds_interpolated = xr.Dataset(interpolated_vars, coords=coords).transpose(
        "time", "height_msl"
    )
    
    # Add attributes to alt coordinate
    ds_interpolated["height_msl"].attrs = {
        "standard_name": "height_msl",
        "units": "m",
        "long_name": "Geometric Height (above mean sea level)",
        "positive": "up",
    }
    
    ds_interpolated["longitude"].attrs = era5_data["longitude"].attrs
    ds_interpolated["latitude"].attrs = era5_data["latitude"].attrs
    ds_interpolated["time"].attrs = era5_data["time"].attrs
    
    return ds_interpolated

In [None]:
# Implement the function for height dependence
era5_height_levels = make_height_dependent(combined_era5, common_msl)

In [None]:
# Save the ERA5 data to a zarr store
era5_height_levels.to_zarr(f"{lustre_scratch}/2023_M2HATS/TEST_heights.zarr")

## Open the Zarr files
A sanity check to make sure the Zarr files have the information we'd expect, in the correct format. They look good and ready to be used in analysis!

In [None]:
era5_test_zarr = xr.open_zarr(f"{lustre_scratch}/2023_M2HATS/era5_M2HATS_ISS1_heights.zarr")
era5_test_zarr

In [None]:
prof449Mhz_test_zarr = xr.open_zarr(f"{lustre_scratch}/2023_M2HATS/prof449_M2HATS_ISS1_winds30.zarr")
prof449Mhz_test_zarr

## Notes on this workflow
The GDEX-assisted ERA5 retrieval process highlighted in this notebook took significantly less time and storage space than the previous workflow. I look forward to using GDEX for the retrieval of other large-scale models and reanalysis products. Additionally, the Zarr storage of ERA5 data subset for the M2HATS field campaign will be immensely useful during the next stages of analysis. 

The 449MHz profiler ingest process was very programatically similar to my current workflow, but it's much more approachable to read in netcdf files from the GDEX, rather than go through the download/untar/unzipping process that is currently required. In addition, the ability to analyze the profiler data from a Zarr store rather than numerous individual (and height-unaligned) netcdf files is expected to significantly improve my workflow. Now, I can perform analysis after only writing one or two lines to read in both ERA5 and 449 Profiler datasets. 