In [1]:
%matplotlib inline
import os
import importlib
import yaml
import copy

import xarray as xr
import numpy as np

import intake
import intake_esm
import xcollection as xc

import dask
import dask.distributed
import ncar_jobqueue

  dot_file_settings = yaml.load(f)
  defaults = yaml.load(f)


In [2]:
# Open the metadata store described by the YAML collection definition.
# overwrite_existing=False is passed through unchanged to the opener.
collection_definition = 'cesm1-le-collection.yml'
col = intake.open_esm_metadatastore(
    collection_input_definition=collection_definition,
    overwrite_existing=False,
)

# Print a summary of the catalog DataFrame (columns, non-null counts, dtypes).
col.df.info()

  self.build_collection(overwrite_existing)


<class 'pandas.core.frame.DataFrame'>
Int64Index: 265484 entries, 0 to 265483
Data columns (total 18 columns):
resource            265484 non-null object
resource_type       265484 non-null object
direct_access       265484 non-null bool
experiment          265484 non-null object
case                265484 non-null object
component           265484 non-null object
stream              265484 non-null object
variable            265484 non-null object
date_range          265484 non-null object
ensemble            265484 non-null int64
file_fullpath       265484 non-null object
file_basename       265484 non-null object
file_dirname        265484 non-null object
ctrl_branch_year    0 non-null float64
year_offset         34129 non-null float64
sequence_order      265484 non-null int64
has_ocean_bgc       265484 non-null bool
grid                54238 non-null object
dtypes: bool(2), float64(2), int64(2), object(12)
memory usage: 34.9+ MB


In [3]:
# Experiments to analyse.
experiments = ['20C', 'RCP85']

# Restrict the catalog to entries flagged has_ocean_bgc for those experiments,
# then collect the distinct ensemble member numbers.
bgc_results = col.search(experiment=experiments, has_ocean_bgc=True).query_results
ensembles = bgc_results.ensemble.unique().tolist()
print(ensembles)

  self.query_results = self.get_results()


[1, 2, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 27, 28, 29, 30, 31, 32, 34, 35, 101, 102, 103, 104, 105]


In [4]:
# Search terms shared by every catalog query below.
query_base = {
    'experiment': experiments,
    'ensemble': ensembles,
    'stream': 'pop.h',
}
# Extra constraint merged into each query alongside query_base.
resource_constraints = {'direct_access': True}

In [5]:
# Read the variable groups from variables.yml: a mapping whose values are
# lists of variable names.
with open('variables.yml', 'r') as fid:
    # yaml.load() without an explicit Loader is deprecated since PyYAML 5.1 and
    # raises TypeError in PyYAML 6; safe_load parses plain YAML data only.
    # NOTE(review): assumes variables.yml holds only plain mappings/lists
    # (no python-specific tags) -- confirm.
    variables = yaml.safe_load(fid)

# Flatten every group into a single list of candidate variable names.
variable_list = []
for group in variables.values():
    variable_list.extend(group)

query = {**dict(variable=variable_list), **query_base, **resource_constraints}

# Keep only the variables that the catalog actually returns for this query.
cat = col.search(**query)
variable_list = cat.query_results.variable.unique().tolist()

# Narrow further to the four variables processed in this notebook.
variable_list = [v for v in variable_list if v in ['NO3', 'PO4', 'SiO3', 'Fe']]
print(variable_list)

  


['Fe', 'NO3', 'PO4', 'SiO3']


In [6]:
# Operator that applies esmlab's `climatology` function with freq='mon'.
monclim_operator = xc.operator(
    applied_method="time:mon_clim",
    module='esmlab',
    function='climatology',
    kwargs={'freq': 'mon'},
)

# Recipe template; the processing loop deep-copies it and prepends a
# time-selection operator for each period.
base_recipe = dict(
    name='monclim',
    description='compute monthly climatology',
    operators=[monclim_operator],
)

In [7]:
# Build the processing plan directly in Python: a list of
# {'slice': <time slice>, 'varlist': <variable names>} entries.
#
# This previously round-tripped through an f-string of YAML containing
# `!!python/object/apply:slice` tags. yaml.load() without a Loader is
# deprecated (PyYAML >= 5.1) and rejected (PyYAML >= 6), and those tags are
# only honoured by the unsafe loader. Constructing the objects directly yields
# the same structure without executing YAML-embedded Python constructors.
nutrient_periods = [
    ('1940', '1949'),
    ('1990', '1999'),
    ('2040', '2049'),
    ('2090', '2099'),
]

# Each entry gets its own copy of variable_list so the entries stay independent
# (and yaml.dump does not emit anchors/aliases for a shared list).
variable_control = [
    {'slice': slice(start, stop), 'varlist': list(variable_list)}
    for start, stop in nutrient_periods
]
variable_control += [
    {'slice': slice('1995', '2005'),
     'varlist': ['TEMP', 'SALT']},
    {'slice': slice('1995', '2010'),
     'varlist': ['photoC_diat', 'photoC_sp', 'diatChl', 'spChl']},
]

# Display the plan in YAML form for inspection.
print(yaml.dump(variable_control))

- slice: !!python/object/apply:builtins.slice
  - '1940'
  - '1949'
  - null
  varlist:
  - Fe
  - NO3
  - PO4
  - SiO3
- slice: !!python/object/apply:builtins.slice
  - '1990'
  - '1999'
  - null
  varlist:
  - Fe
  - NO3
  - PO4
  - SiO3
- slice: !!python/object/apply:builtins.slice
  - '2040'
  - '2049'
  - null
  varlist:
  - Fe
  - NO3
  - PO4
  - SiO3
- slice: !!python/object/apply:builtins.slice
  - '2090'
  - '2099'
  - null
  varlist:
  - Fe
  - NO3
  - PO4
  - SiO3
- slice: !!python/object/apply:builtins.slice
  - '1995'
  - '2005'
  - null
  varlist:
  - TEMP
  - SALT
- slice: !!python/object/apply:builtins.slice
  - '1995'
  - '2010'
  - null
  varlist:
  - photoC_diat
  - photoC_sp
  - diatChl
  - spChl





In [8]:
# Number of dask workers to request.
# NOTE(review): presumably 9 jobs x 6 workers each -- confirm the intent of 9 * 6.
n_workers = 9 * 6

# Start a cluster with a 12-hour walltime and attach a distributed client to it.
cluster = ncar_jobqueue.NCARCluster(
    walltime='12:00:00',
)
client = dask.distributed.Client(cluster)

# Ask the cluster to scale to the requested number of workers.
cluster.scale(n_workers)

  "diagnostics_port has been deprecated. "


In [None]:
# For every (time slice, variable) pair in the plan, build an analysis recipe
# that first selects the time slice and then applies the base recipe's
# operators, and hand it to xc.analyzed_collection.
for d in variable_control:
    # Index directly: both keys are required, so a malformed entry should fail
    # with a KeyError naming the missing key rather than an error on None.
    time_slice = d['slice']
    varlist = d['varlist']
    slice_str = f'{time_slice.start}-{time_slice.stop}'

    for v in varlist:

        print(f'working on {v}')
        query = {**dict(variable=v), **query_base, **resource_constraints}

        # Analysis name: cesm1_le.<experiments joined by '+'>.<variable>.<recipe name>.<start-stop>
        # Built inline instead of re-creating a lambda on every iteration.
        analysis_name = '.'.join(
            ['cesm1_le',
             '+'.join(query['experiment']),
             query['variable'],
             base_recipe['name'],
             slice_str])

        # Deep-copy so inserting the time-selection operator never mutates
        # base_recipe's shared operator list across iterations.
        recipe = copy.deepcopy(base_recipe)
        recipe['operators'].insert(0, xc.operator(module='util',
                                                  function='sel_time',
                                                  kwargs={'indexer_val': time_slice}))

        dc = xc.analyzed_collection(collection_obj=col,
                                    analysis_recipe=recipe,
                                    analysis_name=analysis_name,
                                    overwrite_existing=False,
                                    file_format="nc",
                                    **query)


working on Fe


  self.query_results = self.get_results()
  self.query_results = self.get_results()
  file_fullpath_column_name='file_fullpath',


HBox(children=(IntProgress(value=0, description='dataset', max=1, style=ProgressStyle(description_width='initi…

HBox(children=(IntProgress(value=0, description='member', max=1, style=ProgressStyle(description_width='initia…




  self.query_results = self.get_results()
  file_fullpath_column_name='file_fullpath',


HBox(children=(IntProgress(value=0, description='dataset', max=1, style=ProgressStyle(description_width='initi…

HBox(children=(IntProgress(value=0, description='member', max=1, style=ProgressStyle(description_width='initia…




HBox(children=(IntProgress(value=0, description='dataset', max=1, style=ProgressStyle(description_width='initi…

In [None]:
# Disconnect the client before tearing down the cluster it is attached to, so
# the client is not left connected to a scheduler that no longer exists.
client.close()
cluster.close()

In [None]:
# Load the `watermark` IPython extension so the %watermark magic can be used.
%load_ext watermark

In [None]:
# Print run/environment provenance via the watermark extension.
# NOTE(review): flag meanings are defined by the watermark package -- see its
# documentation; `--iversion` appears to be an abbreviation of `--iversions`.
%watermark --iversion -g -h -m -v -u -d