# Download and Save Cloud Data

This notebook downloads variables needed to calculate the precipitation-buoyancy POD from multiple cloud stores. The following code obtains and processes thermodynamic variables from ERA5, and precipitation from both IMERG V06 and GPCP.

## Import Necessary Packages

In [1]:
import xesmf
import gcsfs
import fsspec
import warnings
import numpy as np
import xarray as xr
import planetary_computer
from datetime import datetime
import pystac_client as pystac
warnings.filterwarnings('ignore')

## User-Defined Fields

Define user information (for data download attribution), the directory where the data should be saved, and subsetting parameters (years, months, and latitude/longitude/pressure level ranges).

In [2]:
AUTHOR    = 'Savannah L. Ferretti'      
EMAIL     = 'savannah.ferretti@uci.edu' 
SAVEDIR   = '/global/cfs/cdirs/m4334/sferrett/monsoon-pod/data/raw'
YEARS     = list(range(2000,2021)) # 2000 through 2020, inclusive
MONTHS    = [6,7,8]
LATRANGE  = (5.,25.) 
LONRANGE  = (60.,90.)
LEVRANGE  = (500.,1000.)

## Import and Preprocess Cloud Datasets

The raw data for this analysis is accessible through publicly available cloud stores. ERA5 data, at its native hourly frequency and 0.25° x 0.25° spatial resolution, can be found on the LEAP Pangeo Data Catalog [here](https://catalog.leap.columbia.edu/feedstock/arco-era5). IMERG V06 data, provided at half-hourly frequency at 0.1° x 0.1° spatial resolution, can be accessed via Microsoft Planetary Computer [here](https://planetarycomputer.microsoft.com/dataset/gpm-imerg). GPCP data, available at daily frequency with 1.0° x 1.0° spatial resolution, is also hosted on the LEAP Pangeo Data Catalog [here](https://catalog.leap.columbia.edu/feedstock/global-precipitation-climatology-project). To efficiently handle these large datasets, the following functions lazily load all data using Xarray.

In [3]:
def get_era5():
    store = 'gs://gcp-public-data-arco-era5/ar/1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2/'
    ds    = xr.open_zarr(store,decode_times=True)  
    return ds

def get_imerg():
    store   = 'https://planetarycomputer.microsoft.com/api/stac/v1'
    catalog = pystac.Client.open(store,modifier=planetary_computer.sign_inplace)
    assets  = catalog.get_collection('gpm-imerg-hhr').assets['zarr-abfs']
    ds      = xr.open_zarr(fsspec.get_mapper(assets.href,**assets.extra_fields['xarray:storage_options']),consolidated=True)
    return ds

def get_gpcp():
    store = 'https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/gpcp-feedstock/gpcp.zarr'
    ds    = xr.open_dataset(store,engine='zarr',chunks={})   
    return ds

In [4]:
era5data  = get_era5()
imergdata = get_imerg()
gpcpdata  = get_gpcp()

The ```preprocess()``` function preprocesses each variable using the user-defined fields specified above. It executes two functions: one that standardizes dimensions across datasets, and another that subsets the time and space dimensions and specifies pressure levels to keep (if applicable).

In [5]:
def standardize_dims(ds):
    dimnames = {'latitude':'lat','longitude':'lon','level':'lev'}
    ds = ds.rename({oldname:newname for oldname,newname in dimnames.items() if oldname in ds.dims})
    targetdims = ['lev','time','lat','lon'] if 'lev' in ds.dims else ['time','lat','lon']
    extradims  = [dim for dim in ds.dims if dim not in targetdims]
    if extradims:
        ds = ds.drop_dims(extradims)
    for dim in targetdims:
        if dim=='time':
            if ds.coords[dim].dtype.kind!='M':
                ds.coords[dim] = ds.indexes[dim].to_datetimeindex()
            ds = ds.sel(time=~ds.time.to_index().duplicated(keep='first'))
        elif dim=='lon':
            ds.coords[dim] = (ds.coords[dim]+180)%360-180        
        else:
            ds.coords[dim] = ds.coords[dim].astype(float)
    ds = ds.sortby(targetdims).transpose(*targetdims)   
    return ds

def subset_data(ds,years=YEARS,months=MONTHS,latrange=LATRANGE,lonrange=LONRANGE,levrange=LEVRANGE):
    ds = ds.sel(time=(ds['time.year'].isin(years))&(ds['time.month'].isin(months)))
    ds = ds.sel(lat=slice(*latrange),lon=slice(*lonrange))
    if 'lev' in ds.dims:
        ds = ds.sel(lev=slice(*levrange))
    return ds

def preprocess(ds,years=YEARS,months=MONTHS,latrange=LATRANGE,lonrange=LONRANGE,levrange=LEVRANGE):
    ds = standardize_dims(ds)
    ds = subset_data(ds,years,months,latrange,lonrange,levrange)
    return ds

In [6]:
gpcp  = preprocess(gpcpdata)
imerg = preprocess(imergdata)
era5  = preprocess(era5data)
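
The ordering steps in ```standardize_dims()``` matter for the label-based slices in ```subset_data()```. The sketch below illustrates why, using a small synthetic grid (the array `da` and its coordinate values are made up for illustration and are not taken from ERA5, IMERG, or GPCP). It assumes a grid whose latitude is stored north-to-south and whose longitude runs from 0° to 360°, which is a common layout for reanalysis output.

```python
import numpy as np
import xarray as xr

# Hypothetical grid: descending latitude, longitude on a 0-360 convention
lat = np.arange(30.0,-0.25,-5.0)  # 30, 25, ..., 0
lon = np.arange(0.0,360.0,30.0)   # 0, 30, ..., 330
da  = xr.DataArray(np.zeros((lat.size,lon.size)),coords={'lat':lat,'lon':lon},dims=('lat','lon'))

# Slicing a descending coordinate with ascending bounds selects nothing
empty = da.sel(lat=slice(5.0,25.0))

# Wrap longitude to [-180, 180) and sort both coordinates, as standardize_dims() does
wrapped = da.assign_coords(lon=(da.lon+180)%360-180).sortby(['lat','lon'])
subset  = wrapped.sel(lat=slice(5.0,25.0),lon=slice(60.0,90.0))

print(empty.sizes['lat'])          # number of latitudes selected before sorting
print(subset.lat.values,subset.lon.values)
```

Sorting before subsetting means the same `(min,max)` tuples in ```LATRANGE``` and ```LONRANGE``` can be used for every dataset, regardless of how each store orders its coordinates.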

## Extract Necessary Variables

Only four variables are needed from these three datasets: precipitation (mm/day) from IMERG V06 and GPCP, and surface pressure (hPa), specific humidity (kg/kg), and temperature (K) from ERA5. Convert units where necessary, and set unphysical negative precipitation values to zero.

In [7]:
gpcpprdata  = gpcp.precip.where(gpcp.precip>=0,0)
imergprdata = (imerg.precipitationCal).where(imerg.precipitationCal>=0,0)*24 # mm/hr to mm/day
psdata = era5.surface_pressure/100 # Pa to hPa
qdata  = era5.specific_humidity
tdata  = era5.temperature
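
As a rough illustration of the clamping and unit conversion above, here is a NumPy-only sketch on made-up values (the array `rate_mm_hr` is hypothetical). It mirrors the pattern `da.where(da>=0,0)*24`, where values failing the condition are replaced by zero. One point worth checking against your own data: because `NaN >= 0` evaluates to `False`, missing values are also replaced by zero under this pattern.

```python
import numpy as np

# Hypothetical precipitation rates in mm/hr, including a negative and a missing value
rate_mm_hr = np.array([-0.1,0.0,0.5,np.nan])

# Keep values where the condition holds, otherwise substitute 0, then convert mm/hr to mm/day
rate_mm_day = np.where(rate_mm_hr>=0,rate_mm_hr,0.0)*24

# Pa to hPa for a hypothetical surface pressure value
ps_hpa = np.array([100500.0])/100

print(rate_mm_day)
print(ps_hpa)
```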

## Create Observational Baselines

Our three datasets (ERA5, IMERG V06, and GPCP) have different temporal frequencies and spatial resolutions. To create consistent, analysis-ready data, we apply ```format_var()``` to individual variables, which regrids (using [xESMF](https://xesmf.readthedocs.io/en/stable/)) and temporally resamples the data as needed, alongside standardizing their format and metadata. ```combine_vars()``` is then used to combine the processed variables into a single, cohesive dataset.

In [8]:
def regrid_data(da,gridsource,gridtarget):
    if not isinstance(gridsource,(xr.Dataset,xr.DataArray)):
        raise TypeError("Input 'gridsource' must be an xarray Dataset or DataArray")
    if not isinstance(gridtarget,(xr.Dataset,xr.DataArray)):
        raise TypeError("Input 'gridtarget' must be an xarray Dataset or DataArray")
    regridder = xesmf.Regridder(gridsource,gridtarget,method='bilinear')
    return regridder(da)
  
def resample_data(da,frequency,method):
    if frequency not in ['H','D']:
        raise ValueError("Frequency must be 'H' (hourly) or 'D' (daily)")
    da = da.assign_coords(time=da.time.dt.floor(frequency)) # new object, so the caller's array is not modified in place
    if method=='mean':
        return da.groupby('time').mean()
    elif method=='first':
        return da.groupby('time').first()
    elif method=='last':
        return da.groupby('time').last()
    else:
        raise ValueError("Method must be 'mean', 'first', or 'last'")

def format_var(da,shortname,longname,units,gridsource=None,gridtarget=None,frequency=None,method=None):
    if gridsource is not None and gridtarget is not None:
        da = regrid_data(da,gridsource,gridtarget)
    if frequency is not None and method is not None:
        da = resample_data(da,frequency,method)
    da = da.rename(shortname)
    da.attrs.clear()
    for coord in da.coords:
        da[coord].attrs.clear()
    da.attrs = dict(long_name=longname,units=units)
    return da
    
def combine_vars(dalist,author=AUTHOR,email=EMAIL):
    if not isinstance(dalist,list):
        raise TypeError('Input must be a list of Xarray.DataArrays')
    if not all(isinstance(da,xr.DataArray) for da in dalist):
        raise TypeError('All elements in the input list must be Xarray.DataArrays')
    ds = xr.merge(dalist)
    ds.time.attrs = dict(long_name='Time')
    ds.lat.attrs  = dict(long_name='Latitude',units='°N')
    ds.lon.attrs  = dict(long_name='Longitude',units='°E')
    if 'lev' in ds.dims:
        ds.lev.attrs = dict(long_name='Pressure level',units='hPa')
    ds.attrs = dict(history=f'Created on {datetime.today().strftime("%Y-%m-%d")} by {author} ({email})')
    print(f'Dataset Size: {ds.nbytes*1e-9:.2f} GB')
    return ds
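
To make the floor-then-group logic in ```resample_data()``` concrete, the following NumPy-only sketch applies the same idea to four made-up half-hourly samples (the arrays `times` and `values` are hypothetical). Timestamps are floored to the hour, and samples sharing a floored timestamp are reduced with either `'first'` or `'mean'`. This is an illustration of the technique, not a replacement for the Xarray implementation above.

```python
import numpy as np

# Four hypothetical half-hourly samples: 00:00, 00:30, 01:00, 01:30
start  = np.datetime64('2000-06-01T00:00')
times  = start+np.arange(4)*np.timedelta64(30,'m')
values = np.array([1.0,3.0,5.0,7.0])

# Floor each timestamp to the hour (casting to a coarser unit truncates)
hours = times.astype('datetime64[h]')

# Group samples by floored timestamp
uniquehours,firstidx,inverse = np.unique(hours,return_index=True,return_inverse=True)

firstvalues = values[firstidx]                                        # analogous to method='first'
meanvalues  = np.bincount(inverse,weights=values)/np.bincount(inverse) # analogous to method='mean'

print(uniquehours)
print(firstvalues,meanvalues)
```

With `'first'`, each hour keeps only its first half-hourly sample, whereas `'mean'` averages all samples that fall within the hour.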

### Create ERA5/IMERG Baseline

Our high-resolution observational baseline features hourly data on a 0.25° x 0.25° grid. We use ERA5 variables at their native resolution and adjust IMERG V06 precipitation to match, coarsening its grid and reducing its sampling frequency.

In [10]:
t  = format_var(tdata,shortname='t',longname='ERA5 air temperature',units='K')
q  = format_var(qdata,shortname='q',longname='ERA5 specific humidity',units='kg/kg')
ps = format_var(psdata,shortname='ps',longname='ERA5 surface pressure',units='hPa')
pr = format_var(imergprdata,shortname='pr',longname='IMERG V06 precipitation rate',units='mm/day',
                gridsource=imerg,gridtarget=era5,frequency='H',method='first')

In [11]:
hiresds = combine_vars([t,q,ps,pr])

Dataset Size: 61.81 GB
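
The reported size can be sanity-checked with back-of-the-envelope arithmetic from the user-defined fields. The sketch below rests on several assumptions that are not read from the data: every variable is stored as 32-bit floats, the ERA5 store provides 16 pressure levels between 500 and 1000 hPa, both range endpoints are included in the subset, and coordinate arrays contribute a negligible number of bytes.

```python
# Assumed dimension sizes for the ERA5/IMERG baseline (not read from the data)
nyears   = 21                   # 2000 through 2020
nhours   = (30+31+31)*24        # June, July, and August at hourly frequency
ntime    = nyears*nhours
nlat     = int((25-5)/0.25)+1   # 5°N to 25°N on a 0.25° grid, endpoints included
nlon     = int((90-60)/0.25)+1  # 60°E to 90°E on a 0.25° grid, endpoints included
nlev     = 16                   # assumed number of ERA5 levels from 500 to 1000 hPa
itemsize = 4                    # assumed float32 storage

bytes3d = ntime*nlev*nlat*nlon*itemsize  # one of t or q
bytes2d = ntime*nlat*nlon*itemsize       # one of ps or pr
totalgb = (2*bytes3d+2*bytes2d)*1e-9

print(f'Estimated size: {totalgb:.2f} GB')
```

Under these assumptions the two pressure-level variables account for nearly all of the total, so narrowing ```LEVRANGE``` is the most direct way to shrink the dataset.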


### Create ERA5/GPCP Baseline

Our low-resolution observational baseline features daily mean data on a 1.0° x 1.0° grid. We use GPCP precipitation at its native resolution and adjust ERA5 variables to match.

In [13]:
t  = format_var(tdata,shortname='t',longname='ERA5 air temperature',units='K',
                gridsource=era5,gridtarget=gpcp,frequency='D',method='mean')
q  = format_var(qdata,shortname='q',longname='ERA5 specific humidity',units='kg/kg',
                gridsource=era5,gridtarget=gpcp,frequency='D',method='mean')
ps = format_var(psdata,shortname='ps',longname='ERA5 surface pressure',units='hPa',
                gridsource=era5,gridtarget=gpcp,frequency='D',method='mean')
pr = format_var(gpcpprdata,shortname='pr',longname='GPCP precipitation rate',units='mm/day')

In [14]:
loresds = combine_vars([t,q,ps,pr])

Dataset Size: 0.17 GB


## Save Baseline Datasets

Save each baseline Xarray.Dataset as a netCDF file to the user-defined save directory (```SAVEDIR```).

In [18]:
def save(ds,filename,savedir=SAVEDIR):
    filepath = f'{savedir}/{filename}'
    ds.to_netcdf(filepath)

In [None]:
%time save(loresds,'ERA5_GPCP.nc')  

In [None]:
# %time save(hiresds,'ERA5_IMERG.nc')