In [1]:
# Cluster setup: start Dask through dask-mpi, then connect a client to it.
# NOTE(review): dask-mpi documents initialize() for scripts launched with
# mpirun, where it assigns scheduler / client / worker roles to MPI ranks.
# The saved output of this cell is
#   RuntimeError: Cannot run the event loop while another loop is running
# presumably because initialize() tries to run its own event loop inside the
# Jupyter kernel's already-running one — TODO verify, and consider a cluster
# setup that works inside Jupyter (e.g. connecting to an externally started
# scheduler) so that Restart & Run All succeeds.
import dask_mpi
dask_mpi.initialize()

from datetime import datetime, timezone
import math  # NOTE(review): not used in the cells shown here
import os  # NOTE(review): not used in the cells shown here

import distributed
import xarray as xr
import yaml
from cf_units import Unit

import data_catalog
from tseries_utils import clean_units, get_weight, get_rmask, tseries_fname, tseries_copy_vars

# No address is given, so Client() presumably connects to the scheduler set
# up by dask_mpi.initialize() above — confirm.
client = distributed.Client()

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.148.10.13:8786
distributed.scheduler - INFO -       bokeh at:                     :8787


RuntimeError: Cannot run the event loop while another loop is running

In [None]:
# Load the per-variable time-series specifications.
# safe_load parses plain YAML (mappings, lists, scalars) without constructing
# arbitrary Python objects. A bare yaml.load(stream) with no Loader is
# deprecated since PyYAML 5.1 and raises TypeError in PyYAML >= 6.
tseries_specs_fname = 'tseries_specs_ice.yaml'
with open(tseries_specs_fname, mode='r') as fptr:
    tseries_specs = yaml.safe_load(fptr)

# Select which data catalog subsequent data_catalog queries search.
catalog_name = 'experiments'
data_catalog.set_catalog(catalog_name)

# Specification for the variable being processed.
varname = 'aice'
ts_spec = tseries_specs[varname]
print(ts_spec)

# Find the catalog entries holding this variable for the chosen
# component / stream / experiment; the result is shown as the cell output.
component = 'ice'
stream = 'cice.h'
experiment = 'esm-piControl'
entries = data_catalog.find_in_index(
    variable=varname, component=component, stream=stream, experiment=experiment)
entries

In [None]:
# Narrow the catalog query to a single ensemble member and list its files
# (shown as the cell output).
ensemble = 0
fnames = data_catalog.get_files(
    variable=varname,
    component=component,
    stream=stream,
    experiment=experiment,
    ensemble=ensemble,
)
fnames

In [None]:
# Open all files as one lazily-loaded dataset, chunked 4 steps along 'time'.
# Times and coordinates are left undecoded; data_vars='minimal' restricts
# concatenation to data variables that contain the concatenation dimension.
ds_in = xr.open_mfdataset(
    fnames,
    decode_times=False,
    decode_coords=False,
    chunks=dict(time=4),
    data_vars='minimal',
)
ds_in

In [None]:
# Extract the variable to be processed from the input dataset. It is
# presumably dask-backed, since ds_in was opened with chunks= — confirm.
da_in = ds_in[varname]
da_in

In [None]:
# Normalise the variable's units string. When the spec supplies a
# 'unit_conv' factor, prepend it so the unit string carries the scaling.
var_units = clean_units(da_in.attrs['units'])
print(var_units)
var_units = (
    '({!s})({!s})'.format(ts_spec['unit_conv'], var_units)
    if 'unit_conv' in ts_spec
    else var_units
)
print(var_units)

In [None]:
# Build the reduction weights: per-cell weights from get_weight, multiplied
# by the region mask from get_rmask.
reduce_dims = ts_spec['reduce_dims']
weight = get_weight(ds_in, component, reduce_dims)
weight_attrs = weight.attrs
print(weight)
weight = get_rmask(ds_in, component) * weight
print(weight)
# The product is not guaranteed to keep weight's attrs. Restore them, because
# weight.attrs['units'] is used later to build the integrated units.
weight.attrs = weight_attrs
print(weight)

# Sanity check: compare the total weight with Earth's surface area
# (510,072,000 km^2 per Wikipedia, converted to m^2).
# NOTE(review): the ratio is only meaningful if weight is an area in m^2 and
# ('nj', 'ni') spans the whole horizontal grid — confirm.
area_earth_wikipedia = 510072000*1.0e3**2
# Reduce once and reuse the result: each .values access triggers its own
# (possibly dask) computation of the full sum.
weight_total = weight.sum(dim=('nj', 'ni')).values
print(weight_total)
print(weight_total / area_earth_wikipedia)

In [None]:
# Weighted sum of the variable over reduce_dims, named after the input
# variable. Units are the product of the weight units and the variable units,
# normalised by cf_units.
da_out = (
    (da_in * weight)
    .sum(dim=reduce_dims)
    .rename(varname)
    .assign_attrs(
        long_name='Integrated ' + da_in.attrs['long_name'],
        units=Unit('(%s)(%s)' % (weight.attrs['units'], var_units)).format(),
    )
)
da_out

In [None]:
# Spot-check the conversion to the spec's output units on the first 12
# entries along da_out's leading dimension. Slicing the DataArray *before*
# taking .values computes only those entries. Taking .values first would
# materialize the whole dask-backed series and then discard all but 12 values.
Unit(da_out.attrs['units']).convert(da_out[0:12].values, Unit(clean_units(ts_spec['units_out'])))

In [None]:
# Assemble the output dataset from these parts:
# - the integrated variable;
# - the time-bounds variable, if 'time' declares one;
# - the component's copy variables.
# Global attributes are copied from the input, with a fresh 'history' entry.
ds_out = da_out.to_dataset()
time_attrs = ds_in['time'].attrs
if 'bounds' in time_attrs:
    # Give the bounds variable the same time encoding as 'time'. assign_attrs
    # returns a shallow copy, so ds_in's own bounds variable is not modified.
    # 'calendar' is optional in CF, so copy only attributes actually present.
    tb_attrs = {key: time_attrs[key] for key in ('units', 'calendar') if key in time_attrs}
    tb = ds_in[time_attrs['bounds']].assign_attrs(tb_attrs)
    ds_out = xr.merge((ds_out, tb))
print(ds_out)
for copy_var in tseries_copy_vars(component):
    print(copy_var)
    ds_out = xr.merge((ds_out, ds_in[copy_var]))
# Copy into a new dict so the 'history' update below cannot touch ds_in.attrs.
ds_out.attrs = dict(ds_in.attrs)
datestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
ds_out.attrs['history'] = 'created at %s' % datestamp
print(ds_out)