In [None]:
%load_ext autoreload
%autoreload 2
import xarray as xr 
import numpy as np  
import cftime
import datetime
import copy
import scipy.stats
from scipy import signal
from functools import partial
import glob
import dask
import gsw
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.units as munits
from matplotlib.dates import ConciseDateConverter
munits.registry[cftime.DatetimeNoLeap] = ConciseDateConverter()
munits.registry[cftime.datetime] = ConciseDateConverter()
import cartopy
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from cartopy.util import add_cyclic_point
import matplotlib.ticker as mticker
%matplotlib inline

import os

In [2]:
import dask
from dask.distributed import wait
dask.__version__

'2021.09.0'

## Create Dask Cluster

In [3]:
# Close the Dask cluster and release its workers.
# NOTE: uncomment and run this cell only when you are finished; it terminates the Dask cluster!
#cluster.close()
#client.close()

In [4]:
# Use this if computing annual means:
def get_ClusterClient(cores=1, memory='50GB', queue='casper', project='NCGD0011',
                      walltime='02:00:00', interface='ib0'):
    """Start a dask-jobqueue PBS cluster and connect a distributed Client to it.

    Each PBS job runs a single worker process. The PBS ``resource_spec`` is
    built from ``cores`` and ``memory``, so the scheduler request always
    matches what dask is told each worker may use.

    Parameters
    ----------
    cores : int
        CPUs per worker job; also used as ``ncpus`` in the PBS select statement.
    memory : str
        Memory per worker job (e.g. ``'50GB'``); also used as ``mem`` in the
        PBS select statement.
    queue, project, walltime, interface : str
        Passed straight through to ``PBSCluster``.

    Returns
    -------
    tuple
        ``(cluster, client)``: the ``PBSCluster`` and a ``Client`` connected
        to it. The cluster starts with no workers; call ``cluster.scale`` to
        request them.
    """
    import dask
    from dask_jobqueue import PBSCluster
    from dask.distributed import Client

    cluster = PBSCluster(
        cores=cores,
        memory=memory,
        processes=1,
        queue=queue,
        # Derived from cores/memory so the PBS request cannot drift from them.
        resource_spec=f'select=1:ncpus={cores}:mem={memory}',
        project=project,
        walltime=walltime,
        interface=interface,
    )

    dask.config.set({
        # Point dashboard links at the JupyterHub proxy URL.
        'distributed.dashboard.link':
        'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status',
        'array.slicing.split_large_chunks': True,
    })
    client = Client(cluster)
    return cluster, client


N_WORKERS = 72  # number of single-process PBS worker jobs to request

cluster, client = get_ClusterClient()
cluster.scale(N_WORKERS)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 45178 instead
  f"Port {expected} is already in use.\n"


In [5]:
# Display the cluster summary (dashboard link and worker/thread/memory totals).
cluster

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/fredc/proxy/45178/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.5:38722,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/fredc/proxy/45178/status,Total threads: 0
Started: Just now,Total memory: 0 B


# Main Processing: annual-mean density (RHO) from monthly TEMP and SALT

In [6]:
%%time
# Load the POP tx0.1v3 grid and keep handles to the grid fields used below.
# NOTE(review): open_dataset is called without `chunks`, so these fields may
# not be dask-backed, in which case persist() does not load anything — confirm.
infile = '/glade/work/fredc/metric/grid/POP_tx0.1v3_grid.nc'
dsg = xr.open_dataset(infile)
(tlon, tlat, tarea,
 dzt, dzu,
 dxt, dyt, dxu, dyu,
 htn, hte,
 ht, hu) = (dsg[field].persist() for field in (
    'TLONG', 'TLAT', 'TAREA',
    'DZT', 'DZU',
    'DXT', 'DYT', 'DXU', 'DYU',
    'HTN', 'HTE',
    'HT', 'HU',
))

CPU times: user 22.6 ms, sys: 8.83 ms, total: 31.4 ms
Wall time: 168 ms


In [7]:
# Glob pattern for the monthly TEMP time-series files of this run.
temp_path = '/glade/campaign/collections/cmip/CMIP6/iHESP/BRCP85/HR/B.E.13.BRCP85C5CN.ne120_t12.sehires38.003.sunway.CN_OFF/ocn/proc/tseries/month_1/B.E.13.BRCP85C5CN.ne120_t12.sehires38.003.sunway.CN_OFF.pop.h.TEMP.*.nc'
# Sorted listing minus its last two entries (NOTE(review): reason for
# excluding them is not recorded here).
temp_files = sorted(glob.glob(temp_path))[:-2]

In [8]:
# Glob pattern for the monthly SALT time-series files of this run.
salt_path = '/glade/campaign/collections/cmip/CMIP6/iHESP/BRCP85/HR/B.E.13.BRCP85C5CN.ne120_t12.sehires38.003.sunway.CN_OFF/ocn/proc/tseries/month_1/B.E.13.BRCP85C5CN.ne120_t12.sehires38.003.sunway.CN_OFF.pop.h.SALT.*.nc'
# Sorted listing minus its last two entries, mirroring the TEMP list.
salt_files = sorted(glob.glob(salt_path))[:-2]

In [9]:
# The processing loop pairs temp_files[n] with salt_files[n], so fail fast if
# nothing was found or if the two listings do not line up one-to-one.
if not temp_files:
    raise FileNotFoundError(f"no TEMP files matched {temp_path}")
if len(temp_files) != len(salt_files):
    raise ValueError(
        f"found {len(temp_files)} TEMP files but {len(salt_files)} SALT files"
    )
for tfile, sfile in zip(temp_files, salt_files):
    # Compare the file-name part after the variable name (presumably the date range).
    tkey = os.path.basename(tfile).rsplit('.TEMP.', 1)[-1]
    skey = os.path.basename(sfile).rsplit('.SALT.', 1)[-1]
    if tkey != skey:
        raise ValueError(f"TEMP/SALT file mismatch: {tfile} vs {sfile}")
nf = len(temp_files)

In [None]:
# Shared open_mfdataset options for the monthly TEMP and SALT files.
_MFDATASET_KWARGS = dict(
    parallel=True,  # use dask to read in parallel
    combine="nested",  # combine files in order provided
    concat_dim="time",  # concatenate files along time dimension
    data_vars="minimal",  # only concatenate data variables with "time" dimension
    coords="minimal",  # same as above for coordinate variables
    compat="override",
    chunks={"nlat": "auto", "nlon": "auto", "z_t": "auto"},  # choose simple
)


def _open_monthly(path):
    """Open one monthly time-series file lazily with dask (see _MFDATASET_KWARGS)."""
    return xr.open_mfdataset(path, **_MFDATASET_KWARGS)


def _annual_mean(da, wgts):
    """Weighted annual mean of a monthly DataArray.

    Parameters
    ----------
    da : xarray.DataArray
        Monthly data with a ``time`` dimension.
    wgts : xarray.DataArray
        Per-month weights along ``time`` (here: month-length weights
        normalized within each calendar year).

    Returns
    -------
    xarray.DataArray
        One value per ``resample(time="AS")`` bin. Points with no valid data
        in a bin are NaN; a genuine 0.0 mean is kept as 0.0.
    """
    # min_count=1 leaves all-missing points NaN instead of summing them to 0,
    # so no value-based masking (which would also hit real zeros) is needed.
    weighted_sum = (da * wgts).resample(time="AS").sum(dim="time", min_count=1)
    # Dividing by sum(ones_like(da) * wgts) equals the per-bin weight sum at
    # every point; use the 1-D weight sum directly instead of a full-size array.
    return weighted_sum / wgts.resample(time="AS").sum(dim="time")


def _rho_output_path(temp_file, year):
    """Build the annual RHO output path that mirrors a monthly TEMP file path."""
    outdir = os.path.dirname(temp_file).replace('month_1', 'year_1')
    # Drop the trailing '<VAR>.<date>.nc' fields and append 'RHO.<year>.nc'.
    stem = os.path.basename(temp_file).split('.')[:-3]
    fname = '.'.join(stem + ['RHO', '{:04d}'.format(year), 'nc'])
    return os.path.join(outdir, fname)


for n in range(nf):
    # Both files are closed once this iteration's RHO has been written, so
    # file handles do not accumulate across iterations.
    with _open_monthly(temp_files[n]) as dst, _open_monthly(salt_files[n]) as dss:
        # Shift time stamps back 15 days, toward the middle of the month.
        # NOTE(review): assumes stamps mark the end of each averaging month.
        dst['time'] = dst.time - datetime.timedelta(days=15)
        # SALT is assumed to share TEMP's time axis; reuse the shifted stamps.
        dss['time'] = dst['time']
        temp = dst.TEMP
        salt = dss.SALT

        # Month-length weights normalized to sum to 1 within each calendar year.
        month_length = dst.time.dt.days_in_month
        wgts = month_length.groupby("time.year") / month_length.groupby("time.year").sum()

        ytemp = _annual_mean(temp, wgts)
        ysalt = _annual_mean(salt, wgts)
        # Zero salinity is non-physical; mask any points stored as 0 rather than missing.
        ysalt = ysalt.where(ysalt != 0)

        # The code treats z_t as depth in cm (positive down): /100 and negation
        # give height in m (negative down) for gsw.p_from_z.
        p = gsw.p_from_z(-dst.z_t / 100., tlat)
        SA = gsw.SA_from_SP(ysalt, p, tlon, tlat)
        CT = gsw.CT_from_pt(SA, ytemp)
        rho = gsw.rho(SA, CT, p)
        # Restore the grid file's z_t coordinate on the result.
        rho['z_t'] = dzt.z_t

        # Write to netcdf
        dso = rho.to_dataset(name='RHO')
        fout = _rho_output_path(temp_files[n], int(dso.time.dt.year[0].values))
        outdir = os.path.dirname(fout)
        os.makedirs(outdir, exist_ok=True)
        dso.to_netcdf(fout, unlimited_dims='time')