In [1]:
%matplotlib inline
import os
from subprocess import call
from glob import glob

import cftime
import pandas as pd
import xarray as xr
import numpy as np

import time

import cartopy
import cartopy.crs as ccrs
import cartopy.feature as cfeature

from cartopy.mpl.ticker import LongitudeFormatter, LatitudeFormatter
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib.colors as colors
import cmocean

import plottools as pt
import gridtools as gt
import project as P
import calc

os.environ['CARTOPY_USER_BACKGROUNDS'] = '/glade/work/mclong/natural-earth-img'
USER = os.environ['USER']

In [2]:
from dask.distributed import Client
from dask_jobqueue import PBSCluster
import dask

Nnodes = 4
processes = 18
cores = 18

dask.config.set({'distributed.dashboard.link':'http://localhost:{port}/status'})


# Lots of arguments to this command are set in ~/.config/dask/jobqueue.yaml
cluster = PBSCluster(queue='regular',
                     cores = cores,
                     processes = processes,
                     memory = '100GB',                     
                     project = 'NCGD0011',
                     walltime = '04:00:00',
                     local_directory=f'/glade/scratch/{USER}/dask-tmp')
client = Client(cluster)


cluster.scale(processes*Nnodes)

#while not cluster.running_jobs:
#    print('...',end='')
#    time.sleep(30)

#client    

In [3]:
import importlib
import cesm
import yaml
import logging

importlib.reload(cesm)

with open('collections.yml') as f:
    spec = yaml.load(f)

print('-'*70+'\nDatasets:')
cases = []
files = {}
for i, (d_src,d_attrs) in enumerate(spec['monthly'].items()):    
    files[d_src] = cesm.list_files(**d_attrs['open_dataset'])
    print(f'({i}) {d_src}: {d_attrs["description"]} ({len(files[d_src])} files)')
    cases.append(d_src)
print('-'*70)

----------------------------------------------------------------------
Datasets:
(0) CTRL: Default model configuration (2880 files)
(1) XT-FE: Extraterrestrial Fe deposited on ocean and ice (2880 files)
----------------------------------------------------------------------


In [10]:
xropen = {'decode_times': False,
          'decode_coords': False,
          'decode_cf': False,
          'chunks': {'time': 1, 'z_t': 1} }

diro = '/glade/scratch/mclong/calcs/xtFe/ts-files'

def gen_timeseries(files_hist,file_ts,variable):
    
    time_vars = ['time','time_bound']
    keep_vars = [variable]+time_vars

    def one_file(file_hist,i):
        with xr.open_dataset(file_hist,**xropen) as ds:
            ds = ds.drop([v for v in ds.variables if v not in keep_vars])
            ds.to_zarr(f'{diro}/{i:04d}.zarr')
            return f'{diro}/{i:04d}.zarr'

    results = []
    for i,file_hist in enumerate(files_hist):
        res = dask.delayed(one_file,pure=True)(file_hist,i)   
        results.append(res)    
        
    res = dask.compute(*results)
    #combined = xr.concat(res, 'time')
    
    return #combined.to_netcdf(file_ts, compute=False)
    

In [11]:
%%time
diro = '/glade/scratch/mclong/calcs/xtFe/ts-files'
for v in ['DIC']:
    file_ts = os.path.join(diro,f'ctrl.{v}.nc')
    obj = gen_timeseries(files['CTRL'], file_ts, v)
    results = obj.compute()


AttributeError: 'NoneType' object has no attribute 'compute'



In [4]:
%%time
diro = '/glade/scratch/mclong/calcs/xtFe/ts-files'
xropen = {'decode_times': False,
          'decode_coords': False,
          'decode_cf': False}

time_vars = ['time','time_bound']
static_vars = ['TLAT','TLONG','TAREA','KMT']

for v in ['DIC']:   
    file_ts = os.path.join(diro,f'ctrl.{v}.nc')
    file_ts_z = os.path.join(diro,f'ctrl.{v}.zarr')
    
    drop = lambda ds: ds.drop([v for v in ds.variables if v not in [v]+time_vars+static_vars])
    
    ds = xr.open_mfdataset(files['CTRL'],
                           concat_dim = 'time',
                           data_vars = [v],
                           chunks = {'time': 1, 'z_t': 1}, 
                           preprocess = drop,
                           autoclose = True,
                           parallel = True,
                           **xropen)
    
    print(f'writing {file_ts}')
    ds.to_netcdf(file_ts,unlimited_dims=['time'])
    #ds.to_zarr(file_ts_z)
    

KeyboardInterrupt: 

In [None]:
files['CTRL']

In [9]:
ds = xr.open_dataset('/glade/scratch/mclong/archive/g.e21a01d.G1850ECOIAF.T62_g17.extraterr-fe.001/ocn/proc/tseries/month_1/g.e21a01d.G1850ECOIAF.T62_g17.extraterr-fe.001.pop.h.DIC.000101-024012.nc',
                    **xropen)
ds

<xarray.Dataset>
Dimensions:                 (d2: 2, lat_aux_grid: 395, moc_comp: 3, moc_z: 61, nchar: 384, nlat: 384, nlon: 320, time: 2880, transport_comp: 5, transport_reg: 2, z_t: 60, z_t_150m: 15, z_w: 60, z_w_bot: 60, z_w_top: 60)
Coordinates:
  * z_t                     (z_t) float32 500.0 1500.0 ... 512502.8 537500.0
  * z_t_150m                (z_t_150m) float32 500.0 1500.0 ... 13500.0 14500.0
  * z_w                     (z_w) float32 0.0 1000.0 ... 500004.7 525000.94
  * z_w_top                 (z_w_top) float32 0.0 1000.0 ... 500004.7 525000.94
  * z_w_bot                 (z_w_bot) float32 1000.0 2000.0 ... 549999.06
  * lat_aux_grid            (lat_aux_grid) float32 -79.48815 -78.952896 ... 90.0
  * moc_z                   (moc_z) float32 0.0 1000.0 ... 525000.94 549999.06
  * time                    (time) float64 396.0 424.0 ... 8.793e+04 8.796e+04
Dimensions without coordinates: d2, moc_comp, nchar, nlat, nlon, transport_comp, transport_reg
Data variables:
    moc_compo