In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
import shutil

import data_collections as dc
import funnel
import matplotlib.pyplot as plt
import metabolic as mi
import numpy as np
import operators as ops
import util
import xarray as xr

from distributed.utils import tmpfile


In [3]:
# Search criteria applied to the funnel-generated catalog: the annual
# drift-corrected product, both experiments, every ocean-BGC ensemble member.
sub_spec = {
    'name': 'drift-corrected.ann',
    'experiment': ['20C', 'RCP85'],
    'member_id': dc.ocean_bgc_member_ids,
}

# agg_member_id=False is passed through to funnel; the loop further down
# asserts that the resulting datasets carry no 'member_id' dimension.
catalog = funnel.to_intake_esm(agg_member_id=False).search(**sub_spec)
catalog

Unnamed: 0,unique
experiment,2
component,1
stream,1
member_id,32
variable,2
name,1
path,128


In [4]:
def linear_fit_dim(x, y, fit_dim="time"):
    """Compute the linear regression relationships between two
    xarray.DataArray's over a specified dimension ("fit_dim").

    A separate degree-1 fit ``y = slope * x + intercept`` is computed for
    every point along the remaining (non-``fit_dim``) dimensions.

    Parameters
    ----------

    x : xarray.DataArray
      The independent variable.

    y : xarray.DataArray
      The dependent variable. Must have exactly the same dimensions, in the
      same order, as ``x``.

    fit_dim : string, optional
      The dimension over which to compute regression.

    Returns
    -------

    beta : xarray.DataArray
      The parameters of the regression relationship, named
      "slope_intercpt". Its dimensions are a leading "beta" dimension of
      length 2 followed by the non-``fit_dim`` dimensions of the inputs.
      ``beta=0`` holds the slope and ``beta=1`` the intercept (the order in
      which ``np.polyfit`` returns degree-1 coefficients). Points where
      ``x`` or ``y`` has a non-finite value anywhere along ``fit_dim`` are
      left as NaN.

    """
    assert fit_dim in x.dims, f"{fit_dim} dimension not found"
    assert x.dims == y.dims, "dimension mismatch"

    # Discard every coordinate that has no index; only the indexed
    # coordinates take part in the stack/unstack round trip below.
    x = x.reset_coords([c for c in x.coords if c not in x.indexes], drop=True)
    y = y.reset_coords([c for c in y.coords if c not in y.indexes], drop=True)

    # Collapse all other dimensions into one so both inputs become 2-D
    # arrays laid out as (fit_dim, non_fit_dims).
    non_fit_dims = [d for d in x.dims if d != fit_dim]
    x_stack = x.stack(non_fit_dims=non_fit_dims)
    y_stack = y.stack(non_fit_dims=non_fit_dims)

    # NaN-filled output container shaped (beta, non_fit_dims); .copy() makes
    # the broadcast result of expand_dims writable.
    beta_stack = xr.full_like(y_stack.isel({fit_dim: 0}, drop=True), fill_value=np.nan)
    beta_stack = beta_stack.expand_dims({"beta": [0, 1]}).copy()
    beta_stack.name = "slope_intercpt"

    # Work on plain NumPy arrays: indexing the DataArrays element by element
    # inside the loop is far slower than indexing the underlying arrays.
    x_vals = x_stack.values
    y_vals = y_stack.values

    # Fit only columns whose samples are all finite in both variables;
    # np.polyfit cannot produce a meaningful fit from NaN/inf samples.
    usable = np.isfinite(x_vals).all(axis=0) & np.isfinite(y_vals).all(axis=0)

    coeffs = np.full((2, x_vals.shape[1]), np.nan)
    for i in np.flatnonzero(usable):
        coeffs[:, i] = np.polyfit(x_vals[:, i], y_vals[:, i], 1)

    beta_stack[:] = coeffs
    return beta_stack.unstack()


def compute_pO2_v_TEMP_regression(ds):
    """Regress pO2 against TEMP over time, one dask block at a time.

    ``linear_fit_dim`` is applied through ``xr.map_blocks`` with ``ds.TEMP``
    as the independent variable and ``ds.pO2`` as the dependent variable.

    Parameters
    ----------

    ds : xarray.Dataset
      Must contain ``TEMP`` and ``pO2``, each with a ``units`` attribute
      and a ``time`` dimension.

    Returns
    -------

    beta : xarray.DataArray
      Array named "pO2_v_TEMP_beta" shaped like ``TEMP`` without ``time``
      and with a leading "beta" dimension of length 2.

    """
    temp = ds.TEMP

    # Coordinates of TEMP that are not dimensions are removed from the
    # template.
    extra_coords = {name for name in temp.coords if name not in temp.dims}

    # The template tells map_blocks what the result looks like: TEMP with
    # the time axis removed and a two-element "beta" axis prepended.
    first_step = temp.isel(time=0, drop=True)
    template = xr.full_like(first_step, fill_value=np.nan)
    template = template.expand_dims({"beta": [0, 1]})
    template = template.reset_coords(extra_coords, drop=True)

    template.name = "pO2_v_TEMP_beta"
    # NOTE(review): one units string covers the whole array although it
    # describes a slope (pO2 per TEMP); confirm how the second "beta" entry
    # should be labelled.
    template.attrs['units'] = f"{ds.pO2.attrs['units']}/{ds.TEMP.attrs['units']}"
    template.attrs['long_name'] = 'Regression parameters'

    # The function runs once per block and the template has no time axis, so
    # this relies on every block of ds.TEMP / ds.pO2 spanning all of time.
    return xr.map_blocks(linear_fit_dim, ds.TEMP, args=[ds.pO2], template=template)


def compute_temporal_trend(ds):
    """Build a dataset holding the linear trend in time of each variable.

    Data variables without a ``time`` dimension are copied over unchanged.
    Each remaining variable is replaced by ``ops.linear_trend(da, x=ds.time)``,
    which keeps the source attributes; when a ``units`` attribute is present,
    '/yr' is appended to it.

    Parameters
    ----------

    ds : xarray.Dataset
      Input dataset with a ``time`` coordinate.

    Returns
    -------

    ds_trend : xarray.Dataset
      One entry per data variable of ``ds``.

    """
    # assume annual -- the '/yr' suffix below depends on it; nothing here
    # checks the spacing of the time axis
    time_axis = ds.time

    trends = xr.Dataset()
    for name, field in ds.data_vars.items():
        if 'time' in field.dims:
            fitted = ops.linear_trend(field, x=time_axis)
            fitted.attrs = field.attrs
            if 'units' in fitted.attrs:
                fitted.attrs['units'] += '/yr'
            trends[name] = fitted
        else:
            # nothing to fit; pass the variable through as is
            trends[name] = field
    return trends


# Registry of derived products, keyed by output variable name. The
# processing loop reads three entries per product:
#   func     -- callable that receives the prepared dataset and returns the result
#   add_ops  -- passed on to dc.fnl_make_cache
#   dep_name -- used for the cache file name and passed to dc.fnl_make_cache
operations = {
    'pO2_v_TEMP_beta': {
        'func': compute_pO2_v_TEMP_regression,
        'add_ops': ["resample_ann"],
        'dep_name': "drift-corrected.ann",
    },
}
operations

{'pO2_v_TEMP_beta': {'func': <function __main__.compute_pO2_v_TEMP_regression(ds)>,
  'add_ops': ['resample_ann'],
  'dep_name': 'drift-corrected.ann'}}

In [5]:
# Reuse the cluster/client from an earlier run of this cell if both names
# already exist in the kernel; otherwise start a new one.
try:
    cluster
    client
except NameError:
    # Only an undefined name means "not started yet". Anything else
    # (including KeyboardInterrupt) should propagate instead of silently
    # launching another cluster.
    cluster, client = util.get_ClusterClient(memory='8GB')
    cluster.scale(128)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/calcs/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/calcs/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.60:44135,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mclong/calcs/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [6]:
# Run configuration for the cache-building loop below.
clobber = False  # True: delete an existing cache asset and recompute it
stream = 'pop.h'
component = 'ocn'
experiment_joined = '20C+RCP85'

for member_id in dc.ocean_bgc_member_ids:

    cat = catalog.search(variable=['pO2', 'TEMP'], member_id=member_id)
    dsets = cat.to_dataset_dict()

    # Exactly these two datasets must come back. The list order is also the
    # order in which they are concatenated along time (20C, then RCP85).
    exp_keys = [
        f'20C.ocn.pop.h.{member_id}.drift-corrected.ann',
        f'RCP85.ocn.pop.h.{member_id}.drift-corrected.ann',
    ]
    assert set(dsets.keys()) == set(exp_keys)

    ds = xr.concat([dsets[k] for k in exp_keys], dim='time', coords='minimal', compat='override')
    # drop_vars is the supported way to remove variables by name; Dataset.drop
    # is deprecated for this use.
    ds = ds.drop_vars(['ULAT', 'ULONG'])

    assert 'member_id' not in ds.dims

    # A 3-D REGION_MASK is reduced to its first slice along the leading
    # dimension (presumably time, picked up by the concat -- TODO confirm).
    if ds.REGION_MASK.ndim == 3:
        ds['REGION_MASK'] = ds.REGION_MASK[0, :, :]
    ds = ds.set_coords('REGION_MASK')
    # Give nlat/nlon explicit integer index coordinates.
    ds = ds.assign_coords(
        {
            "nlat": xr.DataArray(np.arange(ds.sizes["nlat"]), dims=("nlat",)),
            "nlon": xr.DataArray(np.arange(ds.sizes["nlon"]), dims=("nlon",)),
        }
    )

    # time=-1 forces a single chunk along time. The regression is applied
    # per block through xr.map_blocks and its result has no time axis, so
    # every block has to hold the complete time series. (A value of None
    # would leave the existing time chunking -- at least one chunk per
    # concatenated experiment -- unchanged.)
    ds = ds.chunk({'nlat': 16, 'nlon': 16, 'z_t': 10, 'time': -1})
    # Remove the 'chunks' encoding inherited from the source files,
    # presumably so to_zarr does not try to honour the old on-disk chunking
    # -- TODO confirm.
    for v in ds.variables:
        if 'chunks' in ds[v].encoding:
            del ds[v].encoding['chunks']

    # Keep values only where vol_mask is positive; everything else becomes NaN.
    vol_mask = ops.pop_ocean_volume(ds)
    ds = ds.where(vol_mask > 0)

    for variable, info in operations.items():

        dep_name = info['dep_name']
        add_ops = info['add_ops']
        func = info['func']

        # check for existing cache file
        asset = dc.fnl_gen_cache_file_name(
            experiment_joined, component, stream, member_id, variable, dep_name
        )

        if clobber and os.path.exists(asset):
            print(f'removing: {asset}')
            shutil.rmtree(asset)

        # NOTE(review): the datasets above are opened and prepared even when
        # every asset already exists; consider checking the cache first.
        if os.path.exists(asset):
            continue

        with util.timer(f'{variable}.{member_id}'):
            # Grid/metadata variables written alongside the computed result.
            dso = ds[['TAREA', 'TLONG', 'TLAT', 'KMT', 'REGION_MASK', 'z_t', 'dz']]
            dso[variable] = func(ds)

            print(f'writing: {asset}')
            dso.to_zarr(asset, mode="w", consolidated=True)
            dc.fnl_make_cache(
                experiment_joined,
                component,
                stream,
                member_id,
                variable,
                dep_name,
                add_ops,
            )


--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'



--> The keys in the returned dictionary of datasets are constructed as follows:
	'experiment.component.stream.member_id.name'


In [7]:
# Tear down the distributed resources started earlier in the notebook:
# close the client first, then the cluster it was connected to.
client.close()
cluster.close()