# Process TTE-related variables from the CESM-LE

In [1]:
%matplotlib inline
import os
import shutil

from glob import glob

import cftime

import numpy as np
import xarray as xr

import matplotlib.pyplot as plt
import matplotlib.colors as colors

import cartopy.crs as ccrs
from cartopy.util import add_cyclic_point

import intake
import pop_tools
import esmlab
import util

import warnings
warnings.filterwarnings('ignore')

## Spin up dask cluster

In [2]:
%%time

import dask

# Use dask jobqueue
from dask_jobqueue import PBSCluster

# Import a client
from dask.distributed import Client

# Setup your PBSCluster
cluster = PBSCluster(
    cores=2, # The number of cores you want
    memory='256 GB', # Amount of memory
    processes=1, # How many processes
    queue='casper', # The type of queue to utilize (/glade/u/apps/dav/opt/usr/bin/execcasper)
    local_directory='$TMPDIR', # Use your local directory
    resource_spec='select=1:ncpus=2:mem=256GB', # Specify resources
    project='NCGD0011', # Input your project ID here
    walltime='02:00:00', # Amount of wall time
    interface='ib0', # Interface to use
)
# Scale up
cluster.scale(32)

# Change your url to the dask dashboard so you can see it
dask.config.set({'distributed.dashboard.link':'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'})

# Setup your client
client = Client(cluster)

CPU times: user 254 ms, sys: 312 ms, total: 567 ms
Wall time: 35.1 s


In [3]:
# Display the client summary (scheduler address, dashboard link, worker count).
client

0,1
Client  Scheduler: tcp://10.12.206.55:38224  Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/kristenk/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [4]:
# Fetch the 'POP_gx1v6' grid from pop_tools and display it.
grid = pop_tools.get_grid('POP_gx1v6')
grid

## Read the CESM-LE data 

We will use [`intake-esm`](https://intake-esm.readthedocs.io/en/latest/), which is a data catalog tool.
It enables querying a database for the files we want, then loading those directly as an `xarray.Dataset`.

First step is to set the "collection" for the CESM-LE, which depends on a json file conforming to the [ESM Catalog Specification](https://github.com/NCAR/esm-collection-spec).

In [5]:
# Catalog description (json) for the CESM1 large ensemble, read from a user home directory.
catalog_file = '/glade/u/home/kristenk/TTE_CESM-LE/krill-cesm-le/notebooks/data/glade-cesm1-le.json'
# Alternative catalog location (currently disabled):
#catalog_file = '/glade/collections/cmip/catalog/intake-esm-datastore/catalogs/glade-cesm1-le.json'
# Variable(s) to pull from the catalog; the trailing comment lists other candidates.
variables = ['Fe_scavenge'] #['IRON_FLUX','Jint_100m_Fe','tend_zint_100m_Fe']

# Experiments and output stream used in the catalog search below.
experiments = ['20C', 'RCP85']
stream = 'pop.h'

# Open the catalog; `sep` is set to ',' (dataset keys later appear as 'component,experiment,stream').
col = intake.open_esm_datastore(catalog_file, sep=',')
col

Unnamed: 0,unique
experiment,7
case,108
component,6
stream,15
variable,1052
date_range,116
member_id,40
path,191066
ctrl_branch_year,6
ctrl_experiment,4


Now we will search the collection for the ensemble members (unique `member_id`'s) that have a chlorophyll field. This is necessary because the ocean biogeochemistry was corrupted in some members and the data deleted.

In this cell, `member_id` is a list of the ensemble members we want to operate on.

In [6]:
# Find the members that still have ocean biogeochemistry output by searching
# the 20C experiment for a chlorophyll field.
col_sub = col.search(
    variable=['diatChl'],
    stream='pop.h',
    experiment=['20C'],
)

# Unique ensemble member ids returned by that search.
member_id = list(col_sub.df['member_id'].unique())
print(member_id)

[1, 2, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 101, 102, 103, 104, 105]


## Now let's search for the data we want

Specify a list of variables and perform a search. Under the hood, the `search` functionality uses [`pandas`](https://pandas.pydata.org/) data frames. We can view that frame here using the `.df` syntax.

In [7]:
# Restrict the catalog to the requested variables/stream/experiments,
# keeping only the ensemble members selected above.
col_sub = col.search(
    member_id=member_id,
    variable=variables,
    stream=stream,
    experiment=experiments,
)

print(col_sub)

# First rows of the underlying pandas frame.
col_sub.df.head()

<glade-cesm1-le catalog with 2 dataset(s) from 95 asset(s)>


Unnamed: 0,experiment,case,component,stream,variable,date_range,member_id,path,ctrl_branch_year,ctrl_experiment,ctrl_member_id
0,20C,b.e11.B20TRC5CNBDRD.f09_g16.001,ocn,pop.h,Fe_scavenge,185001-200512,1,/glade/campaign/cesm/collections/cesmLE/CESM-C...,402,CTRL,1
1,20C,b.e11.B20TRC5CNBDRD.f09_g16.002,ocn,pop.h,Fe_scavenge,192001-200512,2,/glade/campaign/cesm/collections/cesmLE/CESM-C...,1920,20C,1
2,20C,b.e11.B20TRC5CNBDRD.f09_g16.009,ocn,pop.h,Fe_scavenge,192001-200512,9,/glade/campaign/cesm/collections/cesmLE/CESM-C...,1920,20C,1
3,20C,b.e11.B20TRC5CNBDRD.f09_g16.010,ocn,pop.h,Fe_scavenge,192001-200512,10,/glade/campaign/cesm/collections/cesmLE/CESM-C...,1920,20C,1
4,20C,b.e11.B20TRC5CNBDRD.f09_g16.011,ocn,pop.h,Fe_scavenge,192001-200512,11,/glade/campaign/cesm/collections/cesmLE/CESM-C...,1920,20C,1


Now we can use the [`to_dataset_dict`](https://intake-esm.readthedocs.io/en/latest/api.html#intake_esm.core.esm_datastore.to_dataset_dict) method to return a dictionary of `xarray.Dataset`'s. `intake_esm` makes groups of these according to rules in the collection spec file.

`to_dataset_dict` also accepts a `preprocess` parameter for passing in a function that makes corrections to each dataset as it is opened. In this notebook the corrections are instead applied after loading, using the helper functions defined further below, which:
- fix the time coordinate to be the middle of the interval
- compute the 100 m depth integral of `Fe_scavenge` and drop the full-depth field
- subset to the time-interval 1920-2100

In [8]:
# Display the client summary again to check on the workers before the large read below.
client

0,1
Client  Scheduler: tcp://10.12.206.55:38224  Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/kristenk/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [9]:
%%time
dsets = col_sub.to_dataset_dict(cdf_kwargs={'chunks': {'time':5}, 'decode_times': False})
dsets


--> The keys in the returned dictionary of datasets are constructed as follows:
	'component,experiment,stream'


CPU times: user 8.87 s, sys: 980 ms, total: 9.85 s
Wall time: 16min 17s


{'ocn,20C,pop.h': <xarray.Dataset>
 Dimensions:               (d2: 2, lat_aux_grid: 395, member_id: 34, moc_comp: 3, moc_z: 61, nlat: 384, nlon: 320, time: 1872, transport_comp: 5, transport_reg: 2, z_t: 60, z_t_150m: 15, z_w: 60, z_w_bot: 60, z_w_top: 60)
 Coordinates: (12/13)
   * time                  (time) float64 6.753e+05 6.753e+05 ... 7.322e+05
     TLAT                  (nlat, nlon) float64 dask.array<chunksize=(384, 320), meta=np.ndarray>
     TLONG                 (nlat, nlon) float64 dask.array<chunksize=(384, 320), meta=np.ndarray>
     ULAT                  (nlat, nlon) float64 dask.array<chunksize=(384, 320), meta=np.ndarray>
     ULONG                 (nlat, nlon) float64 dask.array<chunksize=(384, 320), meta=np.ndarray>
   * lat_aux_grid          (lat_aux_grid) float32 -79.49 -78.95 ... 89.47 90.0
     ...                    ...
   * z_t                   (z_t) float32 500.0 1.5e+03 ... 5.125e+05 5.375e+05
   * z_t_150m              (z_t_150m) float32 500.0 1.5e+03 ...

Now, let's compute the 100 m depth integral of Fe scavenging, put time at the mid-point of the interval, and subset to 1920-2100. First, take a look at the raw `Fe_scavenge` field from the RCP8.5 experiment.

In [10]:
# Inspect the Fe_scavenge variable of the RCP8.5 dataset (key 'ocn,RCP85,pop.h').
dsets['ocn,RCP85,pop.h'].Fe_scavenge

Unnamed: 0,Array,Chunk
Bytes,1.04 TiB,140.62 MiB
Shape,"(34, 1140, 60, 384, 320)","(1, 5, 60, 384, 320)"
Count,29473 Tasks,7752 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.04 TiB 140.62 MiB Shape (34, 1140, 60, 384, 320) (1, 5, 60, 384, 320) Count 29473 Tasks 7752 Chunks Type float32 numpy.ndarray",1140  34  320  384  60,

Unnamed: 0,Array,Chunk
Bytes,1.04 TiB,140.62 MiB
Shape,"(34, 1140, 60, 384, 320)","(1, 5, 60, 384, 320)"
Count,29473 Tasks,7752 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 0.94 MiB 0.94 MiB Shape (384, 320) (384, 320) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 0.94 MiB 0.94 MiB Shape (384, 320) (384, 320) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 0.94 MiB 0.94 MiB Shape (384, 320) (384, 320) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 0.94 MiB 0.94 MiB Shape (384, 320) (384, 320) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",320  384,

Unnamed: 0,Array,Chunk
Bytes,0.94 MiB,0.94 MiB
Shape,"(384, 320)","(384, 320)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [11]:
def phys_med_Fe(ds):
    """Add the physically mediated Fe supply to ``ds`` as ``phys_med_Fe``.

    The supply is the residual
    ``tend_zint_100m_Fe - Jint_100m_Fe - IRON_FLUX * 100``, where the factor
    of 100 converts IRON_FLUX from mmol/m2/s to mmol/m3 cm/s.  The new
    variable is given the attributes of ``tend_zint_100m_Fe``.

    ``ds`` is modified in place and returned; the three input terms are kept.
    """
    tendency = ds.tend_zint_100m_Fe
    # convert IRON_FLUX from mmol/m2/s to mmol/m3 cm/s
    surface_flux = ds.IRON_FLUX * 100.

    ds['phys_med_Fe'] = tendency - ds.Jint_100m_Fe - surface_flux
    ds.phys_med_Fe.attrs = tendency.attrs

    return ds

def zint_Fe_scavenge(ds):
    """Replace ``Fe_scavenge`` with its integral over the top 10 ``z_t`` levels.

    The first 10 levels of ``Fe_scavenge`` are multiplied by the matching
    ``dz`` values and summed over ``z_t``.  The result is stored as
    ``Fe_scavenge_zint100m`` with the attributes of ``Fe_scavenge``, a new
    ``long_name``, and ``' cm'`` appended to ``units`` (when ``units`` exists).

    Parameters
    ----------
    ds : dataset-like
        Must provide ``Fe_scavenge`` and ``dz``, both indexable along ``z_t``.

    Returns
    -------
    A new dataset without ``Fe_scavenge`` and with ``Fe_scavenge_zint100m``
    added.  The input dataset is left unchanged.
    """
    # assumes the first 10 z_t levels span 0-100 m (10 m layers) — TODO confirm for other grids
    upper = slice(0, 10)

    integrand = ds.Fe_scavenge.isel(z_t=upper) * ds.dz.isel(z_t=upper)
    # NOTE(review): xarray's sum skips NaN by default, so columns that are
    # entirely NaN would come out as 0 rather than NaN — confirm that is intended.
    integral = integrand.sum(dim='z_t')

    # Work on a copy so the source variable's attributes are never modified.
    attrs = dict(ds.Fe_scavenge.attrs)
    attrs['long_name'] = '100m depth integral of Fe scavenging'
    if 'units' in attrs:
        attrs['units'] = attrs['units'] + ' cm'
    integral.attrs = attrs

    # drop_vars replaces the deprecated Dataset.drop; assign returns a new
    # dataset instead of writing into the caller's object.
    return ds.drop_vars(['Fe_scavenge']).assign(Fe_scavenge_zint100m=integral)

def fix_time(ds):
    """Return a deep copy of ``ds`` with ``time`` set to decoded mid-interval dates.

    The new time values are the mean of ``time_bound`` over ``d2``, converted
    to dates with ``cftime.num2date`` using the ``units`` and ``calendar``
    attributes of the original time coordinate.  Those two attributes are
    moved from the coordinate's attrs into its encoding; the remaining attrs
    and the encoding are carried over to the new coordinate.
    """
    fixed = ds.copy(deep=True)

    # Grab the metadata of the raw time axis before it is replaced.
    raw_time = fixed.time
    attrs = raw_time.attrs
    encoding = raw_time.encoding

    interval_midpoints = fixed.time_bound.mean(dim='d2')
    decoded = cftime.num2date(
        interval_midpoints,
        units=raw_time.units,
        calendar=raw_time.calendar,
    )
    fixed['time'] = xr.DataArray(decoded, dims='time')

    # Move units/calendar from the attrs to the encoding.
    for key in ('units', 'calendar'):
        encoding[key] = attrs.pop(key)

    fixed.time.attrs = attrs
    fixed.time.encoding = encoding
    return fixed

In [12]:
%%time

# fix time
dsets2 = {key: fix_time(ds) for key, ds in dsets.items()}
print('fixed time')

# subset time
dsets2 = {key: ds.sel(time=slice('1920', '2100')) for key, ds in dsets2.items()}
print('subset time done')

# compute physically mediated iron supply
#dsets2 = {key: phys_med_Fe(ds) for key, ds in dsets2.items()}

# compute 100 m integral of Fe scavenging
dsets2 = {key: zint_Fe_scavenge(ds) for key, ds in dsets2.items()}


fixed time
subset time done
CPU times: user 993 ms, sys: 38.5 ms, total: 1.03 s
Wall time: 2.1 s


Concatenate the datasets in time, i.e. 20C + RCP8.5 experiments.

In [13]:
# Concatenation order along time: 20C first, then RCP85.
ordered_dsets_keys = ['ocn,20C,pop.h', 'ocn,RCP85,pop.h']
#ordered_dsets_keys = ['ocn.20C.pop.h', 'ocn.RCP85.pop.h']

ds = xr.concat(
    [dsets2[name] for name in ordered_dsets_keys],
    data_vars='minimal',
    dim='time',
    #compat='override' ## added this
)

# Keep the time encoding of the first experiment.
time_encoding = dsets2[ordered_dsets_keys[0]].time.encoding
ds

Unnamed: 0,Array,Chunk
Bytes,33.94 kiB,80 B
Shape,"(2172, 2)","(5, 2)"
Count,1476 Tasks,435 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 33.94 kiB 80 B Shape (2172, 2) (5, 2) Count 1476 Tasks 435 Chunks Type float64 numpy.ndarray",2  2172,

Unnamed: 0,Array,Chunk
Bytes,33.94 kiB,80 B
Shape,"(2172, 2)","(5, 2)"
Count,1476 Tasks,435 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,33.80 GiB,2.34 MiB
Shape,"(34, 2172, 384, 320)","(1, 5, 384, 320)"
Count,200111 Tasks,14790 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 33.80 GiB 2.34 MiB Shape (34, 2172, 384, 320) (1, 5, 384, 320) Count 200111 Tasks 14790 Chunks Type float32 numpy.ndarray",34  1  320  384  2172,

Unnamed: 0,Array,Chunk
Bytes,33.80 GiB,2.34 MiB
Shape,"(34, 2172, 384, 320)","(1, 5, 384, 320)"
Count,200111 Tasks,14790 Chunks
Type,float32,numpy.ndarray


### Make annual means for each of the below variables

In [14]:
# Derived variable(s) to rechunk and write out below.
# NOTE: this rebinds `variables`, which the catalog search cell also reads, so
# re-running that search after this cell would look for the derived name.
variables= ['Fe_scavenge_zint100m'] #['phys_med_Fe','IRON_FLUX','Jint_100m_Fe','tend_zint_100m_Fe'] 

## Compute annual means 

In [15]:
%%time
# Annual means from the project-local `util` module, passing `time_bound` as the time-bounds variable.
# NOTE(review): the weighting and the meaning of `time_centered` are defined in util.ann_mean (not shown here).
ds_ann = util.ann_mean(ds, time_bnds_varname='time_bound', time_centered=True)
ds_ann

CPU times: user 7.58 s, sys: 122 ms, total: 7.7 s
Wall time: 9.91 s


Unnamed: 0,Array,Chunk
Bytes,5.63 GiB,0.94 MiB
Shape,"(181, 34, 384, 320)","(1, 1, 384, 320)"
Count,358170 Tasks,6154 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 5.63 GiB 0.94 MiB Shape (181, 34, 384, 320) (1, 1, 384, 320) Count 358170 Tasks 6154 Chunks Type float64 numpy.ndarray",181  1  320  384  34,

Unnamed: 0,Array,Chunk
Bytes,5.63 GiB,0.94 MiB
Shape,"(181, 34, 384, 320)","(1, 1, 384, 320)"
Count,358170 Tasks,6154 Chunks
Type,float64,numpy.ndarray


In [16]:
# Rechunk each output variable. The chunk sizes are given positionally, so they
# rely on the dimension order of ds_ann, whose repr above shows shape (181, 34, 384, 320).
for var in variables:
    ds_ann[var] = ds_ann[var].chunk((5,34,384,320))

In [17]:
%%time
# Run the computation and load the annual means into memory (the slow step; see the timing below).
ds_ann.load()

CPU times: user 18min 48s, sys: 32.5 s, total: 19min 20s
Wall time: 1h 37min 49s


#### write out data ANNUAL

In [18]:
%%time

#var = variables[0]

for var in variables:


    print('starting variable: ', var)

    keep_vars = ['time_bound','TAREA','time','dz','KMT', 'member_id','TLAT','TLONG', var]

    ds_out = ds_ann.drop([v for v in ds_ann.variables if v not in keep_vars])


    ds_out.compute()
    outfile='/glade/scratch/kristenk/CESM-LE-output/CESM-LE-'+var+'.nc'
    ds_out.to_netcdf(outfile)

starting variable:  Fe_scavenge_zint100m
CPU times: user 70.6 ms, sys: 1.82 s, total: 1.89 s
Wall time: 2.11 s


In [19]:
# Disconnect the client first so it is not left pointing at a scheduler that
# no longer exists, then shut the cluster down.
client.close()
cluster.close()