# Big Data Application
---

In this notebook we are going to put our visualizations together into an application using Panel Pipelines.

In [None]:
import param
import panel as pn

# Load the Panel extension before any of the widgets below are created.
pn.extension()

In [None]:
import dask_gateway
import dask.dataframe as dd

import hvplot.dask

In [None]:
# Shared Dask Gateway client; every pipeline stage below uses it to list,
# create, and connect to clusters.
gateway = dask_gateway.Gateway()

In [None]:
class LaunchDaskCluster(param.Parameterized):
    """Pipeline stage that starts, or re-attaches to, a Dask Gateway cluster.

    The stage shows a worker-range slider, a year-range slider, a launch
    button and a status line.  Clicking the button connects to the first
    cluster the gateway reports (or creates a new one), enables adaptive
    scaling within the selected worker range, waits for one worker and then
    sets ``ready`` to ``True``.

    Outputs handed to the next stage: ``cluster_name``, ``start_year`` and
    ``end_year``.
    """

    # NOTE: the widgets are class attributes, so they are shared by every
    # instance of this class.
    workers = pn.widgets.IntRangeSlider(name='Number of Workers', start=1, end=10, value=(1, 5), step=1)
    launch_dask = pn.widgets.Button(name='Launch Dask')
    dask_status = pn.widgets.StaticText(name='Dask Status', value='Not Connected')
    years = pn.widgets.IntRangeSlider(name='Choose Date Range', start=2003, end=2022, value=(2018, 2022), step=1)
    cluster = None
    cluster_name = param.String()
    client = None
    ready = param.Boolean(default=False, precedence=-1)

    def __init__(self, **params):
        super().__init__(**params)
        self.launch_dask.on_click(self.dask_launcher)

    @param.output(('cluster_name', param.String), ('start_year', param.Number), ('end_year', param.Number))
    def output(self):
        """Return ``(cluster_name, start_year, end_year)`` for the next stage."""
        start_year, end_year = self.years.value
        return self.cluster_name, start_year, end_year

    def dask_launcher(self, event):
        """Button callback: connect to or create a cluster and wait for a worker.

        Progress is reported through ``dask_status``.  If any gateway call
        fails, the failure is written to the status line and the exception is
        re-raised, so the widget is not left showing a stale in-progress
        message.
        """
        try:
            running_clusters = gateway.list_clusters()
            if running_clusters:
                self.dask_status.value = "Found existing dask cluster, connecting and rescaling"
                self.cluster = gateway.connect(running_clusters[0].name)
            else:
                self.dask_status.value = "Launching new dask cluster"
                self.cluster = gateway.new_cluster(
                    conda_environment="pycon2023/pycon2023-tutorial",
                    profile="Medium Worker",
                )

            # Adaptive scaling applies to both the reused and the new cluster.
            min_workers, max_workers = self.workers.value
            self.cluster.adapt(min_workers, max_workers)

            self.cluster_name = self.cluster.name
            self.client = self.cluster.get_client()
            self.dask_status.value = "Waiting for at least 1 worker"
            self.client.wait_for_workers(1)
        except Exception as err:
            self.dask_status.value = f"Failed to start Dask cluster: {err}"
            raise

        self.dask_status.value = f"Cluster Ready - {self.client.dashboard_link}"
        self.ready = True

    def panel(self):
        """Return the layout shown for this stage."""
        return pn.Column(
            self.workers,
            self.years,
            self.launch_dask,
            self.dask_status,
        )

# Stage 1 instance; it is registered with the pipeline further down.
stage1 = LaunchDaskCluster()
# Uncomment to preview this stage on its own:
# stage1.panel()

In [None]:
class Dashboard(param.Parameterized):
    """Pipeline stage that plots an aggregated flight-delay column.

    The stage shows three radio-button groups (group-by column, delay field,
    aggregation method), the resulting hvPlot chart, and the Dask dashboard of
    the cluster named by ``cluster_name`` embedded in an iframe.

    ``cluster_name``, ``start_year`` and ``end_year`` are expected to be
    supplied by the previous stage; ``cluster_name`` is passed on unchanged.
    """

    # NOTE: the widgets are class attributes, so they are shared by every
    # instance of this class.
    method = pn.widgets.RadioButtonGroup(name='Method', options=['min', 'mean', 'max'])
    field = pn.widgets.RadioButtonGroup(name='Field', options=['DEP_DELAY', 'ARR_DELAY'])
    groupby = pn.widgets.RadioButtonGroup(name='GroupBy', options=['YEAR', 'MONTH', 'DAY_OF_MONTH', 'OP_CARRIER'], value='MONTH')
    cluster_name = param.String()
    start_year = param.Number()
    end_year = param.Number()

    def __init__(self, **params):
        super().__init__(**params)
        self._load_flights()

    @param.depends('start_year', 'end_year', watch=True)
    def _load_flights(self):
        """(Re)build the lazy flights frame for the current year range.

        Watching the year parameters keeps ``self.flights`` in sync when they
        are updated after construction (for example by a pipeline pushing the
        previous stage's outputs into this instance).
        """
        columns = [
            'YEAR', 'MONTH', 'DAY_OF_MONTH', 'DAY_OF_WEEK', 'FL_DATE', 'OP_CARRIER',
            'TAIL_NUM', 'OP_CARRIER_FL_NUM', 'ORIGIN', 'DEST', 'CRS_DEP_TIME',
            'DEP_TIME', 'DEP_DELAY', 'ARR_TIME', 'ARR_DELAY', 'CANCELLED',
            'CANCELLATION_CODE', 'DIVERTED', 'AIR_TIME', 'FLIGHTS', 'DISTANCE',
            'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY',
            'LATE_AIRCRAFT_DELAY', 'DIV_ARR_DELAY'
        ]
        self.flights = dd.read_parquet(
            "gcs://quansight-datasets/airline-ontime-performance/sorted/parquet_by_year",
            filters=[
                ('YEAR', '>=', self.start_year),
                ('YEAR', '<=', self.end_year)
            ],
            columns=columns,
        ).interactive()

    @param.depends('cluster_name')
    def dask_dashboard(self):
        """Connect to the named cluster and return its dashboard as an iframe."""
        self.cluster = gateway.connect(self.cluster_name)
        self.client = self.cluster.get_client()
        return pn.pane.HTML(f"""
        <iframe width="800" height="800" src="{self.client.dashboard_link}"
        frameborder="0" scrolling="no" marginheight="0" marginwidth="0"></iframe>
        """)

    @param.depends('groupby', 'method', 'field')
    def plot_data(self):
        """Return the interactive plot of the selected aggregation."""
        # NOTE(review): `how=` is not the usual keyword for a groupby `agg`
        # call; confirm the interactive pipeline accepts it as written.
        return (
            self.flights
                .groupby(self.groupby)[self.field]
                .agg(how=self.method)
                .hvplot()
        ).panel()

    # A single named, typed output must be given as one (name, type) tuple.
    # Two separate positional arguments declare two outputs, which makes the
    # pipeline index into the returned string instead of passing it whole.
    @param.output(('cluster_name', param.String))
    def output(self):
        """Return the cluster name for the next stage."""
        return self.cluster_name

    def panel(self):
        """Return the layout shown for this stage."""
        return pn.Row(
            pn.Column(
                self.groupby,
                self.field,
                self.method,
                pn.layout.Divider(),
                self.plot_data,
            ),
            self.dask_dashboard,
        )
    

In [None]:
# Stage 2 instance, seeded with stage 1's current outputs
# (cluster name, first year, last year), read in a single call.
stage2 = Dashboard(
    **dict(zip(("cluster_name", "start_year", "end_year"), stage1.output()))
)
# Uncomment to preview this stage on its own:
# stage2.panel()

In [None]:
class StopDaskCluster(param.Parameterized):
    """Pipeline stage that shuts down the cluster named by ``cluster_name``.

    The stage shows a stop button and a status line.  The status is refreshed
    whenever ``cluster_name`` changes, and clicking the button shuts the
    cluster down.
    """

    cluster_name = param.String()
    # NOTE: the widgets are class attributes, so they are shared by every
    # instance of this class.
    dask_status = pn.widgets.StaticText(name='Dask Status', value='')
    stop_dask = pn.widgets.Button(name='Stop Dask Cluster')
    # Handle to the connected cluster; stays None until a connection is made.
    cluster = None

    def __init__(self, **params):
        super().__init__(**params)
        self.stop_dask.on_click(self.shutdown_cluster)

    # watch=True so this actually runs when the name is updated; the method is
    # not part of the rendered layout, so nothing else would ever call it.
    @param.depends('cluster_name', watch=True)
    def dask_dashboard(self):
        """Connect to the named cluster and show its status."""
        if not self.cluster_name:
            return
        self.dask_status.value = "Checking Status"
        self.cluster = gateway.connect(self.cluster_name)
        self.dask_status.value = self.cluster.status

    def shutdown_cluster(self, event):
        """Button callback: shut the cluster down and show its final status."""
        self.dask_status.value = "Shutting Cluster Down"
        if self.cluster is None:
            # The name may have been supplied at construction time, in which
            # case no change event ever triggered a connection.
            self.cluster = gateway.connect(self.cluster_name)
        self.cluster.shutdown()
        self.dask_status.value = self.cluster.status

    def panel(self):
        """Return the layout shown for this stage."""
        return pn.Column(
            self.stop_dask,
            self.dask_status,
        )

In [None]:
# Stage 3 instance, seeded with the cluster name that stage 2 currently reports.
stage3 = StopDaskCluster(cluster_name=stage2.output())
# Uncomment to preview this stage on its own:
# stage3.panel()

In [None]:
# Empty pipeline; the three stages are added in the next cell.
pipeline = pn.pipeline.Pipeline()

In [None]:
# Stage 1 sets its `ready` flag once the cluster has a worker; gating on it
# keeps the user from advancing to the dashboard before a cluster exists.
pipeline.add_stage('Launch Dask', stage1, ready_parameter='ready')
pipeline.add_stage('Visualization', stage2)
pipeline.add_stage('Stop Cluster', stage3)

In [None]:
# Mark the pipeline as the app to display when the notebook is run with
# `panel serve` (see the command at the end of the notebook).
pipeline.servable()

In [None]:
# Diagnostic: show the clusters the gateway currently reports.
gateway.list_clusters()

In [None]:
# panel serve 07b-big-data-dashboard-pipeline.ipynb --allow-websocket-origin=nebari.quansight.dev
# https://nebari.quansight.dev/user/dharhas@quansight.com/proxy/5006/07b-big-data-dashboard-pipeline