## Simple Pipeline Monitoring Dashboard

These days any deployed project should incorporate the principles of CI/CD (highly recommended: a [great talk from Eric Ma](https://www.youtube.com/watch?v=Dx2vG6qmtPs&t=232s), July 2020, which describes the issue in the realm of _Data Science_).  
After setting up our [dagster pipeline](https://dev.to/sephib/implementing-a-graph-network-pipeline-with-dagster-3i3a) we needed to implement some sort of monitoring solution to review the outcome of our workflow. Working in a small DS team, we needed to push forward and couldn't wait for the _heavy guns_ of enterprise IT to take over. So until we have their support, here is a simple dashboard that we put together to monitor our `Assets`.

In this blog post we describe how we created a functional dashboard based on Python widgets.  
We will describe the origin of our data, followed by our solution using Python's Panel library.  

* NEED TO INSERT IMAGE OF THE OUTCOME


## Dagster Assets   

We are not going to dive into [Dagster](https://docs.dagster.io/) (see our previous [blog post on our data pipeline](https://dev.to/sephib/implementing-a-graph-network-pipeline-with-dagster-3i3a)), but the TL;DR is that Dagster is an orchestration framework for building modern data applications and workflows. The framework has integrated logging and the ability to [produce persistent assets](https://docs.dagster.io/overview/asset-materializations#materializing-an-asset) that are stored in a database (in our case PostgreSQL) for future reference.   
For our project we are interested in monitoring the number of nodes and edges that we generate in our data pipeline workflow. During our _pipeline run_ we log (or, in Dagster's jargon, materialize; see [AssetMaterialization in the documentation](https://docs.dagster.io/examples/materializations#main)) various stats on the datasets that we manipulate. We would like to view the changes of these stats over time in order to verify the health of our system/pipeline.


## Panel widgets  

These days the Python ecosystem is rich and vibrant, with various visualization libraries that are constantly being developed. Two of the libraries that we reviewed were [streamlit](https://www.streamlit.io/) and [Panel](https://panel.holoviz.org/). We decided to go with Panel, which seemed to suit our needs (mainly due to its structure and maintenance approach).  
Inspired by a talk given by [Lina Weichbrodt in the MLOps meetup](https://www.youtube.com/watch?v=Un30yb1WlpU&feature=youtu.be), we wanted to view the percent change of our metrics over time.   

Panel is capable of displaying and integrating many Python widgets from various packages. We are going to work with hvplot, which is best suited to our needs due to its richness and its integration with pandas.  


## Getting our data    
In this section we describe how we extracted the data from the Dagster Asset database. If this is not relevant for you, you may want to jump to the sample data section.  
In order to get the Asset data we needed to dig into the `event_logs` table, which logs all the events that are generated when running a Dagster pipeline. The code in the [linked repo] extracts the data into a pandas DataFrame, based on the `Asset Keys` that are defined in the `Materialization` process.    




In [None]:
from sqlalchemy import create_engine, MetaData, select
from datetime import datetime
import pandas as pd
import json

In [None]:


# replace the placeholders with your own connection details
engine = create_engine('postgresql://database_user:user_password@server:port/database_name')
conn = engine.connect()
meta = MetaData()
meta.reflect(engine)  # load the existing table definitions from the database
t_event_logs = meta.tables['event_logs']  # table in which dagster saves all the events


In [None]:
# helper function to get all asset_keys in database
def get_all_asset_keys(table, filter_asset_keys:str=''):
    _sd = select([table.c.asset_key]).distinct(table.c.asset_key)
    return [asset_key[0] for asset_key in conn.execute(_sd) if asset_key[0] and filter_asset_keys in asset_key[0]]


def get_asset_keys_values(result_list)->dict:
    # collect, per event time stamp, the asset key and the values found in the event
    assets={}
    for asset_key, res in result_list:
        event = json.loads(res)  # parse the event JSON once per row
        time_stamp = datetime.fromtimestamp(event['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
        assets[time_stamp] = {}
        assets[time_stamp]['asset_key'] = asset_key
        # split on the first '=' only, since the value itself may contain '='
        for key_message, value_message in [m.split('=', 1) for m in event['message'].split('\n') if '=' in m]:
            if key_message.strip() == 'event_specific_data':
                for _event in json.loads(value_message).values():
                    if isinstance(_event, list):
                        for asset_list in _event[2]:
                            assets[time_stamp][asset_list[0]]=asset_list[-1][0]
    return assets



In [None]:
assets = get_all_asset_keys(t_event_logs,'')

_s = select([t_event_logs.c.asset_key, t_event_logs.c.event]).where(t_event_logs.c.asset_key.in_(assets))
result = conn.execute(_s)
result_list = result.fetchall()  # or fetchmany(n)
assets = get_asset_keys_values(result_list)

df_assets = pd.DataFrame.from_dict(assets, orient='index')
df_assets.index = pd.to_datetime(df_assets.index)
df_assets_pivot = pd.pivot(df_assets, columns='asset_key')
df_assets_pct_change = df_assets_pivot.pct_change()
df_assets_pct_change
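
To make the parsing step a bit more concrete, here is a minimal sketch of how a single event row could be turned into a flat dictionary. The record below is hypothetical and heavily simplified (real Dagster events carry more fields, and the `rows` / `source` keys are made up); only the `timestamp` and `message` fields mirror what the code above reads. The sketch also uses UTC instead of local time so that the result is reproducible.

```python
import json
from datetime import datetime, timezone

# Hypothetical, simplified event record; not the real Dagster event schema.
raw_event = json.dumps({
    "timestamp": 1600000000.0,
    "message": "Materialized value nodes.\nrows = 1200\nsource = graph=v2",
})


def parse_event(raw: str) -> dict:
    """Return the event time plus every `key = value` line of the message."""
    event = json.loads(raw)
    parsed = {
        "time_stamp": datetime.fromtimestamp(
            event["timestamp"], tz=timezone.utc
        ).strftime("%Y-%m-%d %H:%M:%S")
    }
    for line in event["message"].split("\n"):
        if "=" in line:
            # Split on the first '=' only, so a value may contain '=' itself.
            key, value = line.split("=", 1)
            parsed[key.strip()] = value.strip()
    return parsed


parsed = parse_event(raw_event)
print(parsed)
```

Lines without an `=` (such as the first line of the message) are simply skipped, which is the same filtering that the list comprehension in `get_asset_keys_values` performs.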


### Sample Data 

For the dashboard in this post, we are going to use the sample us_crime data from the `hvplot.sample_data` set.   

Since we are simulating our data pipeline outcomes we are going to use a sample of the columns:  
* Year - as our X / time axis
* Violent crime total 	
* Murder and nonnegligent Manslaughter
* Robbery  

Let's view the data:

In [13]:
from hvplot.sample_data import us_crime
data = us_crime.read()
sample_cols = ['Year', 'Robbery', 'Violent crime total', 'Murder and nonnegligent Manslaughter']
crim_data = data[sample_cols].copy()
crim_data.sample(3)

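|    | Year | Robbery | Violent crime total | Murder and nonnegligent Manslaughter |
|----|------|---------|---------------------|--------------------------------------|
| 18 | 1978 | 426930  | 1085550             | 19560                                |
| 8  | 1968 | 262840  | 595010              | 13800                                |
| 53 | 2013 | 345095  | 1199684             | 14319                                |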


Since we are interested in the change in the volume of the data over time, we can use pandas' [pct_change](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.pct_change.html) method to generate the values that we need. This also allows us to display all the datasets in the same graph, even though their nominal values differ by orders of magnitude.  

```
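# every column except the time axis gets a matching '<name>_pct' column
cols = [col for col in crim_data.columns if col != 'Year']
for c in cols:
    crim_data[f'{c}_pct'] = crim_data[c].pct_change()
# keep only the time axis and the percent-change columns
col_pct = [col for col in crim_data.columns if col.endswith('_pct')]
crime_data = crim_data[['Year'] + col_pct]
crime_data.sample(3)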
```
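
As a quick illustration of what `pct_change` computes, here is a minimal sketch on made-up numbers (the `nodes` and `edges` columns and their values are invented for the example and are not taken from our pipeline). Each value is `(current - previous) / previous`, so the first row, which has no predecessor, comes out as `NaN`.

```python
import pandas as pd

# Made-up numbers, chosen only to make the arithmetic easy to follow.
stats = pd.DataFrame(
    {"nodes": [100, 110, 99], "edges": [1000, 1500, 1500]},
    index=pd.to_datetime(["2020-09-01", "2020-09-02", "2020-09-03"]),
)

# Row-over-row relative change for every column.
pct = stats.pct_change()
print(pct)
```

Here `nodes` goes up by 10% and then down by 10%, while `edges` jumps by 50% and then stays flat. Both series fit comfortably on one axis even though their raw values differ by an order of magnitude.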


Now that we have the data we can build our dashboard.



## Dashboard  

We have 3 widgets that we want to use in our dashboard:  
1. A line plot - displaying the datasets  
2. A CheckBoxGroup widget - controlling which datasets are visible  
3. A DateRangeSlider widget - selecting the date range that we want to display  


Our dashboard will display each data series along the X time axis. 

### DateRangeSlider

Panel's [DateRangeSlider](https://panel.holoviz.org/reference/widgets/DateRangeSlider.html) widget "allows selecting a date range using a slider with two handles".  

The parameters of the widget are self-explanatory:  

```
import panel as pn
from datetime import timedelta

date_range_slider = pn.widgets.DateRangeSlider(
        name='Date Range Slider',
        start=data[date_col].min(),
        end=data[date_col].max(),
        # default value for slider: the last day of the data
        value=(data[date_col].max() - timedelta(days=1), data[date_col].max())
)
```  



### CheckBoxGroup  

Panel's [CheckBoxGroup](https://panel.holoviz.org/reference/widgets/CheckBoxGroup.html) widget "allows selecting between a list of options by ticking the corresponding checkboxes".  
  
The parameters of the widget are self-explanatory:

```
checkbox_widget = pn.widgets.CheckBoxGroup(name='Data sources'
                                           , value=cols
                                           , options=cols
)
```


## Line Plot & Panel's Glue  

Now let's look at the line plot code:  

>import holoviews.plotting.bokeh  
>import hvplot.pandas  

These imports set [bokeh](https://bokeh.org/) as the visualization engine for hvplot, and allow hvplot to use pandas DataFrames directly as the data source for the plots.  

> @pn.depends(date_range_slider.param.value)  

The Panel decorator causes the _line plot_ to be updated whenever the value of the `date_range_slider` widget changes.   

> start_date, end_date = date_range  
> mask = (crime_data\[date_col\] > start_date) & (crime_data\[date_col\] <= end_date)   
> data = crime_data.loc\[mask\]   

In order to filter the dataframe we mask the data based on the current values of the `date_range_slider` widget.  
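
The masking step can be tried on its own, without any widgets. The following is a minimal sketch on a hypothetical frame (the `run_date` and `nodes_pct` names and values are made up), with a plain tuple standing in for the slider's `(start, end)` value.

```python
import pandas as pd

# Hypothetical frame standing in for the monitored stats.
frame = pd.DataFrame({
    "run_date": pd.to_datetime(
        ["2020-09-01", "2020-09-02", "2020-09-03", "2020-09-04"]
    ),
    "nodes_pct": [0.00, 0.10, -0.05, 0.02],
})

# Stand-in for the slider value: a (start, end) pair.
date_range = (pd.Timestamp("2020-09-01"), pd.Timestamp("2020-09-03"))
start_date, end_date = date_range

# Boolean mask: strictly after the start, up to and including the end.
mask = (frame["run_date"] > start_date) & (frame["run_date"] <= end_date)
selected = frame.loc[mask]
print(selected)
```

Note that the `>` comparison excludes the start date itself, so only the rows for September 2nd and 3rd are kept here; use `>=` if the first day of the range should be included.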

> crime_data.hvplot.line  

This is the basic call for a  [line plot] to be rendered from the pandas DataFrame.  

```
checkbox_widget.jscallback(args={'plot': lines[key]},
                           value=
                           """
                           for (var i = 0; i<plot.renderers.length; i++) {
                               plot.renderers[i].visible = cb_obj.active.indexOf(i) >= 0
                           }
                           """
)
```
  
Here is some JavaScript glue to connect the `hvplot.line` and the `checkbox_widget` (thanks to [Philipp Rudiger](https://discourse.holoviz.org/u/philippjfr/summary) for the reference). We iterate through the various lines and set their visibility based on the `checkbox_widget` values.   
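
For readers who are less comfortable with JavaScript, the loop can be read as the following plain-Python analogue. This is only a sketch of the logic: `visible_flags` is a hypothetical helper, not part of Panel or Bokeh, and `active` stands for the list of ticked checkbox indices that the JavaScript reads from `cb_obj.active`.

```python
def visible_flags(n_renderers: int, active: list) -> list:
    """Mirror the JavaScript loop: renderer i is visible iff index i is ticked."""
    return [i in active for i in range(n_renderers)]


# Three lines on the plot, with the first and third checkbox ticked.
flags = visible_flags(3, [0, 2])
print(flags)
```

Because the match is purely by position, this approach relies on the checkbox options being listed in the same order as the lines on the plot.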

Here is the full function:  
```
@pn.depends(date_range_slider.param.value)
def get_plot(date_range):
    # keep only the rows inside the selected date range
    start_date, end_date = date_range
    mask = (crime_data[date_col] > start_date) & (crime_data[date_col] <= end_date)
    data = crime_data.loc[mask]

    # `cols` lists the columns to plot, in the same order as the checkbox options
    lines = data[cols + [date_col]].hvplot.line(
          x=date_col
        , y=cols
        , value_label= 'value'
        , legend='right'
        , height=400
        , width=800
        , muted_alpha=0
        , ylim=(-1, 1)
        , xlabel='time'
        , ylabel='pct'
    )
    # hide or show each line according to the ticked checkboxes
    for key in list(dict(lines).keys()):
        checkbox_widget.jscallback(args={'plot': lines[key]}
                                    ,  value=
                                    """
                                    for (var i = 0; i<plot.renderers.length; i++) {
                                      plot.renderers[i].visible = cb_obj.active.indexOf(i) >= 0
                                    }
                                    """
        )
    return lines.opts(axiswise=True)
```
  

## Final function

Now we can create a function that connects the different widgets:

```
def get_dashboard(dft, cols, date_col):
    # widgets are built from the dataframe that is passed in
    date_range_slider = pn.widgets.DateRangeSlider(
        name='Date Range Slider',
        start=dft[date_col].min(), end=dft[date_col].max(),
        value=(dft[date_col].max() - timedelta(days=1), dft[date_col].max())
    )
    checkbox_widget = pn.widgets.CheckBoxGroup(name='Data sources'
                                               , value= cols
                                               , options=cols
    )
    @pn.depends(date_range_slider.param.value)
    def get_plot(date_range):
        # keep only the rows inside the selected date range
        start_date, end_date = date_range
        mask = (dft[date_col] > start_date) & (dft[date_col] <= end_date)
        data = dft.loc[mask]

        # plot the columns in the same order as the checkbox options
        lines = data[cols + [date_col]].hvplot.line(
              x=date_col
            , y=cols
            , value_label= 'value'
            , legend='right'
            , height=400
            , width=800
            , muted_alpha=0
            , ylim=(-1, 1)
            , xlabel='time'
            , ylabel='pct'
        )
        # hide or show each line according to the ticked checkboxes
        for key in list(dict(lines).keys()):
            checkbox_widget.jscallback(args={'plot': lines[key]}
                                       ,  value=
                                       """
                                        for (var i = 0; i<plot.renderers.length; i++) {
                                          plot.renderers[i].visible = cb_obj.active.indexOf(i) >= 0
                                        }
                                       """
            )
        return lines.opts(axiswise=True)
    return get_plot, date_range_slider, checkbox_widget
```

## Design the Dashboard  

Panel has a simple method to aggregate all the widgets together using rows and columns (like a simple HTML table).   

<img src="../docs/images/panel_layout.png" alt="panel layout" width="450">  
     
Below is the code to design the layout  

```
plot, date_range_slider, checkbox_widget = get_dashboard(crime_data, col_pct, 'Year')
dashboard=pn.Row(
    pn.Column(
        "Plot",
        date_range_slider, checkbox_widget
    )
    , plot
)
dashboard

```  

## Conclusion  
In this blog post we outlined our solution for monitoring the Dagster Assets that we log during our data pipeline workflow.   
Using the Panel / hvplot libraries was quite straightforward. The documentation and reference galleries were very useful. The linkage between the plot and the checkbox is not optimal, however: the current solution only toggles the visibility of the lines and does not re-render the plot, so it works well only if the datasets are in the same order of magnitude. If they are not, the solution should be upgraded to re-render the plot, as in the example in the last section of the [getting started documentation](https://panel.holoviz.org/getting_started/index.html).