# An introduction to dask-kubernetes and intake-esm

Now that TRACMIP is embedded in Pangeo's [climate catalog](https://catalog.pangeo.io/climate), it is easier to search and merge the 7067 zarr sources that make up the collection.
In addition, dask-kubernetes should make it much easier to run computations on this data and to monitor their performance with Dask's distributed dashboard.

To initialize a Dask client, use the lab extension to start a new cluster and instantiate a `Client` with the scheduler address:

In [7]:
from dask.distributed import Client

client = Client("tcp://10.32.2.19:37469")
client

| Client | Cluster |
| --- | --- |
| Scheduler: tcp://10.32.2.19:37469 | Workers: 0 |
| Dashboard: /user/0000-0003-2292-0572/proxy/8787/status | Cores: 0 |
| | Memory: 0 B |


From here, the number of workers can be scaled manually or adaptively.
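The two scaling modes can be sketched as a small helper. This is only an illustration: `scale_workers` is a hypothetical function, and `cluster` is assumed to be a Dask cluster object (for example one created by dask-kubernetes) that exposes the usual `scale` and `adapt` methods. The worker counts are placeholders.

```python
def scale_workers(cluster, n_workers=None, minimum=0, maximum=10):
    """Scale a Dask cluster manually (fixed size) or adaptively (bounded range).

    `cluster` is assumed to provide `scale(n)` and `adapt(minimum=, maximum=)`.
    """
    if n_workers is not None:
        # Manual scaling: request a fixed number of workers.
        cluster.scale(n_workers)
        return "manual"
    # Adaptive scaling: let Dask add or remove workers with the workload,
    # staying between `minimum` and `maximum`.
    cluster.adapt(minimum=minimum, maximum=maximum)
    return "adaptive"


# Usage, given an existing cluster object:
#   scale_workers(cluster, n_workers=4)            # fixed size
#   scale_workers(cluster, minimum=0, maximum=20)  # adaptive
```

Adaptive scaling is usually the friendlier choice on a shared deployment, since idle workers are released when no computation is running.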

To access TRACMIP's ESM collection:

In [8]:
from intake import open_catalog

cat = open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/climate.yaml")
col = cat.tracmip()
col

pangeo-tracmip-ESM Collection with 7067 entries:
	> 3 frequency(s)

	> 11 experiment(s)

	> 14 model(s)

	> 47 variable(s)

	> 10 version(s)

	> 7067 source(s)

From here, `col.df` exposes the list of zarr sources as a pandas `DataFrame`:

In [9]:
col.df.head()

| | frequency | experiment | model | variable | version | source |
| --- | --- | --- | --- | --- | --- | --- |
| 0 | A3hr | aqua4xCO2 | AM21 | hur | v20190116 | gs://cmip6/tracmip/A3hr/aqua4xCO2/AM21/hur/v20... |
| 1 | A3hr | aqua4xCO2 | AM21 | hus | v20190116 | gs://cmip6/tracmip/A3hr/aqua4xCO2/AM21/hus/v20... |
| 2 | A3hr | aqua4xCO2 | AM21 | ta | v20190116 | gs://cmip6/tracmip/A3hr/aqua4xCO2/AM21/ta/v201... |
| 3 | A3hr | aqua4xCO2 | AM21 | ua | v20190116 | gs://cmip6/tracmip/A3hr/aqua4xCO2/AM21/ua/v201... |
| 4 | A3hr | aqua4xCO2 | AM21 | va | v20190116 | gs://cmip6/tracmip/A3hr/aqua4xCO2/AM21/va/v201... |


To find the columns and their unique values:

In [10]:
col.df.columns

Index(['frequency', 'experiment', 'model', 'variable', 'version', 'source'], dtype='object')

In [11]:
col.unique(["experiment", "model", "variable"])

{'experiment': {'count': 11,
  'values': ['aqua4xCO2',
   'aquaControl',
   'land4xCO2',
   'landControl',
   'landOrbit',
   'aquaAbs07',
   'aquaAbs15',
   'aquaAbs20',
   'landAbs07',
   'landAbs15',
   'landAbs20']},
 'model': {'count': 14,
  'values': ['AM21',
   'CAM4',
   'CAM5Nor',
   'CNRM-AM5',
   'ECHAM61',
   'ECHAM63',
   'GISS-ModelE2',
   'MIROC5',
   'MPAS',
   'MetUM-CTL',
   'MetUM-ENT',
   'CAM3',
   'LMDZ5A',
   'CALTECH']},
 'variable': {'count': 47,
  'values': ['hur',
   'hus',
   'ta',
   'ua',
   'va',
   'wap',
   'zg',
   'clivi',
   'clt',
   'clwvi',
   'hfls',
   'hfss',
   'huss',
   'pr',
   'prc',
   'prsn',
   'prw',
   'ps',
   'psl',
   'rlds',
   'rldscs',
   'rlus',
   'rlut',
   'rlutcs',
   'rsds',
   'rsdscs',
   'rsdt',
   'rsus',
   'rsuscs',
   'rsut',
   'rsutcs',
   'rtmt',
   'sfcWind',
   'tas',
   'tasmax',
   'tasmin',
   'tauu',
   'tauv',
   'ts',
   'cl',
   'cli',
   'clw',
   'evspsbl',
   'uas',
   'vas',
   'cct',
   'hurs']}}

To query the collection:

In [12]:
new_col = col.search(frequency="Amon",
                     experiment=["aquaControl", "landControl"],
                     model=["CAM3", "CAM4", "CAM5Nor"],
                     variable=["pr", "cl", "huss"])
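To make the matching rule concrete, here is a plain-Python illustration, not intake-esm's implementation. It assumes that a list of values matches any of its entries within a column and that all keyword arguments must match at once. The helper `matches` is hypothetical, and the rows are copied from the `col.df.head()` output above (without the truncated `source` column).

```python
def matches(row, **query):
    """Return True if `row` satisfies every keyword in `query`."""
    for column, wanted in query.items():
        # A bare string is treated as a single allowed value.
        allowed = [wanted] if isinstance(wanted, str) else list(wanted)
        if row[column] not in allowed:
            return False
    return True


rows = [
    {"frequency": "A3hr", "experiment": "aqua4xCO2", "model": "AM21", "variable": "hur"},
    {"frequency": "A3hr", "experiment": "aqua4xCO2", "model": "AM21", "variable": "hus"},
    {"frequency": "A3hr", "experiment": "aqua4xCO2", "model": "AM21", "variable": "ta"},
    {"frequency": "A3hr", "experiment": "aqua4xCO2", "model": "AM21", "variable": "ua"},
    {"frequency": "A3hr", "experiment": "aqua4xCO2", "model": "AM21", "variable": "va"},
]

# Values within one column are alternatives; columns are combined with AND.
selected = [r for r in rows if matches(r, frequency="A3hr", variable=["ta", "ua"])]
print([r["variable"] for r in selected])  # prints ['ta', 'ua']
```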

To load the collection lazily using Dask:

In [13]:
ds_dict = new_col.to_dataset_dict(zarr_kwargs={"consolidated" : True})

Progress: |███████████████████████████████████████████████████████████████████████████████| 100.0% 

--> The keys in the returned dictionary of datasets are constructed as follows:
	'model.experiment.frequency'
             
--> There are 6 group(s)


In [14]:
ds_dict.keys()

dict_keys(['CAM3.landControl.Amon', 'CAM3.aquaControl.Amon', 'CAM5Nor.landControl.Amon', 'CAM4.landControl.Amon', 'CAM4.aquaControl.Amon', 'CAM5Nor.aquaControl.Amon'])
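Because each key follows the `'model.experiment.frequency'` pattern, it can be split back into its parts. The sketch below works on the key strings only; `parse_key` is a hypothetical helper, and it assumes none of the three parts contains a dot, which holds for the keys listed above.

```python
keys = [
    "CAM3.landControl.Amon",
    "CAM3.aquaControl.Amon",
    "CAM5Nor.landControl.Amon",
    "CAM4.landControl.Amon",
    "CAM4.aquaControl.Amon",
    "CAM5Nor.aquaControl.Amon",
]


def parse_key(key):
    """Split a 'model.experiment.frequency' key into a dict of its parts."""
    model, experiment, frequency = key.split(".")
    return {"model": model, "experiment": experiment, "frequency": frequency}


# Group the available models by experiment.
models_by_experiment = {}
for key in keys:
    parts = parse_key(key)
    models_by_experiment.setdefault(parts["experiment"], []).append(parts["model"])

for experiment, models in sorted(models_by_experiment.items()):
    print(experiment, sorted(models))
```

With the real dictionary, `ds_dict.keys()` can be iterated in place of the hard-coded `keys` list.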

In [15]:
ds_dict["CAM3.landControl.Amon"]