This script generates the ETCCDI indices for fixed thresholds only (not percentiles) to assess annual record breakers.  
Calculating using the historical WACCM simulations (three members).  

Outputs one file per index:

* Rx1day
* Rx5day  
* PTOT    
* NWD    

* TMX (hottest mean temp)  
* TMN (coldest mean temp)

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import numpy as np # data arrays
import xarray as xr # data array manipulation
import pandas as pd
import datetime as dt
import os

Define the precipitation functions

In [3]:
from importlib import reload
import precex_func
import utils #pyclimdex by B Groenks
import tempex_func

In [4]:
import glob

Read in each of the datasets in turn, calculate indices, then write the output to netcdf.

In [5]:
iDir = "/glade/collections/cmip/CMIP6/CMIP/NCAR/CESM2-WACCM/historical/"
oPDir = "/glade/work/maritye/Data/ARISE-SAI/ETCCDI/Historical/PRECT/"
oTDir = '/glade/work/maritye/Data/ARISE-SAI/ETCCDI/Historical/'

In [6]:
ensmem = os.listdir(iDir)
ensmem

['r2i1p1f1', 'r1i1p1f1', 'r3i1p1f1']

**Eventually I want to repeat this task for each of the datasets produced for the ensemble members above. But for now am only calculating on one member.**

**Would this be best carried out as a loop? Or setting to run as a batch job? And if batch job, how do I set that up?**

In [7]:
filenames = glob.glob(iDir + '/r3i1p1f1/day/pr/gn/files/d20190415/pr*')

Spin up a dask server and use that to process anything that can be processed,

In [8]:
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [9]:
# Create a PBS cluster object
cluster = PBSCluster(
    job_name = 'dask-wk23-hpc',
    cores = 1,
    memory = '4GiB',
    processes = 1,
    local_directory = '/local_scratch/pbs.$PBS_JOBID/dask/spill',
    resource_spec = 'select=1:ncpus=1:mem=4GB',
    queue = 'casper',
    walltime = '60:00',
    interface = 'ext'
)


In [10]:
# Create the client to load the Dashboard
client = Client(cluster)

In [19]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/39927/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/39927/status,Workers: 5
Total threads: 5,Total memory: 20.00 GiB

0,1
Comm: tcp://128.117.208.94:46385,Workers: 5
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/39927/status,Total threads: 5
Started: 27 minutes ago,Total memory: 20.00 GiB

0,1
Comm: tcp://128.117.208.110:39715,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/37923/status,Memory: 4.00 GiB
Nanny: tcp://128.117.208.110:41543,
Local directory: /local_scratch/pbs.9529692.casper-pbs/dask/spill/dask-scratch-space/worker-sl2nwie7,Local directory: /local_scratch/pbs.9529692.casper-pbs/dask/spill/dask-scratch-space/worker-sl2nwie7

0,1
Comm: tcp://128.117.208.86:36345,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/46497/status,Memory: 4.00 GiB
Nanny: tcp://128.117.208.86:43677,
Local directory: /local_scratch/pbs.9529690.casper-pbs/dask/spill/dask-scratch-space/worker-crbwdot6,Local directory: /local_scratch/pbs.9529690.casper-pbs/dask/spill/dask-scratch-space/worker-crbwdot6

0,1
Comm: tcp://128.117.208.75:40499,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/45221/status,Memory: 4.00 GiB
Nanny: tcp://128.117.208.75:37993,
Local directory: /local_scratch/pbs.9529693.casper-pbs/dask/spill/dask-scratch-space/worker-o7_0onjm,Local directory: /local_scratch/pbs.9529693.casper-pbs/dask/spill/dask-scratch-space/worker-o7_0onjm

0,1
Comm: tcp://128.117.208.103:32783,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/35899/status,Memory: 4.00 GiB
Nanny: tcp://128.117.208.103:45629,
Local directory: /local_scratch/pbs.9529689.casper-pbs/dask/spill/dask-scratch-space/worker-zr9cwqhk,Local directory: /local_scratch/pbs.9529689.casper-pbs/dask/spill/dask-scratch-space/worker-zr9cwqhk

0,1
Comm: tcp://128.117.208.86:33723,Total threads: 1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/maritye/proxy/36367/status,Memory: 4.00 GiB
Nanny: tcp://128.117.208.86:46105,
Local directory: /local_scratch/pbs.9529691.casper-pbs/dask/spill/dask-scratch-space/worker-ws56wa_8,Local directory: /local_scratch/pbs.9529691.casper-pbs/dask/spill/dask-scratch-space/worker-ws56wa_8


In [11]:
# Decide how many workers to support the chunks
num_workers = 6 
cluster.scale(num_workers)

client.wait_for_workers(num_workers)

In [20]:
chunks_dict = {"time": 365}

In [13]:
%%time
dsP= xr.open_mfdataset(filenames, parallel=True, chunks= chunks_dict)

CPU times: user 710 ms, sys: 101 ms, total: 811 ms
Wall time: 7.34 s


In [14]:
dailyp = dsP.pr * 86400 * 1000

In [15]:
dailyp

Unnamed: 0,Array,Chunk
Bytes,24.81 GiB,1.50 GiB
Shape,"(60226, 192, 288)","(3650, 192, 288)"
Dask graph,17 chunks in 37 graph layers,17 chunks in 37 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 24.81 GiB 1.50 GiB Shape (60226, 192, 288) (3650, 192, 288) Dask graph 17 chunks in 37 graph layers Data type float64 numpy.ndarray",288  192  60226,

Unnamed: 0,Array,Chunk
Bytes,24.81 GiB,1.50 GiB
Shape,"(60226, 192, 288)","(3650, 192, 288)"
Dask graph,17 chunks in 37 graph layers,17 chunks in 37 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


Annual precipitation indices

In [16]:
PRCPTOT = dailyp.sel('time'=slice('1850-01-01','2014-12-31').groupby('time.year').sum('time') # drop the one day of 2015
RX1D = dailyp.sel('time'=slice('1850-01-01','2014-12-31').groupby('time.year').max('time')
RX5D = dailyp.sel('time'=slice('1850-01-01','2014-12-31').rolling(time=5).sum().groupby('time.year').max('time')
NWD = dailyp.sel('time'=slice('1850-01-01','2014-12-31').where(dailyp>=1,1,0).groupby('time.year').sum('time')

In [18]:
NWD

Unnamed: 0,Array,Chunk
Bytes,70.03 MiB,4.22 MiB
Shape,"(166, 192, 288)","(10, 192, 288)"
Dask graph,17 chunks in 96 graph layers,17 chunks in 96 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 70.03 MiB 4.22 MiB Shape (166, 192, 288) (10, 192, 288) Dask graph 17 chunks in 96 graph layers Data type float64 numpy.ndarray",288  192  166,

Unnamed: 0,Array,Chunk
Bytes,70.03 MiB,4.22 MiB
Shape,"(166, 192, 288)","(10, 192, 288)"
Dask graph,17 chunks in 96 graph layers,17 chunks in 96 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [21]:
ptot = PRCPTOT.compute()
ptot

KilledWorker: Attempted to run task ('concatenate-3c1be90e088318c43add1c056bbf038f', 14, 0, 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://128.117.208.103:44847. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [30]:
fileintro = filenames[0].split('/')[16][:-20]

ptotnm = os.path.join(oPDir, (fileintro + 'PTOT_1850-2014.nc'))
rx1nm = os.path.join(oPDir, (fileintro + 'Rx1day_1850-2014.nc'))
rx5nm = os.path.join(oPDir, (fileintro + 'Rx5day_1850-2014.nc'))
nwdnm = os.path.join(oPDir, (fileintro + 'NWD_1850-2014.nc'))

In [31]:
ptotnm #checking filepath

'/glade/work/maritye/Data/ARISE-SAI/ETCCDI/Historical/PRECT/pr_day_CESM2-WACCM_historical_r3i1p1f1_gn_PTOT_1850-2014.nc'

**Can I write some common attributes to save with each index instead of doing it individually?**

In [None]:
ptot = ptot.assign_attributes(description='Gridcell Level Precipitation Indices based on ETCCDI definitions. WACCM Historical CESM2 Simulations run in CMIP6 configuration', 
                history='Created by Mari Tye February 2024.',
                units='mm per year',
                longname = 'Annual Total Precipitation'   )
rx1d = rx1d.assign_attributes(description='Gridcell Level Precipitation Indices based on ETCCDI definitions. WACCM Historical CESM2 Simulations run in CMIP6 configuration', 
                history='Created by Mari Tye February 2024.',
                units='mm per year',
                longname = 'Annual Wettest Day'   )
rx5d = rx5d.assign_attributes(description='Gridcell Level Precipitation Indices based on ETCCDI definitions. WACCM Historical CESM2 Simulations run in CMIP6 configuration', 
                history='Created by Mari Tye February 2024.',
                units='mm per year',
                longname = 'Annual Wettest Pentad'   )
nwd = nwd.assign_attributes(description='Gridcell Level Precipitation Indices based on ETCCDI definitions. WACCM Historical CESM2 Simulations run in CMIP6 configuration', 
                history='Created by Mari Tye February 2024.',
                units='days per year',
                longname = 'Annual Number of Day >1mm Precipitation'   )

In [None]:
ptot.to_netcdf(ptotnm)
rx1d.to_netcdf(rx1nm)
rx5d.to_netcdf(rx5nm)
nwd.to_netcdf(nwdnm)