In [None]:
import intake
import dask
import numpy as np
from dateutil import parser
import xarray as xr
import datetime
import fsspec

In [None]:
from dask_gateway import Gateway

In [None]:
try:
    import cmocean
    print(f"CMOCEAN IS INSTALLED... VERSION {cmocean.__version__}")
except Exception as e:
    print("CMOCEAN IS NOT INSTALLED... INSTALLING")
    !mamba install --yes --quiet cmocean

---
### Set up Dask Gateway

In [None]:
# Notebook-wide gateway handle shared by the cluster helper functions.
# Gateway() is called with no arguments, so no address or auth is chosen here.
DASK_GATEWAY = Gateway()

In [None]:
def shutdown_cluster(name, client):
    """Close `client`, then stop the cluster called `name` on ``DASK_GATEWAY``.

    Parameters
    ----------
    name
        Cluster name passed to ``DASK_GATEWAY.stop_cluster``.
    client
        Object whose ``close()`` method is called first.

    Any exception raised by ``client.close()`` still propagates, but only after
    the stop request has been issued.
    """
    try:
        client.close()
    finally:
        # Always request the stop, so a failing client does not leave the
        # cluster running.
        DASK_GATEWAY.stop_cluster(name)

In [None]:
def clear_gateway(gateway):
    """Stop every cluster that `gateway` currently lists.

    Parameters
    ----------
    gateway
        Object providing ``list_clusters()`` (items expose a ``name``
        attribute) and ``stop_cluster(name)``.
    """
    # A single listing is enough: looping over an empty listing does nothing,
    # so no length check (and no second listing request) is needed.
    for running in gateway.list_clusters():
        gateway.stop_cluster(running.name)

In [None]:
def create_and_connect_cluster(worker_memory=16, worker_cores=2, n_workers=2, image=''):
    """Replace any existing clusters with a new one and return it with a client.

    Parameters
    ----------
    worker_memory, worker_cores
        Values assigned to the matching attributes of the cluster options.
    n_workers
        Worker count passed to ``cluster.scale``.
    image
        When non-empty, options are requested with ``use_local_defaults=False``
        and ``options.image`` is set to this value.

    Returns
    -------
    tuple
        ``(cluster, client)`` where ``client`` comes from ``cluster.get_client()``.
    """
    # Start from a clean slate: stop whatever the gateway is already running.
    clear_gateway(DASK_GATEWAY)

    if not image:
        cluster_opts = DASK_GATEWAY.cluster_options()
    else:
        cluster_opts = DASK_GATEWAY.cluster_options(use_local_defaults=False)
        cluster_opts.image = image
    cluster_opts.worker_memory = worker_memory
    cluster_opts.worker_cores = worker_cores

    new_cluster = DASK_GATEWAY.new_cluster(
        cluster_options=cluster_opts,
        shutdown_on_close=True,
    )

    # Print a short summary taken from the gateway's report for the new cluster.
    report = DASK_GATEWAY.get_cluster(new_cluster.name)
    summary = dict(
        name=report.name,
        options=report.options,
        dashboard_link=report.dashboard_link,
    )
    print(summary)

    new_cluster.scale(n_workers)
    return new_cluster, new_cluster.get_client()

In [None]:
# Start a cluster scaled to two workers (memory/cores/image use the helper's
# defaults). Note that the helper first stops any clusters the gateway lists.
cluster, client = create_and_connect_cluster(n_workers=2)

In [None]:
# Display the cluster object as this cell's output.
cluster

In [None]:
# Display the client object as this cell's output.
client

---

### Open data catalog (Sv)

In [None]:
import param
import pandas as pd
import datetime as dt
import panel as pn
import hvplot.xarray
from holoviews import streams

# Run Panel's extension setup (no optional extensions requested) before any
# Panel objects are built.
pn.extension()

In [None]:
# Wrap intake.open_catalog in dask.delayed and persist the result, so the
# catalog is opened as a Dask task instead of directly in this cell.
# NOTE(review): presumably this task runs on the gateway cluster created above;
# confirm the workers can read the bucket.
delayed_catalog = dask.delayed(intake.open_catalog)('s3://ooi-raw-data/catalog.yaml').persist()

In [None]:
@dask.delayed
def get_data_cat(cat, name):
    """Return the entry stored under `name` in `cat` (wrapped as a delayed task)."""
    entry = cat[name]
    return entry

@dask.delayed
def get_date_list(data_cat):
    """Collect the ``urlpath`` argument of every entry in `data_cat`.

    Each value yielded by ``data_cat.items()`` is described with ``describe()``
    and its ``['args']['urlpath']`` is appended, in iteration order.
    """
    urlpaths = []
    for _key, entry in data_cat.items():
        description = entry.describe()
        urlpaths.append(description['args']['urlpath'])
    return urlpaths

In [None]:
# Names from the computed catalog that contain '_Sv'.
all_cat = [entry_name for entry_name in delayed_catalog.compute() if '_Sv' in entry_name]

In [None]:
# Map each selected catalog name to a persisted Dask task that yields that
# catalog's list of urlpaths. A comprehension keeps the loop variable out of
# the notebook's global namespace.
cat_map = {
    cat_name: get_date_list(get_data_cat(delayed_catalog, cat_name)).persist()
    for cat_name in all_cat
}

In [None]:
@dask.delayed
def fetch_dataset(source, chunks={'ping_time': 86400}):
    """Open the zarr store at `source`, sort it by ``ping_time`` and chunk it.

    Parameters
    ----------
    source
        Location handed to ``fsspec.get_mapper`` (opened with ``anon=True``).
    chunks
        Chunk specification passed to ``Dataset.chunk``.

    Returns
    -------
    The sorted, chunked dataset (wrapped as a delayed task by the decorator).
    """
    store = fsspec.get_mapper(source, anon=True)
    opened = xr.open_dataset(store, engine='zarr')
    ordered = opened.sortby('ping_time')
    return ordered.chunk(chunks)

In [None]:
class EchopypeSingleViewer(param.Parameterized):
    """Interactive viewer for one Sv dataset chosen from the catalogs in ``cat_map``.

    ``data_catalog``, ``num``, ``freqs`` and ``colorbar`` are the user-facing
    selectors. ``data``, ``dataset`` and ``echoview`` hold the derived state;
    they are declared as parameters so that assigning to them fires the
    ``param.depends`` callbacks below (a plain class attribute cannot be
    watched, which left the plot frozen on its initial value).

    Defining the class computes the default catalog's source list and loads
    its first dataset, because those values are used as parameter defaults.
    """

    _default_cat = 'CE04OSPS_Sv'
    _default_chunk = {'ping_time': 86400}
    _default_freq = 38000
    _default_color = 'tempo'

    # Sources of the default catalog, plus the first dataset and plot built
    # from them; these seed the parameter defaults below.
    _data_sources = cat_map[_default_cat].compute()
    _initial_dataset = fetch_dataset(_data_sources[0]).compute()
    _initial_view = _initial_dataset[['Sv']].sel(frequency=_default_freq).hvplot.image(
        x='ping_time', y='range_bin',
        color='Sv', rasterize=True,
        cmap=cmocean.cm.cmap_d[_default_color]
    )

    # User-facing selectors.
    data_catalog = param.ObjectSelector(default=_default_cat, objects=list(cat_map.keys()))
    num = param.Integer(default=0, bounds=(0, len(_data_sources) - 1))
    freqs = param.ObjectSelector(default=_default_freq, objects=[38000, 120000, 200000])
    colorbar = param.ObjectSelector(default=_default_color, objects=list(cmocean.cm.cmap_d.keys()))

    # Derived state. Negative precedence is meant to keep these out of the
    # generated widget panel while still making them watchable.
    data = param.Parameter(default=_data_sources[0], precedence=-1)
    dataset = param.Parameter(default=_initial_dataset, precedence=-1)
    echoview = param.Parameter(default=_initial_view, precedence=-1)

    @param.depends('data_catalog', watch=True)
    def _update_num_list(self):
        """Switch to the selected catalog and load its first source."""
        self._data_sources = cat_map[self.data_catalog].compute()
        self.param['num'].bounds = (0, len(self._data_sources) - 1)
        if self.num != 0:
            # Changing `num` fires `_load_selected`, which loads the source.
            self.num = 0
        else:
            # `num` keeps its value, so no event fires; load explicitly.
            self._load_selected()

    @param.depends('num', watch=True)
    def _load_selected(self):
        """Load the source at index ``num`` into ``data`` and ``dataset``."""
        self.data = self._data_sources[self.num]
        self.dataset = fetch_dataset(self.data).compute()

    @param.depends('dataset', watch=True)
    def _update_frequencies(self):
        """Offer the loaded dataset's frequencies and keep ``freqs`` valid."""
        if not isinstance(self.dataset, xr.Dataset):
            return
        frequencies = self.dataset['frequency'].values.tolist()
        if not frequencies:
            return
        self.param['freqs'].objects = frequencies
        self.param['freqs'].default = frequencies[0]
        if self.freqs not in frequencies:
            # The previous selection does not exist in this dataset; fall back
            # to the first available frequency (this also redraws the plot).
            self.freqs = frequencies[0]

    @param.depends('dataset', 'freqs', 'colorbar', watch=True)
    def _create_plot(self):
        """Rebuild ``echoview`` for the current dataset, frequency and colormap."""
        if not isinstance(self.dataset, xr.Dataset):
            return
        if self.freqs not in self.dataset['frequency'].values.tolist():
            # `_update_frequencies` will pick a valid frequency and trigger a
            # redraw; selecting a missing one here would raise.
            return
        filtered_ds = self.dataset[['Sv']].sel(frequency=self.freqs)
        self.echoview = filtered_ds.hvplot.image(
            x='ping_time', y='range_bin',
            color='Sv', rasterize=True,
            cmap=cmocean.cm.cmap_d[self.colorbar]
        )

    @param.depends('data')
    def file_source(self):
        """Return the currently loaded source (``data``) for display."""
        return self.data

    @param.depends('echoview')
    def view(self):
        """Return the current plot (``echoview``) for display."""
        return self.echoview

In [None]:
# Build the viewer and its layout: a column holding the parameter widgets and
# the file_source output, followed in the same row by the view output.
ev = EchopypeSingleViewer()
plot = pn.Row(pn.Column(ev.param, ev.file_source), ev.view)

In [None]:
# Mark the layout as servable.
# NOTE(review): presumably intended for `panel serve`; confirm how this
# notebook is deployed.
plot.servable()