In [1]:
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
from math import nan
import glob
import dask
import datetime
from dateutil.relativedelta import relativedelta
from functools import partial
import pandas as pd

dask.config.set(**{'array.slicing.split_large_chunks': True})

<dask.config.set at 0x1512349b82d0>

In [12]:
ystart = 1970 # start year of hindcasts 
yend = 2020 # end year of hindcasts
nyears = yend - ystart + 1
initmon=11
initmonstr = str(initmon).zfill(2)
nmems = 20 # the number of hindcast members
memstr = [str(i).zfill(3) for i in np.arange(1,nmems+1,1)] # generating member strings

topdir = "/glade/campaign/cesm/development/espwg/SMYLE-CW3E-L83/timeseries/"
expname = "b.e21.BSMYLE-CW3E-L83.f09_g17."
outpath="/glade/campaign/cgd/cas/islas/python_savs/NCAR_CW3E_SMYLE/DATA_SORT/PRECIP/"

In [3]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(
    cores = 1,
    memory = '30GB',
    processes = 1,
    queue = 'casper',
    local_directory='$TMPDIR',
    resource_spec='select=1:ncpus=1:mem=30GB',
    project='P04010022',
    walltime='01:00:00',
    interface='mgt')

# scale up
cluster.scale(20)

# change your urls to the dask dashboard so that you can see it
dask.config.set({'distributed.dashboard.link':'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'})

# Setup your client
client = Client(cluster)

In [4]:
cluster

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/islas/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.18.206.103:38497,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/islas/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [9]:
files_precc=[]
files_precl=[]
for iyear in np.arange(ystart,yend+1,1):
    filest = [ glob.glob(topdir+expname+str(iyear)+'-'+initmonstr+'.'+str(imem).zfill(3)+'/atm/proc/tseries/month_1/*.PRECC.*.nc')[0] for imem in np.arange(1,20+1,1) ]
    files_precc.append(filest)

    filest = [ glob.glob(topdir+expname+str(iyear)+'-'+initmonstr+'.'+str(imem).zfill(3)+'/atm/proc/tseries/month_1/*.PRECL.*.nc')[0] for imem in np.arange(1,20+1,1) ]
    files_precl.append(filest)

In [10]:
def preprocessor(ds):
    # sort out the times so that each member has the time 
    timebndavg = np.array(ds.time_bnds, dtype='datetime64[s]').view('i8').mean(axis=1).astype('datetime64[s]')
    ds['time'] = timebndavg
    datestart = pd.to_datetime("1970-"+str(ds.isel(time=0).time.dt.month.values).zfill(2)+"-"+str(ds.isel(time=0).time.dt.day.values).zfill(2),
                               format="%Y-%m-%d")
    time = [ datestart + relativedelta(months = int(i)) for i in np.arange(0,ds.time.size,1) ] 
    ds['time'] = time
    return ds

In [13]:
precc = xr.open_mfdataset(files_precc, combine='nested', concat_dim=['init_year','M'],
                          parallel=True, data_vars=['PRECC'], coords='minimal', compat='override', preprocess = partial(preprocessor))
precc['init_year'] = np.arange(ystart,yend+1,1)
precc = precc.PRECC

#precc.load().to_netcdf(outpath+"PRECC_BSMYLE-CW3E-L83_mon_init"+initmonstr+".nc")

precl = xr.open_mfdataset(files_precl, combine='nested', concat_dim=['init_year','M'],
                          parallel=True, data_vars=['PRECL'], coords='minimal', compat='override', preprocess = partial(preprocessor))
precl['init_year'] = np.arange(ystart,yend+1,1)
precl = precl.PRECL

#precl.load().to_netcdf(outpath+"PRECL_BSMYLE-CW3E-L83_mon_init"+initmonstr+".nc")

prect = precc + precl
prect = prect.rename('PRECT')
prect.load().to_netcdf(outpath+'PRECT_BSMYLE-CW3E-L83_mon_init'+initmonstr+'.nc')

In [14]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
