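# Example: tracking dataset operations with a `Funnel` built on xcollection

This notebook loads monthly ocean `SALT` output from the CESM2 Large Ensemble AWS catalog and wraps it in an `xcollection.Collection`. It then uses a small `Funnel` class to apply an operator to every dataset while recording which operator was applied.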

In [129]:
from xcollection import Collection
import xcollection
import pydantic
import intake
import pandas as pd
from ncar_jobqueue import NCARCluster
from distributed import Client
import dask

In [34]:
# Location of the intake-esm datastore JSON for the CESM2 LE data on AWS.
CESM2_LE_CATALOG_URL = (
    'https://raw.githubusercontent.com/NCAR/cesm2-le-aws/main/data/aws-cesm2-le.json'
)

# Open the datastore and display its summary (unique values per catalog column).
catalog = intake.open_esm_datastore(CESM2_LE_CATALOG_URL)
catalog

Unnamed: 0,unique
component,4
spatial_domain,3
frequency,3
path,284
experiment,2
forcing_variant,2
variable,57
long_name,57
start_time,4
end_time,6


In [58]:
# Narrow the catalog to the monthly SALT variable from the ocean component.
cat = catalog.search(
    variable='SALT',
    component='ocn',
    frequency='monthly',
)

In [62]:
# Load every matching asset into a dict of xarray datasets, requesting
# anonymous access to the cloud storage backend.
dsets = cat.to_dataset_dict(
    storage_options=dict(anon=True),
)


--> The keys in the returned dictionary of datasets are constructed as follows:
	'component.experiment.frequency.forcing_variant'
█

In [30]:
# NOTE(review): the execution counts show this cell ran (In [30]) before the
# data-loading cells (In [58], In [62]); consider moving it above them so a
# Restart & Run All reproduces the same order.
N_WORKERS = 10  # number of workers requested from the batch cluster

cluster = NCARCluster()
cluster.scale(N_WORKERS)
client = Client(cluster)

In [31]:
# Display the dask client summary (scheduler address, dashboard link, workers).
# Workers may still be starting right after scale(), so counts can read 0.
client

0,1
Connection method: Cluster object,Cluster type: PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,Workers: 0
Total threads:  0,Total memory:  0 B

0,1
Comm: tcp://10.12.206.3:40755,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,Total threads:  0
Started:  Just now,Total memory:  0 B


In [33]:
# Display the cluster object; in the saved output it renders as an ipywidgets Tab.
cluster

Tab(children=(HTML(value='\n            <div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-Ou…

In [348]:
@pydantic.dataclasses.dataclass
class Funnel:
    """Wrap a collection of datasets and record the operator applied to it.

    Attributes set after field validation:
        keys, base_keys: list of the keys of the wrapped collection.
        operator_keys: list with one entry per base key holding the name of
            the most recently applied operator (``None`` before any ``apply``).
        base_token: ``dask.base.tokenize`` of the wrapped collection.
        base_dsets: the wrapped, unmodified collection.
        operated_dsets: result of the most recent ``apply`` (initially the
            wrapped collection itself).
        database: set by ``df()`` to the last table it returned.
    """

    dsets: xcollection.Collection

    def __post_init_post_parse__(self):
        # pydantic (v1) runs this hook after validating the fields.
        # NOTE(review): pydantic v2 drops this hook in favour of
        # __post_init__ — confirm the installed pydantic version.
        # Materialise the keys so they are an ordered list, not a live view.
        self.keys = list(self.dsets.keys())
        self.base_keys = self.keys
        # One entry per base key so df() always builds equal-length columns,
        # even when the collection holds zero or several datasets.
        self.operator_keys = [None] * len(self.base_keys)
        self.base_token = dask.base.tokenize(self.dsets)
        self.base_dsets = self.dsets
        self.operated_dsets = self.dsets

    def apply(self, operator, *operator_args, **operator_kwargs):
        """Map ``operator`` over every dataset of the base collection.

        Parameters
        ----------
        operator : callable
            Function passed to ``Collection.map`` together with the extra
            arguments below (presumably called as
            ``operator(ds, *operator_args, **operator_kwargs)`` — confirm
            against xcollection). Its ``__name__`` (or ``repr`` if it has
            none) is recorded in ``operator_keys``.
        *operator_args, **operator_kwargs
            Extra positional / keyword arguments forwarded to ``operator``.

        Returns
        -------
        The collection returned by ``self.dsets.map(...)``; it is also stored
        in ``self.operated_dsets``.
        """
        operator_name = getattr(operator, '__name__', repr(operator))
        # Always map over the base collection (self.dsets is never
        # reassigned), so successive apply() calls do not chain.
        operated_dsets = self.dsets.map(operator, *operator_args, **operator_kwargs)
        self.operator_keys = [operator_name] * len(self.base_keys)
        self.operated_dsets = operated_dsets
        return self.operated_dsets

    def df(self):
        """Return a table pairing each base key with the last operator applied.

        The table is also stored on ``self.database``.
        """
        dataframe = pd.DataFrame(
            {'base_keys': self.base_keys, 'operator_keys': self.operator_keys}
        )
        self.database = dataframe
        return dataframe

In [349]:
# Wrap the dict of datasets using the directly imported Collection class
# (the same object as xcollection.Collection).
collection = Collection(dsets)

In [350]:
# Build the Funnel around the collection, passing the dataclass field by name.
data_collection = Funnel(dsets=collection)

In [353]:
def func_a(ds, n_members=2):
    """Return ``ds`` restricted to its first ``n_members`` ``member_id`` entries.

    A slice gives basic positional indexing along ``member_id``; if ``ds`` has
    fewer than ``n_members`` members, all of them are kept instead of raising.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with a ``member_id`` dimension.
    n_members : int, default 2
        Number of leading members to keep.
    """
    return ds.isel(member_id=slice(n_members))

In [356]:
# Subset every dataset in the base collection to its first members. Keep the
# result under a name for later cells, and show it as the cell's output
# instead of print() so Jupyter uses the object's own display.
subset_dsets = data_collection.apply(func_a)
subset_dsets

<Collection (1 keys)>
🔑 ocn.ssp370.monthly.cmip6
<xarray.Dataset>
Dimensions:     (member_id: 2, time: 1032, z_t: 60, nlat: 384, nlon: 320, d2: 2)
Coordinates:
  * member_id   (member_id) <U12 'r10i1181p1f1' 'r10i1231p1f1'
  * time        (time) object 2015-01-16 12:00:00 ... 2100-12-16 12:00:00
    time_bound  (time, d2) object dask.array<chunksize=(1032, 2), meta=np.ndarray>
  * z_t         (z_t) float32 500.0 1.5e+03 2.5e+03 ... 5.125e+05 5.375e+05
Dimensions without coordinates: nlat, nlon, d2
Data variables:
    SALT        (member_id, time, z_t, nlat, nlon) float32 dask.array<chunksize=(1, 6, 60, 384, 320), meta=np.ndarray>
Attributes:
    Conventions:             CF-1.0; http://www.cgd.ucar.edu/cms/eaton/netcdf...
    calendar:                All years have exactly  365 days.
    cell_methods:            cell_methods = time: mean ==> the variable value...
    contents:                Diagnostic and Prognostic Variables
    model_doi_url:           https://doi.org/10.5065/D67H1H0

In [357]:
# Show which operator was last applied to each base key (df() also stores
# this table on data_collection.database).
data_collection.df()

Unnamed: 0,base_keys,operator_keys
0,ocn.ssp370.monthly.cmip6,func_a
