In [1]:
# Load libraries

# # Plotting utils 
import datetime
import glob
import os
import pickle
import time
import warnings
from datetime import date, timedelta
import Ngl
import cartopy
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import cartopy.util
import matplotlib as matplotlib
import matplotlib.animation as animation
import matplotlib.colors as colors
import matplotlib.dates as mdates
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.ticker import FormatStrFormatter
import metpy.calc as mpc
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
import xarray as xr
from matplotlib.dates import DateFormatter
from metpy.units import units
from metpy import interpolate
from metpy.calc import vertical_velocity
from mpl_toolkits.axes_grid1 import make_axes_locatable

warnings.filterwarnings('ignore')


In [2]:
# Grabbed from Brian M. to use time midpoints, not end periods
def cesm_correct_time(ds):
    """Given a Dataset, check for time_bnds,
       and use avg(time_bnds) to replace the time coordinate.
       Purpose is to center the timestamp on the averaging inverval.   
       NOTE: ds should have been loaded using `decode_times=False`
    """
    assert 'time_bnds' in ds
    assert 'time' in ds
    correct_time_values = ds['time_bnds'].mean(dim='nbnd')
    # copy any metadata:
    correct_time_values.attrs = ds['time'].attrs
    ds = ds.assign_coords({"time": correct_time_values})
    ds = xr.decode_cf(ds)  # decode to datetime objects
    return ds

**Minimal pre-processing**

In [4]:
# - - - - - - - - - - - - - - - 
# Pre-process data while reading in 
# - - - - - - - - - - - - - - - 

def preprocess_h0(ds):
       
    keepVars = ['SWCF','LWCF','TS','CLOUD','FSNS','FLNS','PS','QREFHT',
                'U10','CLDHGH','CLDLIQ','CONCLD','TMQ','P0','hyam','hybm','hyai','hybi',
                'PHIS','USTAR','QT','GCLDLWP',
                'THETAL','CDNUMC','CLDBOT','CLDLOW',
                'CLDMED','CLDTOP','CLDTOT','THLP2_CLUBB','CLOUDCOVER_CLUBB','CLOUDFRAC_CLUBB',
                'RCM_CLUBB','RTP2_CLUBB','RTPTHLP_CLUBB','RVMTEND_CLUBB','STEND_CLUBB','UP2_CLUBB','UPWP_CLUBB',
                'VP2_CLUBB','T','Q','OMEGA','PBLH','U','V','WP2_CLUBB','WP3_CLUBB','WPRCP_CLUBB',
                'WPRTP_CLUBB',
                'WPTHLP_CLUBB','WPTHVP_CLUBB','Z3','PRECT','PRECC',
                'TGCLDCWP','TGCLDLWP','GCLDLWP',
                'LHFLX','SHFLX','TREFHT','RHREFHT']
        
    ds         = cesm_correct_time(ds)
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    # iTimeStart_day2  = np.where( (ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
    #                              (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D'))))[0]
    dsSel      = ds.isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                      (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))[keepVars]
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes})
    
    return dsSel

def preprocess_h1(ds):
    
    keepVars = [
                'wpthlp','wprtp','rtp2',
                'thlm','rtm','wm_zm','rtm_zm','thlm_zm',
                ]
        
    ds         = cesm_correct_time(ds)
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds.isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                      (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))[keepVars]

    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes})
        
    return dsSel


def preprocess_h2(ds):
    
    varSels = np.asarray([
                      'edmf_upa','edmf_upw','edmf_upqt','edmf_upthl','edmf_cloudfrac','edmf_dnw',
                      'edmf_precc','edmf_uplh',
                      'edmf_ent','edmf_upent','edmf_updet','edmf_upbuoy',

                       ])

    ds         = cesm_correct_time(ds)
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds[varSels].isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                               (ds.time.values <= (ds.time.values[0] + np.timedelta64(2,'D')))))
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes})

    ## Replacing 'missing' updraft values with NaN 
    dsSel = dsSel.where(dsSel['edmf_upthl'] != 0.0).load()

    return dsSel


def preprocess_h2_2d(ds):
    
    varSels = np.asarray([
                      'edmf_cloudfrac','edmf_qtflxup','edmf_thlflxup','edmf_precc',
                      'edmf_S_ATHLTHL','edmf_S_AQTQT','edmf_S_AWW', 'edmf_L0',
                       ])

    ds         = cesm_correct_time(ds)
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds[varSels].isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                 (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes}).load()    
    return dsSel

def preprocess_h3(ds):

    ds         = cesm_correct_time(ds)
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds.isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                 (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes}).load()
    
    ## Replacing 'missing' updraft values with NaN 
    dsSel   = dsSel.where(dsSel['thlu_macmic2'] != 0.0)
    
    return dsSel

def preprocessCLM_h0(ds):
    keepVars_CLM = ['SOILWATER_10CM','TSOI_10CM','RAIN','FSA','TG','TSA',
                    'QVEGT','QVEGE','QSOIL','H2OSOI','TSOI','SOILLIQ']

    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds.isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                 (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))[keepVars_CLM]
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes})

    return dsSel

def preprocessCLM_h1(ds):
    
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds.isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                 (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes})
        
    QFLX = dsSel.QSOIL+dsSel.QVEGE+dsSel.QVEGT
    dsSel['QFLX'] = (('time','pft'), QFLX.values)
    
    return dsSel

def preprocessCLM_h2(ds):
    ds['time'] = ds.indexes['time'].to_datetimeindex() 
        
    ## Select the second simulated day for analysis 
    dsSel      = ds.isel(time=np.sort((ds.time.values >= (ds.time.values[0] + np.timedelta64(1,'D'))) & 
                                 (ds.time.values < (ds.time.values[0] + np.timedelta64(2,'D')))))
    
    # Compute local time 
    localTimes = dsSel['time'].values - np.timedelta64(5,'h')
    dsSel      = dsSel.assign_coords({"time": localTimes})
        
    QFLX = dsSel.QSOIL+dsSel.QVEGE+dsSel.QVEGT
    dsSel['QFLX'] = (('time','landunit'), QFLX.values)
    
    return dsSel

## Start up dask

In [5]:
import dask

from dask_jobqueue import PBSCluster

# For Casper
cluster = PBSCluster(
    queue="casper",
    walltime="01:00:00",
    project="P93300642",
    #memory="4GB",
    #resource_spec="select=1:ncpus=1:mem=4GB",
    memory="10GB",
    resource_spec="select=1:ncpus=1:mem=10GB",
    cores=1,
    processes=1,
)

# # scale as needed
# cluster.adapt(minimum_jobs=1, maximum_jobs=30)
# cluster

# # Scale up
# cluster.scale(8)
# cluster


In [6]:
from dask.distributed import Client

# Connect client to the remote dask workers
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mdfowler/Extra/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mdfowler/Extra/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://128.117.208.76:35697,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mdfowler/Extra/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [7]:
cluster.scale(24)


In [8]:
client.wait_for_workers(24)


In [9]:
client.status

'running'

In [46]:
# client.shutdown()

In [5]:
# !qstat -u $USER


## Read in data, but don't do a ton of extra processing yet 

In [None]:
%%time
testDir     = '/glade/derecho/scratch/mdfowler/CLASP+MF_diffusionOff/'

case_names  = [
                'clubbMF_L0eq250_diffOff',
                'claspMF_L0eq250_diffOff',
              ]

caseStart = 'FSCAM.T42_T42.arm97.CLASP_CLUBBMF_'

caseStrings = [
    'usePatchDataTRUE_25each_L0eq250_nup25_newHETterm_simplExtraPlume_diffOff_allJJA.',
    'usePatchDataTRUE_25each_L0eq250_nup25_newHETterm_simplExtraPlume_diffOff_allJJA.'
]

nens=25

for iCase in range(len(case_names)):
    print('*** Starting on case %s ***' % (case_names[iCase]))

    ## Get list of files 
    listFiles_h0 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*cam.h0.2016*'))
    listFiles_h1 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*cam.h1.2016*'))
    listFiles_h2 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*cam.h2.2016*'))
    listFiles_h3 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*cam.h3.2016*'))

    listFilesCLM_h2 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*clm2.h2.2016*'))
    listFilesCLM_h1 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*clm2.h1.2016*'))
    listFilesCLM_h0 = np.sort(glob.glob(testDir+caseStart+caseStrings[iCase]+'*clm2.h0.2016*'))        

    case_h0 = xr.open_mfdataset(listFiles_h0,  preprocess=preprocess_h0, concat_dim='time', 
                                combine='nested', decode_times=False, 
                                data_vars='minimal', parallel='True')
    print('h0 files loaded')
    case_h1 = xr.open_mfdataset(listFiles_h1,  preprocess=preprocess_h1, concat_dim='time', 
                                combine='nested', decode_times=False, 
                                data_vars='minimal', parallel='True')
    print('h1 files loaded')

    case_h2 = xr.open_mfdataset(listFiles_h2,  preprocess=preprocess_h2, concat_dim='time', 
                                combine='nested', decode_times=False, 
                                data_vars='minimal', parallel='True')
    print('h2 files loaded with mfdataset')
    case_h2_2d = xr.open_mfdataset(listFiles_h2,  preprocess=preprocess_h2_2d, concat_dim='time', 
                                combine='nested', decode_times=False, 
                                data_vars='minimal', parallel='True')
    print('h2 2D data loaded with mfdataset')
    
    case_h3 = xr.open_mfdataset(listFiles_h3,  preprocess=preprocess_h3, concat_dim='time', 
                                combine='nested', decode_times=False, 
                                data_vars='minimal', parallel='True')
    print('h3 files loaded with mfdataset')
        
    # Merge cases and load
    caseFull = xr.merge([case_h1, case_h0])
    del case_h0,case_h1
    
    caseH2   = xr.merge([case_h2, case_h2_2d])
    caseH2   = caseH2.assign_coords({"nens": np.arange(nens)})
    del case_h2,case_h2_2d

    if iCase==0:
        ncyc = len(case_h3.ncyc.values)
    
    caseH3   = case_h3.assign_coords({"nens": np.arange(nens)})
    caseH3   = caseH3.assign_coords({"ncyc": np.arange(ncyc)})
    del case_h3    
    # del case_h0,case_h1,case_h2,case_h2_2d,case_h3

    print('DS merged and loaded')

    ## Label with case info 
    caseFull_allDays   = caseFull.squeeze().assign_coords({"case":  case_names[iCase]})
    caseH2_allDays     = caseH2.squeeze().assign_coords({"case":  case_names[iCase]})
    caseH3_allDays     = caseH3.squeeze().assign_coords({"case":  case_names[iCase]})


    ## Read in CLM datasets too
    # - - - - - - - - - - - - - - - - - - - - 
    # caseCLM_h0 = xr.open_mfdataset(listFilesCLM_h0,  preprocess=preprocessCLM_h0, concat_dim='time', 
    #                             combine='nested', decode_times=True, 
    #                             data_vars='minimal', parallel='True')
    # print('CLM_h0 files loaded with mfdataset')

    caseCLM_h1 = xr.open_mfdataset(listFilesCLM_h1,  preprocess=preprocessCLM_h1, concat_dim='time', 
                                combine='nested', decode_times=True, 
                                data_vars='minimal', parallel='True')
    print('CLM_h1 files loaded with mfdataset')
    
    caseCLM_h2 = xr.open_mfdataset(listFilesCLM_h2,  preprocess=preprocessCLM_h2, concat_dim='time', 
                                combine='nested', decode_times=True, 
                                data_vars='minimal', parallel='True')
    print('CLM_h2 files loaded with mfdataset')

    caseCLMh1_allDays = caseCLM_h1.assign_coords({"case":  case_names[iCase]})
    caseCLMh2_allDays = caseCLM_h2.assign_coords({"case":  case_names[iCase]})

    del caseCLM_h1,caseCLM_h2

    ## Combine everthing into one larger array 
    # - - - - - - - - - - - - - - - - - - - - 
    if iCase==0:
        scamDS    = caseFull_allDays
        del caseFull_allDays
        scamDS_h2 = caseH2_allDays
        del caseH2_allDays
        scamDS_h3 = caseH3_allDays
        del caseH3_allDays
        
        clmDS_h1 = caseCLMh1_allDays
        del caseCLMh1_allDays
        clmDS_h2 = caseCLMh2_allDays
        del caseCLMh2_allDays
    else: 
        scamDS    = xr.concat([scamDS, caseFull_allDays], "case") 
        del caseFull_allDays
        scamDS_h2 = xr.concat([scamDS_h2, caseH2_allDays], "case") 
        del caseH2_allDays
        scamDS_h3 = xr.concat([scamDS_h3, caseH3_allDays], "case") 
        del caseH3_allDays
        
        clmDS_h1 = xr.concat([clmDS_h1, caseCLMh1_allDays], "case") 
        del caseCLMh1_allDays
        clmDS_h2 = xr.concat([clmDS_h2, caseCLMh2_allDays], "case") 
        del caseCLMh2_allDays
         


*** Starting on case clubbMF_L0eq250_diffOff ***
h0 files loaded
h1 files loaded
h2 files loaded with mfdataset
h2 2D data loaded with mfdataset


2024-02-26 12:37:31,959 - distributed.scheduler - ERROR - Couldn't gather keys: {('where-1dbad44dce53b6c24d7c357e40b2d2af', 0, 0, 0, 0, 0, 0): 'memory', ('where-e77d1fda21f2ba576b9297fd5e31fbe8', 0, 0, 0, 0, 0, 0): 'memory', ('where-20713d2d5bfd4f44fbc09fa2a89546b1', 0, 0, 0, 0, 0, 0, 0): 'memory', ('where-98f45375178f9d986c74630a4554d803', 0, 0, 0, 0, 0, 0): 'memory', ('where-9a25aaac326e6e75573a7f027edff437', 0, 0, 0, 0, 0, 0): 'memory', ('where-a20f06be539a8f0d7e96287b2aa34cf0', 0, 0, 0, 0, 0, 0): 'memory', ('where-29e53ca7f62e909f4c0352843e8cdb6a', 0, 0, 0, 0, 0, 0, 0): 'memory', ('where-4e2ba32e1de1b5e7d0e49df8f0cedb3d', 0, 0, 0, 0, 0, 0): 'memory'}
2024-02-26 12:37:31,968 - distributed.scheduler - ERROR - Couldn't gather keys: {('where-97655930404e862605893a66504f2a7a', 0, 0, 0, 0, 0, 0): 'waiting', ('where-83dd87140077a93b02dfbf732afabd19', 0, 0, 0, 0, 0, 0): 'waiting'}
2024-02-26 12:37:31,971 - distributed.scheduler - ERROR - Couldn't gather keys: {('where-8ae440d57738ade7c34a4

**Okay, now how about some *more efficient* processing?**

Test the old way first:

In [5]:
# - - - - - - - - - - - - - - - 
# Additional processing after files are read in 
# - - - - - - - - - - - - - - - 

def process_camData(DS):
    
    ## Interpolate to standard levels 
    # - - - - - - - - - - - - - - - - - - - - - - - -
    print('Beginning interpolation...') 
    
    # Decide on levels to interpoalte to and add to larger arrays
    pnew64 = np.arange(200.0,980.0,10.0) 
    
    DS = DS.assign_coords({"levInterp": pnew64})

    varSels = np.asarray(['THLP2_CLUBB','RTP2_CLUBB','RTPTHLP_CLUBB','WPRTP_CLUBB','WPTHLP_CLUBB','WP3_CLUBB','WP2_CLUBB','UP2_CLUBB',
                          'VP2_CLUBB','Z3','U','V','T','Q','OMEGA','RVMTEND_CLUBB','STEND_CLUBB','CLDLIQ','CLOUD','CLOUDFRAC_CLUBB',
                          'UPWP_CLUBB','THETAL',
                          'CONCLD','QT','GCLDLWP',
                          'wpthlp','wprtp','rtp2',
                          'thlm','rtm','wm_zm','rtm_zm','thlm_zm',
                          ])

    for iVar in range(len(varSels)): 

        DS[varSels[iVar]] = DS[varSels[iVar]].expand_dims({'lat': 1}, axis=-1)
        DS[varSels[iVar]] = DS[varSels[iVar]].expand_dims({'lon': 1}, axis=-1)
        
        # Interpolate variables and add to larger arrays 
        interpVar_real = interpolateToPressure_v2(DS, varSels[iVar], pnew64)

        if len(np.shape(interpVar_real))==2: 
            DS[varSels[iVar]+'_interp']  = (('time','levInterp'), interpVar_real)
        elif len(np.shape(interpVar_real))==3: 
            DS[varSels[iVar]+'_interp']  = (('time','nens','levInterp'), interpVar_real)

        
    return DS


def process_camData_h2(DS, DSctrl):
    ## Interpolate to standard levels 
    # - - - - - - - - - - - - - - - - - - - - - - - -
    print('Beginning interpolation...') 
    
    # Decide on levels to interpoalte to and add to larger arrays
    pnew64 = np.arange(200.0,980.0,10.0) 
    
    DS = DS.assign_coords({"levInterp": pnew64})
    varSels = np.asarray([
                          'edmf_upa','edmf_upw','edmf_upqt','edmf_upthl',
                          'edmf_cloudfrac','edmf_dnw','edmf_precc',
                          'edmf_qtflxup','edmf_thlflxup',
                          'edmf_S_ATHLTHL','edmf_S_AQTQT','edmf_S_AWW',
                           'edmf_ent','edmf_upent','edmf_updet','edmf_upbuoy',

                           ])

    for iVar in range(len(varSels)): 
        DS[varSels[iVar]] = DS[varSels[iVar]].expand_dims({'lat': 1}, axis=-1)
        DS[varSels[iVar]] = DS[varSels[iVar]].expand_dims({'lon': 1}, axis=-1)

        # Interpolate variables and add to larger arrays 
        interpVar_real = interpolateToPressure_v2_h2(DS, DSctrl, varSels[iVar], pnew64)

        if len(np.shape(interpVar_real))==2: 
            DS[varSels[iVar]+'_interp']  = (('time','levInterp'), interpVar_real)
        elif len(np.shape(interpVar_real))==3: 
            DS[varSels[iVar]+'_interp']  = (('time','nens','levInterp'), interpVar_real)

    return DS

def process_camData_h3(DS, DSctrl):
    
    ## Interpolate to standard levels 
    # - - - - - - - - - - - - - - - - - - - - - - - -
    print('Beginning interpolation...') 
    
    # Decide on levels to interpoalte to and add to larger arrays
    pnew64 = np.arange(200.0,980.0,10.0) 
    
    DS = DS.assign_coords({"levInterp": pnew64})
    
    varSels = np.asarray(['up_macmicAvg', 'dn_macmicAvg','upa_macmicAvg','dna_macmicAvg',
               'thlu_macmicAvg','qtu_macmicAvg','thld_macmicAvg','qtd_macmicAvg' ])
    
    for iVar in range(len(varSels)): 
        DS[varSels[iVar]] = DS[varSels[iVar]].expand_dims({'lat': 1}, axis=-1)
        DS[varSels[iVar]] = DS[varSels[iVar]].expand_dims({'lon': 1}, axis=-1)

        # Interpolate variables and add to larger arrays 
        interpVar_real = interpolateToPressure_v2_h3(DS, DSctrl, varSels[iVar], pnew64)

        if len(np.shape(interpVar_real))==2: 
            DS[varSels[iVar]+'_interp']  = (('time','levInterp'), interpVar_real)
        elif len(np.shape(interpVar_real))==3: 
            DS[varSels[iVar]+'_interp']  = (('time','nens','levInterp'), interpVar_real)
        elif len(np.shape(interpVar_real))==4: 
            DS[varSels[iVar]+'_interp']  = (('time','ncyc','nens','levInterp'), interpVar_real)
        
    return DS


In [10]:
# - - - - - - - - - - - - - - - 
# Pressure interpolation 
# - - - - - - - - - - - - - - - 

def interpolateToPressure_v2(DS, varName, pressGoals):
    p0mb = DS.P0.values/100        # mb

    # Pull out hya/hyb profiles 
    hyam = np.squeeze(DS.hyam.values)[:]
    hybm = np.squeeze(DS.hybm.values)[:]
    hyai = np.squeeze(DS.hyai.values)[:]
    hybi = np.squeeze(DS.hybi.values)[:]

    # Surface pressure with time dimension
    PS   = DS.PS.values              # Pa

    # Converting variables: 
    if np.shape(DS[varName].values)[1]==len(DS.ilev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyai,hybi,pressGoals,PS,1,p0mb,1,True)
    elif np.shape(DS[varName].values)[1]==len(DS.lev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyam,hybm,pressGoals,PS,1,p0mb,1,True)
    ## Handle data that's by-plume for EDMF output
    elif np.shape(DS[varName].values)[1]==len(DS.nens.values):
        varInterp = np.full([len(DS.time.values), len(DS.nens.values) ,
                             len(pressGoals)], np.nan)
        
        for iEns in range(len(DS.nens.values)):
            varInterp[:,iEns,:] = np.squeeze(Ngl.vinth2p(DS[varName].values[:,iEns,:],hyai,hybi,pressGoals,PS,1,p0mb,1,True))

    saveOut = varInterp
    
    return saveOut

def interpolateToPressure_v2_h2(DS, DSctrl, varName, pressGoals):
    p0mb = DSctrl.P0.values/100        # mb

    # Pull out hya/hyb profiles 
    hyam = np.squeeze(DSctrl.hyam.values)[:]
    hybm = np.squeeze(DSctrl.hybm.values)[:]
    hyai = np.squeeze(DSctrl.hyai.values)[:]
    hybi = np.squeeze(DSctrl.hybi.values)[:]

    # Surface pressure with time dimension
    PS   = DSctrl.PS.values              # Pa 

    # Converting variables: 
    if np.shape(DS[varName].values)[1]==len(DS.ilev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyai,hybi,pressGoals,PS,1,p0mb,1,True)
    elif np.shape(DS[varName].values)[1]==len(DS.lev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyam,hybm,pressGoals,PS,1,p0mb,1,True)
    ## Handle data that's by-plume for EDMF output
    elif np.shape(DS[varName].values)[1]==len(DS.nens.values):
        varInterp = np.full([len(DS.time.values), len(DS.nens.values) ,
                             len(pressGoals)], np.nan)
        
        for iEns in range(len(DS.nens.values)):
            varInterp[:,iEns,:] = np.squeeze(Ngl.vinth2p(DS[varName].values[:,iEns,:],hyai,hybi,pressGoals,PS,1,p0mb,1,True))

    saveOut = varInterp
    
    return saveOut


def interpolateToPressure_v2_h3(DS, DSctrl, varName, pressGoals):
    p0mb = DSctrl.P0.values/100        # mb

    # Pull out hya/hyb profiles 
    hyam = np.squeeze(DSctrl.hyam.values)[:]
    hybm = np.squeeze(DSctrl.hybm.values)[:]
    hyai = np.squeeze(DSctrl.hyai.values)[:]
    hybi = np.squeeze(DSctrl.hybi.values)[:]

    # Surface pressure with time dimension
    PS   = DSctrl.PS.values              # Pa 

    # Converting variables: 
    if np.shape(DS[varName].values)[1]==len(DS.ilev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyai,hybi,pressGoals,PS,1,p0mb,1,True)
    elif np.shape(DS[varName].values)[1]==len(DS.lev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyam,hybm,pressGoals,PS,1,p0mb,1,True)
    ## Handle data that's by-plume for EDMF output
    elif np.shape(DS[varName].values)[1]==len(DS.nens.values):
        varInterp = np.full([len(DS.time.values), len(DS.nens.values),
                             len(pressGoals)], np.nan)
        
        for iEns in range(len(DS.nens.values)):
            varInterp[:,iEns,:] = np.squeeze(Ngl.vinth2p(DS[varName].values[:,iEns,:],hyai,hybi,pressGoals,PS,1,p0mb,1,True))
 
    ## Handle data that's by-plume *and* by subcycle for EDMF output
    elif np.shape(DS[varName].values)[2]==len(DS.nens.values):
        varInterp = np.full([len(DS.time.values), len(DS.ncyc.values), len(DS.nens.values) ,
                             len(pressGoals)], np.nan)
        
        for iEns in range(len(DS.nens.values)):
            for iCyc in range(len(DS.ncyc.values)):
                varInterp[:,iCyc,iEns,:] = np.squeeze(Ngl.vinth2p(DS[varName].values[:,iCyc,iEns,:],hyai,hybi,pressGoals,PS,1,p0mb,1,True))

            
    saveOut = np.squeeze(varInterp)
    
    return saveOut

In [7]:
def combineMacmic_beforeInterp(ds_h3):
    var1    = ['up_macmic1', 'dn_macmic1','upa_macmic1','dna_macmic1',
           'thlu_macmic1','qtu_macmic1','thld_macmic1','qtd_macmic1']

    var2    = ['up_macmic2', 'dn_macmic2','upa_macmic2','dna_macmic2',
               'thlu_macmic2','qtu_macmic2','thld_macmic2','qtd_macmic2']

    varSave = ['up_macmicAvg', 'dn_macmicAvg','upa_macmicAvg','dna_macmicAvg',
               'thlu_macmicAvg','qtu_macmicAvg','thld_macmicAvg','qtd_macmicAvg']

    for iVar in range(len(var1)):
#         print('Computing %s' % (varSave[iVar]))

        varCyc1 = ds_h3[var1[iVar]]
        varCyc2 = ds_h3[var2[iVar]]
        
        ## ADDED 8/8/23: Need to filter out areas that are zero...
        condition1 = ds_h3['upa_macmic1']>0
        subset_ds1 = varCyc1.where(condition1)

        condition2 = ds_h3['upa_macmic2']>0
        subset_ds2 = varCyc2.where(condition2)

        ## This seems to work... 
        # s = np.stack((varCyc1, varCyc2))
        s = np.stack((subset_ds1, subset_ds2))
        # C = np.nansum(s, axis=0)
        C = np.nanmean(s, axis=0)
        C[np.all(np.isnan(s), axis=0)] = np.nan

        # ds_h3[varSave[iVar]]  = (('time','nens','ilev','lat','lon'), C)
        ds_h3[varSave[iVar]]  = (('time','nens','ilev'), np.squeeze(C))
        
        # ds_h3[varSave[iVar]] = ds_h3[varSave[iVar]].expand_dims({'lat': 1}, axis=-1)
        # ds_h3[varSave[iVar]] = ds_h3[varSave[iVar]].expand_dims({'lon': 1}, axis=-1)
        
        if var1[iVar]!='upa_macmic1': 
            ds_h3 = ds_h3.drop_vars(var1[iVar])
        if var2[iVar]!='upa_macmic2':
            ds_h3 = ds_h3.drop_vars(var2[iVar])
    
    return ds_h3

In [9]:
scamDS['PS'] = scamDS['PS'].expand_dims({'lat': 1}, axis=-1)
scamDS['PS'] = scamDS['PS'].expand_dims({'lon': 1}, axis=-1)

**Start up dask server here**

In [21]:
%%time 
scamDS.load() 
print('Hey look! scamDS all loaded in.')



Hey look! scamDS all loaded in.
CPU times: user 28.1 s, sys: 2.5 s, total: 30.6 s
Wall time: 39.2 s


In [69]:
%%time
scamDS_h2.load()
# scamDS_h2.compute()
# scamDS_h2 = client.persist(scamDS_h2)

print('scamDS_h2 made it too! All warshed up and clean')


Task exception was never retrieved
future: <Task finished name='Task-897869' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-898264' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-898175' coro=<Client._gather.<loc

KeyboardInterrupt: 

Task exception was never retrieved
future: <Task finished name='Task-898216' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-898452' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-898467' coro=<Client._gather.<loc

In [None]:
%%time 

## Process data 
procDS_case0    = process_camData( scamDS.isel(case=0) )
procDS_case1    = process_camData( scamDS.isel(case=1) )
procDS = xr.concat([procDS_case0, procDS_case1], "case") 
print('Done with procDS')
#del scamDS

procDS_h2_case0 = process_camData_h2( scamDS_h2.isel(case=0) , procDS.isel(case=0)  )
procDS_h2_case1 = process_camData_h2( scamDS_h2.isel(case=1) , procDS.isel(case=1)  )
procDS_h2 = xr.concat([procDS_h2_case0, procDS_h2_case1], "case") 
## Drop excessive variables to save space
procDS_h2 = procDS_h2.drop_vars(['ntrk','ntrn','ntrm','gw','hyam','hybm',                                  
                             'P0','hyai','hybi','date','datesec','time_bnds','date_written',
                             'time_written','ndbase','nsbase','nbdate','nbsec','mdt','ndcur',
                             'nscur','co2vmr','ch4vmr','n2ovmr','f11vmr','f12vmr','sol_tsi','nsteph',
                              ## Also remove downdraft data (not active here)
                             'qtd_macmicAvg_interp','thld_macmicAvg_interp',
                              'qtd_macmicAvg','thld_macmicAvg','dna_macmicAvg',
                              'dn_macmicAvg','upa_macmic2','upa_macmic1','edmf_dnw_interp',
                              # 'dn_macmicAvg_interp', 'dna_macmicAvg_interp',  
                              ## Also drop macmicAvg if not interpolated 
                              'up_macmicAvg','upa_macmicAvg','thlu_macmicAvg','qtu_macmicAvg',
                              ## Riskier - but drop the things that are just the last time-step
                              # 'edmf_upa','edmf_upw','edmf_upqt','edmf_upthl',
                                ])
print('Done with procDS_h2')
#del scamDS_h2

procDS_h3_combine_case0 = combineMacmic_beforeInterp(scamDS_h3.isel(case=0))
procDS_h3_combine_case1 = combineMacmic_beforeInterp(scamDS_h3.isel(case=1))
procDS_h3_combine = xr.concat([procDS_h3_combine_case0, procDS_h3_combine_case1], "case")
procDS_h3_combine.load() 
print('We loaded h3!')
# del scamDS_h3
procDS_h3_case0 = process_camData_h3( procDS_h3_combine.isel(case=0), procDS.isel(case=0)  )
procDS_h3_case1 = process_camData_h3( procDS_h3_combine.isel(case=1), procDS.isel(case=1)  )
procDS_h3 = xr.concat([procDS_h3_case0, procDS_h3_case1], "case")
del procDS_h3_combine
print('Done with procDS_h3')

procDS_h2 = xr.merge([procDS_h2, procDS_h3])
del procDS_h3
        
## Combine all the cases into 
case_allDays      = procDS
h2_allDays        = procDS_h2
del procDS_h2, procDS




Task exception was never retrieved
future: <Task finished name='Task-898160' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-898547' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-898458' coro=<Client._gather.<loc

Beginning interpolation...


Task exception was never retrieved
future: <Task finished name='Task-899614' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-899652' coro=<Client._gather.<locals>.wait() done, defined at /glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py:2208> exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/apps/opt/conda/envs/npl-2024a/lib/python3.11/site-packages/distributed/client.py", line 2217, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-899723' coro=<Client._gather.<loc

In [None]:
client.status

In [27]:
client.shutdown()

In [21]:
DSctrl = procDS.isel(case=0)
DS = ds1
varName = 'up_macmicAvg'
pressGoals = np.arange(200.0,980.0,10.0) 



p0mb = DSctrl.P0.values/100        # mb

# Pull out hya/hyb profiles 
hyam = np.squeeze(DSctrl.hyam.values)[:]
hybm = np.squeeze(DSctrl.hybm.values)[:]
hyai = np.squeeze(DSctrl.hyai.values)[:]
hybi = np.squeeze(DSctrl.hybi.values)[:]

# Surface pressure with time dimension
PS   = DSctrl.PS.values              # Pa 

# Converting variables: 
if np.shape(DS[varName].values)[1]==len(DS.ilev.values):
    varInterp = Ngl.vinth2p(DS[varName].values,hyai,hybi,pressGoals,PS,1,p0mb,1,True)
elif np.shape(DS[varName].values)[1]==len(DS.lev.values):
    varInterp = Ngl.vinth2p(DS[varName].values,hyam,hybm,pressGoals,PS,1,p0mb,1,True)
## Handle data that's by-plume for EDMF output
elif np.shape(DS[varName].values)[1]==len(DS.nens.values):
    varInterp = np.full([len(DS.time.values), len(DS.nens.values),
                         len(pressGoals)], np.nan)
    
    for iEns in range(len(DS.nens.values)):
        varInterp[:,iEns,:] = Ngl.vinth2p(DS[varName].values[:,iEns,:],hyai,hybi,pressGoals,PS,1,p0mb,1,True)



In [20]:
len(DS.nens.values)

25

In [None]:
def process_camData_h3(DS, DSctrl):
    
    ## Interpolate to standard levels 
    # - - - - - - - - - - - - - - - - - - - - - - - -
    print('Beginning interpolation...') 
    
    # Decide on levels to interpoalte to and add to larger arrays
    pnew64 = np.arange(200.0,980.0,10.0) 
    
    DS = DS.assign_coords({"levInterp": pnew64})
    
    varSels = np.asarray(['up_macmicAvg', 'dn_macmicAvg','upa_macmicAvg','dna_macmicAvg',
               'thlu_macmicAvg','qtu_macmicAvg','thld_macmicAvg','qtd_macmicAvg' ])
    
    for iVar in range(len(varSels)): 
        # Interpolate variables and add to larger arrays 
        interpVar_real = interpolateToPressure_v2_h3(DS, DSctrl, varSels[iVar], pnew64)

        if len(np.shape(interpVar_real))==2: 
            DS[varSels[iVar]+'_interp']  = (('time','levInterp'), interpVar_real)
        elif len(np.shape(interpVar_real))==3: 
            DS[varSels[iVar]+'_interp']  = (('time','nens','levInterp'), interpVar_real)
        elif len(np.shape(interpVar_real))==4: 
            DS[varSels[iVar]+'_interp']  = (('time','ncyc','nens','levInterp'), interpVar_real)
        
    return DS


In [None]:


def interpolateToPressure_v2_h3(DS, DSctrl, varName, pressGoals):
    p0mb = DSctrl.P0.values/100        # mb

    # Pull out hya/hyb profiles 
    hyam = np.squeeze(DSctrl.hyam.values)[:]
    hybm = np.squeeze(DSctrl.hybm.values)[:]
    hyai = np.squeeze(DSctrl.hyai.values)[:]
    hybi = np.squeeze(DSctrl.hybi.values)[:]

    # Surface pressure with time dimension
    PS   = DSctrl.PS.values              # Pa 

    # Converting variables: 
    if np.shape(DS[varName].values)[1]==len(DS.ilev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyai,hybi,pressGoals,PS,1,p0mb,1,True)
    elif np.shape(DS[varName].values)[1]==len(DS.lev.values):
        varInterp = Ngl.vinth2p(DS[varName].values,hyam,hybm,pressGoals,PS,1,p0mb,1,True)
    ## Handle data that's by-plume for EDMF output
    elif np.shape(DS[varName].values)[1]==len(DS.nens.values):
        varInterp = np.full([len(DS.time.values), len(DS.nens.values),
                             len(pressGoals)], np.nan)
        
        for iEns in range(len(DS.nens.values)):
            varInterp[:,iEns,:] = Ngl.vinth2p(DS[varName].values[:,iEns,:,],hyai,hybi,pressGoals,PS,1,p0mb,1,True)
 
    ## Handle data that's by-plume *and* by subcycle for EDMF output
    elif np.shape(DS[varName].values)[2]==len(DS.nens.values):
        varInterp = np.full([len(DS.time.values), len(DS.ncyc.values), len(DS.nens.values) ,
                             len(pressGoals)], np.nan)
        
        for iEns in range(len(DS.nens.values)):
            for iCyc in range(len(DS.ncyc.values)):
                varInterp[:,iCyc,iEns,:] = Ngl.vinth2p(DS[varName].values[:,iCyc,iEns,:],hyai,hybi,pressGoals,PS,1,p0mb,1,True)

            
    saveOut = varInterp

In [None]:
%% time
plumeLabel = np.full([len(scamDS_h2.case.values), len(scamDS_h2.time.values), len(scamDS_h2.nens.values)], 'SurfaceAvg')

sigDig     = 9

for iCase in range(len(clmDS_h2.case.values)):

    for iT in range(len(clmDS_h1.time.values)-1):

        # Sel time QFLX and edmf_uplh 
        this_uplh    = scamDS_h2.isel(case=(iCase)).isel(time=iT, ilev=-1).edmf_uplh.values
        this_qflx    = clmDS_h1.isel(case=(iCase)).isel(time=iT).QFLX 
        this_qflx_LU = clmDS_h2.isel(case=iCase).isel(time=iT).QFLX 
        this_upa     = scamDS_h2.isel(case=(iCase)).isel(time=iT, ilev=-1).edmf_upa.values

        iGrass = np.where(np.around(this_uplh, sigDig) == np.around(this_qflx.values[0],sigDig))[0]
        iCrop  = np.where(np.around(this_uplh, sigDig) == np.around(this_qflx.values[1],sigDig))[0]

        # Land-unit means...
        iUrban = np.where(np.around(this_uplh, sigDig) == np.around(this_qflx_LU.values[2],sigDig))[0]
        iLake  = np.where(np.around(this_uplh, sigDig) == np.around(this_qflx_LU.values[3],sigDig))[0]
        
        ## TODO: For safety, since using a round off digit, should confirm that things aren't being marked as two PFTs 

        ## Also fill in if the plume is not active...
        this_upa  = scamDS_h2.isel(case=(iCase)).isel(time=iT, ilev=-1).edmf_upa.values
        iMiss     = np.where(this_upa==0.0)[0]

        plumeLabel[iCase, iT, iGrass] = 'C3grass'
        plumeLabel[iCase, iT, iCrop]  = 'IrrigCrop'
        plumeLabel[iCase, iT, iLake]  = 'Lake'
        plumeLabel[iCase, iT, iUrban] = 'Urban'
        plumeLabel[iCase, iT, iMiss]  = 'Off'

scamDS_h2['plumeLabel'] = (('case','time','nens'), plumeLabel)


In [None]:
%% time 
## Get number of plumes at each time/level
nPlumesActive = scamDS_h2.edmf_upa_interp.count(dim='nens')
print(np.shape(nPlumesActive))

scamDS['nPlumesActive'] = (('case','time','levInterp'), nPlumesActive.values)

# grassDS = scamDS_h2.where(scamDS_h2.plumeLabel == 'C3grass')
# cropDS  = scamDS_h2.where(scamDS_h2.plumeLabel == 'IrrigCrop')
# lakeDS  = scamDS_h2.where(scamDS_h2.plumeLabel == 'Lake')
# urbanDS = scamDS_h2.where(scamDS_h2.plumeLabel == 'Urban')

scamDS['nPlumesActive_grass'] = (('case','time','levInterp'), scamDS_h2.edmf_upa_interp.where(scamDS_h2.plumeLabel == 'C3grass').count(dim='nens').values)
scamDS['nPlumesActive_crop']  = (('case','time','levInterp'), scamDS_h2.edmf_upa_interp.where(scamDS_h2.plumeLabel == 'IrrigCrop').count(dim='nens').values)
scamDS['nPlumesActive_urban'] = (('case','time','levInterp'), scamDS_h2.edmf_upa_interp.where(scamDS_h2.plumeLabel == 'Urban').count(dim='nens').values)
scamDS['nPlumesActive_lake']  = (('case','time','levInterp'), scamDS_h2.edmf_upa_interp.where(scamDS_h2.plumeLabel == 'Lake').count(dim='nens').values)
