# Build the Intake-ESM Catalog

In [1]:
from distributed import Client
from ncar_jobqueue import NCARCluster

  from distributed.utils import format_bytes, parse_bytes, tmpfile
  from distributed.utils import format_bytes, parse_bytes, tmpfile
  from distributed.utils import parse_bytes


In [2]:
cluster = NCARCluster()
cluster.scale(20)
client = Client(cluster)

In [3]:
client

0,1
Connection method: Cluster object,Cluster type: PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,Workers: 0
Total threads:  0,Total memory:  0 B

0,1
Comm: tcp://10.12.206.9:43930,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,Total threads:  0
Started:  Just now,Total memory:  0 B


In [12]:
from ecgtools import Builder
from ecgtools.parsers.cesm import parse_cesm_history
import pandas as pd

In [5]:
def build_catalog(case):
    b = Builder(
        # Directory with the output
        f"/glade/scratch/hannay/archive/{case}/ocn/hist",
        # Depth of 1 since we are sending it to the case output directory
        depth=1,
        # Exclude the timeseries and restart directories
        exclude_patterns=["*/tseries/*", "*/rest/*"],
        # Number of jobs to execute - should be equal to # threads you are using
        njobs=-1,
    )
    
    return b.build(parse_cesm_history)

In [6]:
cases = ['b1850.f19_g17.validation_nuopc.004',
         'b1850.f19_g17.validation_mct.004',
         'b1850.f19_g17.validation_mct.002']

In [8]:
df_list = []
for case in cases:
    b = build_catalog(case)
    df_list.append(b.df)

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 72 concurrent workers.
[Parallel(n_jobs=-1)]: Done   1 out of   1 | elapsed:    0.3s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 72 concurrent workers.
[Parallel(n_jobs=-1)]: Done  18 tasks      | elapsed:    0.7s
[Parallel(n_jobs=-1)]: Done 144 tasks      | elapsed:    2.0s
[Parallel(n_jobs=-1)]: Done 306 tasks      | elapsed:    3.8s
[Parallel(n_jobs=-1)]: Done 504 tasks      | elapsed:    6.1s
[Parallel(n_jobs=-1)]: Done 738 tasks      | elapsed:    8.6s
[Parallel(n_jobs=-1)]: Done 1008 tasks      | elapsed:   11.6s
[Parallel(n_jobs=-1)]: Done 1314 tasks      | elapsed:   14.9s
[Parallel(n_jobs=-1)]: Done 1656 tasks      | elapsed:   18.7s
[Parallel(n_jobs=-1)]: Done 2034 tasks      | elapsed:   23.0s
[Parallel(n_jobs=-1)]: Done 2448 tasks      | elapsed:   27.7s
[Parallel(n_jobs=-1)]: Done 2898 tasks      | elapsed:   33.0s
[Parallel(n_jobs=-1)]: Done 3384 tasks      | elapsed:   38.5s
[Parallel(n_jobs=-1)]: D

In [15]:
b.df = pd.concat(df_list)

In [14]:
b.save(
    # File path - could save as .csv (uncompressed csv) or .csv.gz (compressed csv)
    "/glade/work/mgrover/cesm-validation-catalog.csv",
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variables',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="netcdf",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["component", "stream", "case"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[
        {
            "type": "join_existing",
            "attribute_name": "date",
            "options": {"dim": "time", "coords": "minimal", "compat": "override"},
        }
    ],
)

Saved catalog location: /glade/work/mgrover/cesm-validation-catalog.json and /glade/work/mgrover/cesm-validation-catalog.csv




In [16]:
# Import ast which helps with parsing the list of variables
import ast

# Import intake-esm
import intake

In [17]:
col = intake.open_esm_datastore(
    "/glade/work/mgrover/cesm-validation-catalog.json",
    csv_kwargs={"converters": {"variables": ast.literal_eval}},
    sep="/",
)
col

Unnamed: 0,unique
component,1
stream,4
date,2501
case,2
member_id,1
frequency,4
variables,545
path,7402


In [18]:
cat = col.search(
    variables='TEMP',
    stream='pop.h',
)
cat

Unnamed: 0,unique
component,1
stream,1
date,1200
case,2
member_id,1
frequency,1
variables,434
path,2400


In [19]:
cat

Unnamed: 0,unique
component,1
stream,1
date,1200
case,2
member_id,1
frequency,1
variables,434
path,2400


In [20]:
sub = cat.search(date=sorted(cat.df.date.unique())[:12])

In [None]:
dsets = sub.to_dataset_dict(cdf_kwargs={'use_cftime': True, 'chunks': {'time': 10}})


--> The keys in the returned dictionary of datasets are constructed as follows:
	'component/stream/case'


In [None]:
dsets['ocn/pop.h/b1850.f19_g17.validation_mct.004'].TEMP.isel(z_t=0).plot()