# DPLE: process DPLE fields for FEISTY
 - Based on notebook Liz Maroon's 'DPLE_ENSO_check.ipynb' notebook from RAPCDI-analysis repo
 - data I/O functions based on template from daniel kennedy (djk2120@ucar.edu): https://github.com/djk2120/cesm-lens

In [1]:
import xarray as xr 
import numpy as np  
import os
import cftime
import copy
import scipy.stats
from scipy import signal
import cartopy.crs as ccrs
import glob
import dask
import matplotlib.pyplot as plt
%matplotlib inline

## Create Dask Cluster

In [2]:
# # Close out Dask Cluster and release workers:
# # NOTE:  only run this cell to terminate Dask Cluster!
# cluster.close()
# client.close()

In [3]:
def get_ClusterClient():
    import dask
    from dask_jobqueue import PBSCluster
    from dask.distributed import Client
    cluster = PBSCluster(
        cores=1,
        memory='256GB',
        processes=1,
        queue='casper',
        resource_spec='select=1:ncpus=1:mem=256GB',
        project='NCGD0011',
        walltime='02:00:00',
        interface='ib0',)

    dask.config.set({
        'distributed.dashboard.link':
        'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'
    })
    client = Client(cluster)
    return cluster, client

In [4]:
cluster, client = get_ClusterClient()
cluster.scale(30) 

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41173 instead
  http_address["port"], self.http_server.port


### Data I/O functions:
 - Run each of these cells, then proceed to Main Processing
 - Note that these functions are currently hard-wired to retrieve ocean monthly data

In [5]:
def file_dict(filetempl,mem,stmon):
    ''' returns a dictionary of filepaths keyed by initialization year, 
    for a given experiment, field, ensemble member, and initialization month '''
    memstr = '{0:03d}'.format(mem)
    monstr = '{0:02d}'.format(stmon)
    filepaths = {}
    
    filetemp = filetempl.replace('MM',monstr).replace('EEE',memstr)

    #find all the relevant files
    files = glob.glob(filetemp)
        
    for file in files:
        #isolate initialization year from the file name
        ystr = file.split('.pop.h.')[0]
        y0 = int(ystr[-11:-7])
        filepaths[y0]=file
        
    return filepaths

In [6]:
def nested_file_list_by_year(filetemplate,ens,field,firstyear,lastyear,stmon):
    ''' retrieve a nested list of files for these start years and ensemble members'''
    ens = np.array(ens)+1
    yrs = np.arange(firstyear,lastyear+1)
    files = []    # a list of lists, dim0=start_year, dim1=ens
    ix = np.zeros(yrs.shape)+1
    
    for yy,i in zip(yrs,range(len(yrs))):
        ffs = []  # a list of files for this yy
        file0 = ''
        first = True
        for ee in ens:
            filepaths = file_dict(filetemplate,ee,stmon)
            #append file if it is new
            if yy in filepaths.keys():
                file = filepaths[yy]
                if file != file0:
                    ffs.append(file)
                    file0 = file
        
        #append this ensemble member to files
        if ffs:  #only append if you found files
            files.append(ffs)
        else:
            ix[i] = 0
    return files,yrs[ix==1]

In [7]:
## NOTE
## Regulate dask array size using this proprocess function.
## Set appropriately based on analysis to come.
## E.g., currently set to extract POP surface layer and 24 months of data
def preprocess(ds):
    #return ds.isel(z_t=slice(0,15)).isel(time=slice(0,24))
    return ds.isel(time=slice(0,24))

def open_members(in_obj):
    ffs = in_obj[0]  #unwrap the list
    field = in_obj[1]
    ens = in_obj[2]
    lm = in_obj[3]
    chunks = in_obj[4]
    
    d0 = xr.open_mfdataset(ffs,combine='nested',parallel=True,concat_dim='M',data_vars=[field],\
                           chunks=chunks,compat='override', coords='minimal', preprocess=preprocess)
    #added compat=override, coords=minimal here. Assumes that all hindcasts have same dims/coords. Seems a little dangerous
    #but REALLY speeds things up. And we know that the coords are the same for all of SMYLE anyway.

    # quick fix to adjust time vector for monthly data  
    nmonths = len(d0.time)
    yr0 = d0['time.year'][0].values
    d0['time'] =xr.cftime_range(str(yr0),periods=nmonths,freq='MS')
    d0 = d0.assign_coords(M=("M",ens))
    d0 = d0.assign_coords(L=("time",lm))
    d0 = d0.swap_dims({'time': 'L'})
    d0 = d0.reset_coords(["time"])
    
    return d0

In [8]:
def get_monthly_data(filetemplate,ens,leads,field,firstyear,lastyear,stmon,chunks={}):
    ''' returns dask array containing the requested hindcast ensemble '''

    ds = xr.Dataset()    #instantiate Dataset
    lm = np.array(leads)+1
    files,yrs = nested_file_list_by_year(filetemplate,ens,field,firstyear,lastyear,stmon)
    ens = np.array(ens)+1
    
    # all members should have the same number of files, otherwise abort
    nfs = np.array([len(ffs) for ffs in files])
    if np.sum(nfs==nfs[0])==len(nfs):
        complete_set=True   # same number of files
    else:
        raise ValueError('ERROR: Incomplete set of files')
        
    if complete_set: #read all data using map/gather
        dsets = []
        in_obj = [[ffs, field, ens, lm, chunks] for ffs in files]
        dsets = client.map(open_members, in_obj)
        dsets = client.gather(dsets)
        tmp = xr.concat(dsets,dim='Y',data_vars=[field,'time','time_bound'], coords='minimal', compat='override')
        #potentially dangerous compat/coords option - xarray is NOT checking that the coordinates 
        #are the same across all files - pulling values of shared coords from the first file only
        #speeds up read-in time by ~1/3
        tmp = tmp.assign_coords(Y=("Y",yrs))

    ds[field] = tmp[field]
    ds['dz'] = tmp['dz'] #added
    ds['KMT'] = tmp['KMT'] #added
    ds['time'] = tmp['time']
    ds['time_bound'] = tmp['time_bound']
    ds['TAREA'] = tmp['TAREA']
    ds['UAREA'] = tmp['UAREA']

    return ds

### add function for reducing FEISTY fields to 2D

In [9]:
def field_150m_mean(da):
    """compute mean over upper 100 m; assume constant dz"""
    print('compute mean')
    depth_slice = slice(0, 150e2)
    with xr.set_options(keep_attrs=True):
        if 'z_t' in da.dims:
            return da.sel(z_t=depth_slice).mean('z_t')
        elif 'z_t_150m' in da.dims:
            return da.mean('z_t_150m')

        
def field_150m_zint(da, dz):
    """compute integral over upper 100 m; assume constant dz"""
    depth_slice = slice(0, 150e2)

    if 'z_t' in da.dims:            
        dao = (dz * da).sel(z_t=depth_slice).sum('z_t')
    elif 'z_t_150m' in da.dims:
        dao = (dz.isel(z_t=slice(0, 15)).rename({'z_t': 'z_t_150m'}) * da).sum('z_t_150m')
        
    dao.attrs = da.attrs
    dao.attrs['units'] = da.attrs['units'] + ' cm'        
    return dao

def field_at_bottom(da):
    """return a field indexed at the model's bottom layer"""

    tmp_bot = xr.DataArray(np.ones(da[:,:,:, 0, :, :].shape) * np.nan, 
                           dims=tuple(da.dims[i] for i in [0,1,2, 4, 5]),
                           #coords={c: da.coords[c] for c in ['time']},
                          )

    assert KMT.shape == da.shape[-2:]
    
    for j in range(len(da.nlat)):
        for i in range(len(da.nlon)):
            if KMT[j, i] > 0:
                k = int(KMT[j, i] - 1)
                tmp_bot.values[:,:,:, j, i] = da[:,:,:, k, j, i]

                
    return tmp_bot

In [10]:
client

0,1
Client  Scheduler: tcp://10.12.206.60:34892  Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/kristenk/proxy/41173/status,Cluster  Workers: 12  Cores: 12  Memory: 3.07 TB


# Main Processing

### Read in POP monthly field
- Chosen field is returned as a dask array with leading dimensions of Y (initialization year), M (ensemble member), and L (lead month)
- "time" and "time_bound" variables, which give prediction verification time, are also dimensioned with (Y,L) 

In [11]:
%%time
# DPLE data
# process all 40 ensemble members, first 24 months, all November start dates from 1970-2018:
field = 'TEMP'
datadir = '/glade/campaign/cesm/collections/CESM1-DPLE/ocn/proc/tseries/monthly/'
casename = 'b.e11.BDP.f09_g16.????-MM.EEE'
filetemplate = datadir+field+'/'+casename+'.pop.h.'+field+'.*.nc'
ens = range(40) 
#leadtimes = range(122)
leadtimes = range(24)
firstyear = 1970
lastyear  = 2018
startmonth = 11
chunks={'z_t':60,'nlat':80} #un-commented and change z_t to 15
dple_temp = get_monthly_data(filetemplate,ens,leadtimes,field,firstyear,lastyear,startmonth)
dple_temp.nbytes/1e9 #GB

CPU times: user 11.2 s, sys: 2.46 s, total: 13.7 s
Wall time: 1min 21s


1358.961406304

In [12]:
# Load this in memory to speed up later computations
dple_temp = dple_temp.persist()
KMT = np.int32(dple_temp.KMT.compute())

In [13]:
dso = xr.Dataset() #dple_temp[['TLONG', 'TLAT', 'TAREA']]
#dso['POC_FLUX_IN_bottom'] = tmp_bot

In [14]:
if (field=='TEMP'):
    
#     #TEMP top 150m mean
#     dso['TEMP_150m'] = field_150m_mean(dple_temp[field]).compute()
    
    #TEMP at bottom
    template = dple_temp[field][:,:,:, 0, :, :].squeeze(drop=True) #drop('z_t') 
    dso['TEMP_bottom'] = xr.map_blocks(field_at_bottom, dple_temp[field], template=template)
    
    variables = ['TEMP_bottom'] #,'TEMP_150m'] 
    
    print('processing TEMP fields')
    
elif ('z_t_150m' in dple_temp[field].dims):
    
    print('doing integral',field)
    dso[f'{field}_150m'] = field_150m_zint(dple_temp[field], dple_temp.dz).compute() 
    
    variables = [f'{field}_150m']
    
elif (field=='POC_FLUX_IN'):
    
    print('getting',field, 'at the bottom')
    
    template = dple_temp[field][:,:,:, 0, :, :].squeeze(drop=True)
    dso['POC_FLUX_IN_bottom'] = xr.map_blocks(
        field_at_bottom, dple_temp[field], 
        template=template)
    
    #dso['POC_FLUX_IN_bottom'] = field_at_bottom(dple_temp[field])
    
    variables = ['POC_FLUX_IN_bottom']
    
print('done!')

processing TEMP fields
done!


In [19]:
dso

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,(),()
Count,23053 Tasks,1 Chunks
Type,float32,numpy.ndarray
Array Chunk Bytes 4 B 4 B Shape () () Count 23053 Tasks 1 Chunks Type float32 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,4 B,4 B
Shape,(),()
Count,23053 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 983.04 kB 983.04 kB Shape (384, 320) (384, 320) Count 23053 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 983.04 kB 983.04 kB Shape (384, 320) (384, 320) Count 23053 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 983.04 kB 983.04 kB Shape (384, 320) (384, 320) Count 23053 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 983.04 kB 983.04 kB Shape (384, 320) (384, 320) Count 23053 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,983.04 kB,983.04 kB
Shape,"(384, 320)","(384, 320)"
Count,23053 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.65 GB,11.80 MB
Shape,"(48, 40, 24, 384, 320)","(1, 1, 24, 384, 320)"
Count,23053 Tasks,1920 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 22.65 GB 11.80 MB Shape (48, 40, 24, 384, 320) (1, 1, 24, 384, 320) Count 23053 Tasks 1920 Chunks Type float32 numpy.ndarray",40  48  320  384  24,

Unnamed: 0,Array,Chunk
Bytes,22.65 GB,11.80 MB
Shape,"(48, 40, 24, 384, 320)","(1, 1, 24, 384, 320)"
Count,23053 Tasks,1920 Chunks
Type,float32,numpy.ndarray


In [20]:
dso['TEMP_bottom'].isel(Y=4,M=0,L=12).plot(cmap='bwr')

ValueError: Result from applying user function does not contain coordinate variables {'ULAT', 'Y', 'L', 'TLONG', 'z_t', 'ULONG', 'TLAT', 'M'}.

In [19]:
dso_anoms = xr.Dataset()

### Drift removal
##### • Drift removal by lead time

In [20]:
%%time

dple_time_bound = dple_temp.time_bound.load()

#Set the start and end year (inclusive) for the climotology
cl_y1 = 1970
cl_y2 = 2014 

cl_d1 = cftime.DatetimeNoLeap(cl_y1,1,1,0,0,0)
cl_d2 = cftime.DatetimeNoLeap(cl_y2,12,31,23,59,59)

for v in variables:
    
    print('doing variable', v)

    fordrift = dso[v].where((dple_time_bound.mean('d2')>cl_d1) & (dple_time_bound.mean('d2')<cl_d2))
    climodrift = fordrift.mean('M').mean('Y')

    dso_anoms[v] = dso[v] - climodrift

doing variable TEMP_bottom
CPU times: user 40.7 ms, sys: 2.98 ms, total: 43.7 ms
Wall time: 59 ms


In [21]:
dso_anoms[v].isel(Y=4,M=0,L=12).squeeze(drop=True).plot(cmap='bwr')

ValueError: Result from applying user function does not contain coordinate variables {'M', 'TLONG', 'Y', 'TLAT', 'ULAT', 'L', 'z_t', 'ULONG'}.

### write out the data
##### • one netcdf per variable

In [22]:
%%time
USER = os.environ['USER']
dout = f'/glade/work/{USER}/fish-offline'
os.makedirs(dout, exist_ok=True)
#dso

CPU times: user 518 µs, sys: 0 ns, total: 518 µs
Wall time: 2.18 ms


In [23]:
# dso_anoms.load()

In [24]:
%%time

for v in variables:
    dso.to_netcdf(f'{dout}/DPLE-FIESTY-forcing_{v}.nc', mode='w')

CPU times: user 89.3 ms, sys: 4.45 s, total: 4.54 s
Wall time: 5.18 s
