In [None]:
## 20240624 The following code is migrated from Paul Vautraver's
## RCE_COL_XR_FINISHED.ipynb with updates from Sylvia Sullivan.

import numpy as np
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
from scipy import ndimage
import random
import scipy 
from dask.distributed import Client, progress, wait

import warnings
warnings.filterwarnings(action='ignore')

### Client Details
Information is called concerning the dask set up

In [None]:
client = Client()
client

### Constants
Important physical constants are given here

In [None]:
g = 9.81
MWw = 0.01802 # kg mol-1
MWa = 0.02897 # kg mol-1
rho_air = 1.3 # kg m-3
rho_water = 1000 # kg m-3
eps = MWw / MWa

### Functions

In [None]:
def sst_file_opener(sst):
    #Opens a directory of files, each with different time steps, corresponding to a single sea surface temperature (sst).
    #Each time step has netcdf data in x,y,z. The output is a joined dataset in time,x,y,z, for a single sst.
    
    #numerical sst value cast into string
    sst_str = str(sst)
    
    #path created
    path = '/xdisk/sylvia/RCE-CAPE-exploration/RCE-sims/ch_cam' + sst_str +'ri0/'
    
    #dataset created,combining different timesteps in the sst directory
    dataset = xr.open_mfdataset(path+'ch_cam'+sst_str+'ri0_4096x64x64_3km_12s_cam'+sst_str+'ri0_64_*.nc',combine='by_coords')

    return dataset

def precipConversion(qp):
    #Function to convert QP to precipitation intensity in units of mm per hour
    
    rho_air = 1.3  # [kg m-3 air] Technically this should account for altitude dependence
    rho_liq = 1000 # [kg m-3 liq water / rain]
    del_t   = 0.5  # [h] Temporal resolution of the RCE sims
    del_x   = 3000 # [m] Spatial resolution of the RCE sims

    # Calculate rain rate [m h-1] and return a value in [m h-1]
    # Why do I multiply by del_x**2 rather than dividing? 
    # Assume 1 m3 of air and 1 m3 of rain. The rain is then distributed over a grid cell of area del_x**2 at some rate.
    rr = qp * rho_air / rho_liq / del_t * del_x**2
    
    # Final factor of 1000 here converst [m h-1] to [mm h-1]
    return rr / 1000

def storm_labeller(binary_t_xr):
    #Function that takes in a binary structure in 3d (time,x and y) and which finds spatially connected
    #components in each time step, i.e. identifies clusters without using temporal connectivity
    
    #Connectivity employed is 4 connectivity
    
    #creating connected components structure for ndimage
    #no temporal connection between clusters, so empty 0th and 2nd array
    struct = np.zeros((3,3,3))

    #simple 4 connection within current time set
    struct[1] = [[0,1,0],[1,1,1],[0,1,0]]

    #labels_0 is array of clusters with unique numerical labels, dim=(time,x,y)
    #nb_0 is number of identified clusters
    labels_0,nb_0 = ndimage.label(binary_t_xr,structure=struct)

    #find the sizes of each object, in terms of pixels , dim=(number of connected elements found)
    pixelsizes = ndimage.sum(binary_t_xr,labels_0,range(nb_0+1))

    #create a boolean array for whether the objects 
    #are larger than a given threshold
    #the threshold was set to 310 by Paul but I think it should be 706 for an eq diameter of 90 km
    mask_size = pixelsizes < 705
    updated_pixelsizes = pixelsizes[pixelsizes >= 705]
    updated_pixelsizes = xr.DataArray( updated_pixelsizes ).rename( 'cluster_sizes' )

    #indices of pixels to be removed are produced
    #remove_pixel,dim=(time,x,y)
    remove_pixel = mask_size[labels_0]
  
    #indices and thus objects are removed based on object size
    labels_0[remove_pixel] = 0
    
    labels_0 = xr.where(labels_0==0,np.nan,labels_0)
    
    #cluster array cast into xr array
    labelled_xr = xr.DataArray(labels_0,
                               dims=['time','y','x'],
                               coords={'time':binary_t_xr.time,'y':binary_t_xr.y,
                                       'x':binary_t_xr.x})
    
    return labelled_xr.rename('clusters'), updated_pixelsizes

def buoy_xr(x_arr,mcs_xr):
    #function to calculate the buoyancy field, outputs x,y,z array per time array
    
    #environmental temperature,qv and qn are called
    #filter out the cells where we have MCSs
    tenv = x_arr.TABS * xr.where( mcs_xr.isnull(), 1, np.nan )
    tenv_xr = tenv.mean(dim=['x','y'],skipna=True)

    qvenv = x_arr.QV * xr.where( mcs_xr.isnull(), 1, np.nan )
    qvenv_xr = qvenv.mean(dim=['x','y'])

    qnenv = x_arr.QN * xr.where( mcs_xr.isnull(), 1, np.nan )
    qnenv_xr = qnenv.mean(dim=['x','y'])
    
    #buoyancy field calculated
    buoyancy_xr = g*(x_arr.TABS - tenv_xr)/tenv_xr + (MWw/MWa)*(qvenv_xr - x_arr.QV) - (qnenv_xr - x_arr.QN)

    #filtered for negative buoyancy
    buoyancy_xr = xr.where(buoyancy_xr<0,0,buoyancy_xr)
    
    return buoyancy_xr.rename('buoyancy')

def svpl_xr(t_arr):
    #function to calculate the saturation vapor pressure over liquid water as a function of temperature
    
    R = 8.314             # J mol-1 K-1
    MWw = 18.015/1000     # kg mol-1
    rhoa = 1.395
    a1 = 54.842763
    a2 = -6763.22
    a3 = -4.21
    a4 = 0.000367
    a5 = 0.0415
    a6 = 218.8
    a7 = 53.878
    a8 = -1331.22
    a9 = -9.44523
    a10 = 0.014025
    
    factor = a7 + a8/t_arr + a9*xr.ufuncs.log(t_arr) + a10*t_arr
    
    psatL = a1 + a2/t_arr + a3*xr.ufuncs.log(t_arr) + a4*t_arr + xr.ufuncs.arctan(a5*(t_arr - a6))*factor
    
    psatL = xr.ufuncs.exp(psatL)
    
    return psatL

def sd_xr(xr_arr):
    #function to calculate the saturation deficit
    
    return (eps*svpl_xr(xr_arr.TABS))/(xr_arr.p*100 - svpl_xr(xr_arr.TABS))*1000 - xr_arr.QV

def sd_masterfunc(dataset,p1,p2,p3):
    #function to calculate the lower-tropospheric saturation deficit
    #as the average of values at three levels (generally 850, 750, and 500 hPa, following Singh et al. 2017)
    
    ds_1 = dataset.where(dataset.p>p1,drop=True).isel(z=-1)

    ds_2 = dataset.where(dataset.p>p2,drop=True).isel(z=-1)

    ds_3 = dataset.where(dataset.p>p3,drop=True).isel(z=-1)
    
    #sd_mean created and given an xr name
    sd_mean = ((sd_xr(ds_1)+sd_xr(ds_2)+sd_xr(ds_3))/3).rename('SD')
    
    return sd_mean

### Super Function
Now all of the established functions are used within the final data collecting function, where the fields are produced, including the cluster field, and they are combined into a single xr array output file.

In [None]:
def super_xr(ds_xr):
    #super function that uses all of the above functions, as well as simple one line xr routines
    #to compile xy fields for each variable, at each timestep. Omega, the pressure velocity, is 
    #also calculated and stored with z dimensions as well.
    
    #ds_xr is a dataset in xr array format, naturally assuming all variables called below
    
    #Surface precipitation values filtered
    z = ds_xr.z
    psurf = ds_xr.QP.isel(z=0).rename('psurf')
    
    #precipitation rate found using conversion
    p_rate = precipConversion(psurf).rename('p_rate')
    
    #condensate
    #filtered for temperatures below 245K
    qn_245 = ds_xr.QN.where(ds_xr.TABS<245)
    
    #moisture array backfilled along z and lowest array is selected, i.e. first values
    #rising in z are selected
    qn_245 = qn_245.bfill('z').isel(z=0).rename('qn245')
    
    #filtered against threshold moisture content
    #changed the threshold from 10**(-4)
    mcs_xr = xr.where(qn_245<10**(-4),0,1).rename('binary')
    
    #should take into account changing air density etc
    #but constant factor is good first approximation
    #the following cancel: divide QV by 1000 to convert g kg-1 -> kg kg-1, multiply cwvc by 1000 to convert m -> mm
    cwvc = (ds_xr.QV.integrate('z')*(1.3/1000)).rename('cwvc')
    
    #calculating the cwvc at saturation to evaluate the CSF
    #saturation mixing ratio [g kg-1]
    qvsat = (eps*svpl_xr(ds_xr.TABS))/(ds_xr.p*100 - svpl_xr(ds_xr.TABS))*1000
    #as above for cwvc: divide QV by 1000 to convert g kg-1 -> kg kg-1, multiply cwvc by 1000 to convert m -> mm
    cwvc_sat = (qvsat.integrate('z')*(1.3/1000))
    csf = (cwvc / cwvc_sat).rename('csf')
    
    #calculating csf in different layers
    #lowest layer from 1007.7 to 953.44 hPa // layer from 937.32 to 867.26 hPa // layer from 833.72 to 713.09 hPa
    #layer from 671.36 to 521.89 hPa
    low_indx = [0, 6, 10, 14]
    up_indx = [6, 10, 14, 19]
    tag_name = ['_BL', '_TROP1', '_TROP2', '_TROP3']
    for l, u, k, tt in zip(low_indx, up_indx, np.arange(4), tag_name):
        ds_xr['qv'+tt] = ds_xr.QV[:,l:u].rename({'z':'z'+tt})
        ds_xr['qvsat'+tt] = qvsat[:,l:u].rename({'z':'z'+tt})
        
        cwvc_layer = (ds_xr['qv'+tt].integrate('z'+tt)*(1.3/1000))
        cwvc_sat_layer = (ds_xr['qvsat'+tt].integrate('z'+tt)*(1.3/1000))
        ds_xr['csf'+tt] = (cwvc_layer / cwvc_sat_layer)
    
    #pressure velocity found
    #Sylvia adding ascent and descent rate below
    omega = (-(ds_xr.W)*g*rho_air).rename('omega')
    ds_xr['ascent'] = xr.where( omega<0, -1.*omega, np.nan )
    ds_xr['descent'] = xr.where( omega>0, omega, np.nan )
    
    #saturation deficit found
    sd_mean = sd_masterfunc(ds_xr,550,700,850)
    
    #slowest step, connected components over 400*64*4096
    #no temporal connectivity
    #clustering using 4-connectivity, this step finds our mesoscale storms
    clusters, cluster_sizes = storm_labeller(mcs_xr) # clusters [=] (400 times, 64 lats, 4096 lons) as other fields
    cluster_sizes = cluster_sizes.rename( {'dim_0':'clusters'} )
    
    #buoyancy calculated and made into a new xr array
    #buoyancy_xr = buoy_xr(ds_xr)
    #Paul's calculation above. Really, we want to take buoyancy relative to an environment that excludes MCSs
    buoyancy_xr = buoy_xr(ds_xr,clusters)
    
    #cape array made by integrating over z
    #cape = buoyancy_xr.integrate('z').rename('CAPE')
    #Paul's calculation above. Really, we want only to integrate from cloud base to cloud top, assume FAT - ztop = 200K
    tmean = ds_xr.TABS.mean(dim=['time','x','y']).values
    i = np.argwhere( (tmean[:-1]-tmean[1:]) < 0 )[0,0]
    buoyancy_xr = buoyancy_xr[:,:i].rename({'z':'zTROP'})
    cape = buoyancy_xr.integrate('zTROP').rename('CAPE')
    
    #new environmental arrays are combined into a final xr array
    #3D temperature and moisture structure in the storm retained to find precip efficiency
    measurements_xr = xr.combine_nested([ p_rate, cape, cwvc, csf, ds_xr.csf_BL, ds_xr.csf_TROP1,
                                          ds_xr.csf_TROP2, ds_xr.csf_TROP3, sd_mean, clusters, omega,
                                          ds_xr.ascent, ds_xr.descent, ds_xr.TABS, ds_xr.QV, ds_xr.p, ds_xr.QN ],
                                        concat_dim=[None], coords='minimal',
                                        compat='override' )
    
    return measurements_xr, cluster_sizes

## Execution 1
Extract the mean conditions (SD, CAPE, etc.) for grid cells in the MCS where its precipitation = mean or 99th percentile value

In [None]:
%%time
def mask_and_average( data, threshold ):
    # Mask data where 'pmax' is greater or equal to the threshold and return the masked data
    ds1 = data.where( data['p_rate'] >= threshold, drop=True )
    return ds1.mean( dim='stacked_time_y_x' )

def _main_execution1_(sst):
    #rce data opened, xr arrays generated, then environmental data is collocated against the
    #clusters using groupby. Mean and Max 
    basedir = '/xdisk/sylvia/RCE-CAPE-exploration/'
    
    #open dataset corresponding to a given sea surface temperature
    ds = sst_file_opener(sst)
    
    #All environmental variable fields and clusters are collected for time,x,y,z
    output_xr, cluster_sizes_xr = super_xr(ds)
    #display(output_xr)
    
    #fields are collocated against clusters
    output_col = output_xr.groupby( 'clusters' )
    print('cluster sizes saved')
    cluster_sizes_xr.to_netcdf( basedir + 'clusters_' + str(sst) + '.nc' )
    
    #identify where the 99th percentile in rainfall is, p_rate_99 [=] clusters
    p_rate_99 = output_col.map( lambda group: group['p_rate'].quantile( 0.99, skipna=True ) )

    #then average over values in a cluster where p_rate > the 99th percentile
    selected_measurements = output_col.map( lambda group: mask_and_average( group, p_rate_99.sel( clusters=group.clusters ) ))
    print( '99 calculated' )
    file_path = basedir + 'RCE_COL_99_' + str(sst) + '.nc'
    selected_measurements.to_netcdf( file_path )
    print( '99 written' )
    
    #identify where the mean rainfall is, p_rate_mean [=] clusters
    p_rate_mean = output_col.map( lambda group: group['p_rate'].mean( skipna=True ) )

    #then average over values in a cluster where p_rate > the 99th percentile
    selected_measurements = output_col.map( lambda group: mask_and_average( group, p_rate_mean.sel( clusters=group.clusters ) ))
    print( 'mean calculated' )
    file_path = basedir + 'RCE_COL_MEAN_' + str(sst) + '.nc'
    selected_measurements.to_netcdf( file_path )
    print( 'mean written' )
    return

_main_execution1_(280)
_main_execution1_(285)
_main_execution1_(290)
_main_execution1_(295)
_main_execution1_(300)
_main_execution1_(305)
_main_execution1_(310)
print('done')

## Execution 2
Extract the mean or 99th percentile conditions (SD, CAPE, etc.) over ALL grid cells in the MCS

In [None]:
%%time
def _main_execution2_(sst):
    #rce data opened, xr arrays generated, then environmental data is collocated against the
    #clusters using groupby. Mean and Max 
    basedir = '/xdisk/sylvia/RCE-CAPE-exploration/all-var-percentile/'
    
    #open dataset corresponding to a given sea surface temperature
    ds = sst_file_opener(sst)
    
    #All environmental variable fields and clusters are collected for time,x,y,z
    output_xr, cluster_sizes_xr = super_xr(ds)
    
    #fields are collocated against clusters
    output_col = output_xr.groupby('clusters')
    cluster_sizes_xr.to_netcdf( basedir + 'clusters_' + str(sst) + '.nc' )
    
    #cluster groups are aggregated by taking the 99th percentile value of the clusters
    output_99 = output_col.quantile( 0.99, skipna=True )
    print( '99 calculated' )
    
    #99th percentile collocated variables are saved
    output_99.to_netcdf( basedir + 'RCE_COL_99_' + str(sst) +'.nc' )
    print( '99 written' )
    
    #cluster groups are aggregated by taking max values of the clusters
    output_max = output_col.max()
    print( 'max calculated' )
    
    #max collocated variables are saved
    output_max.to_netcdf( basedir + 'RCE_COL_MAX_' + str(sst) +'.nc' )
    print( 'max written' )
    
    #cluster groups are aggregated by taking the 95th percentile value of the clusters
    output_95 = output_col.quantile( 0.95, skipna=True )
    print( '95 calculated' )
    
    #95th percentile collocated variables are saved
    output_95.to_netcdf( basedir + 'RCE_COL_95_' + str(sst) +'.nc' )
    print( '95 written' )
    
    #cluster groups are aggregated by taking mean values of the clusters
    output_mean = output_col.mean()
    print( 'mean calculated' )
    
    #mean collocated variables are saved
    output_mean.to_netcdf( basedir + 'RCE_COL_MEAN_' + str(sst) +'.nc' )
    print( 'mean written' )
    
    return

_main_execution2_(280)
_main_execution2_(285)
_main_execution2_(290)
_main_execution2_(295)
_main_execution2_(300)
_main_execution2_(305)
_main_execution2_(310)
print('done')