# Interactive time-series visualisation of MONROE data

## Features

This notebook enables on-demand visualisation of time-series data collected with the MONROE platform. It provides the following features:
* Fast visualisation of multiple parameters of data collected on MONROE nodes along the time dimension.
* Adaptive granularity, where the data resolution is adjusted for maximum performance depending on the interactive visual analysis zoom level.
* Selection and on-disk storage of visualised data for further analysis in the Orange data mining toolbox.
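
The adaptive granularity listed above can be sketched as a mapping from the width of the visible time window to a resampling frequency. This is a minimal, standalone illustration: the helper name `pick_freq` and the constants `FINE_WINDOW` and `MEDIUM_WINDOW` are hypothetical, while the thresholds (2 hours, 1 day) and the frequency strings mirror the values used in this notebook's `load_data` function.

```python
from datetime import datetime, timedelta

# Hypothetical names; thresholds mirror sec_res and min_res in this notebook.
FINE_WINDOW = timedelta(hours=2)    # up to 2 h: finest resolution
MEDIUM_WINDOW = timedelta(days=1)   # up to 1 day: one sample per minute


def pick_freq(start, end):
    """Return a resampling frequency string for the visible window."""
    span = end - start
    if span <= FINE_WINDOW:
        return "10ms"
    if span <= MEDIUM_WINDOW:
        return "1m"
    return "30m"


# Zooming in from three days to one hour refines the resolution.
print(pick_freq(datetime(2018, 1, 1), datetime(2018, 1, 4)))     # 30m
print(pick_freq(datetime(2018, 1, 1), datetime(2018, 1, 1, 1)))  # 10ms
```

Keeping the thresholds in named constants makes it easy to trade responsiveness for detail on slower machines.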

## Prerequisites

### Database access
The Cassandra DB used for the central MONROE data repository is a NoSQL database that is poorly suited to time-series data mining. Instead, this notebook requires the data to be stored in an InfluxDB instance accessible from the machine where the notebook is run. InfluxDB is a time-series database that __[performs up to 168 times faster for certain queries than Cassandra DB](https://www.influxdata.com/blog/influxdb-vs-cassandra-time-series/)__. To create a replica of MONROE data in your local InfluxDB:
> 1) Create recipes for loading and naming MONROE data tables and their attributes that you plan to have in your local database. __[Example recipes](https://github.com/ivek1312/ricercando/tree/master/scripts/recipes)__.

> 2) Download __[MONROE daily dump CSV files](https://www.monroe-system.eu/user/dailyDumps/)__ of the tables for which recipes are present and for the dates you would like to have in your local database.

> 3) Run cassandra_dump_to_line_protocol.sh as __[per instructions](https://github.com/ivek1312/ricercando/tree/master/scripts)__.
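
For orientation, the conversion script in step 3 targets InfluxDB's line protocol, a plain-text format of the form `measurement,tag=value field=value timestamp`. The sketch below only illustrates that text format; it is not the logic of `cassandra_dump_to_line_protocol.sh`. The helper name `to_line_protocol` and the example tag and field names are assumptions, and only numeric fields are handled.

```python
def _escape(text):
    """Escape commas, equals signs and spaces in tag keys/values and field keys."""
    return str(text).replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")


def to_line_protocol(measurement, tags, fields, timestamp_s):
    """Build one line-protocol record (hypothetical helper, numeric fields only).

    timestamp_s is an integer Unix timestamp in seconds; line protocol
    expects nanoseconds by default, hence the multiplication.
    """
    name = str(measurement).replace(",", r"\,").replace(" ", r"\ ")
    tag_part = "".join(
        ",{}={}".format(_escape(k), _escape(v)) for k, v in sorted(tags.items())
    )
    field_part = ",".join(
        "{}={}".format(_escape(k), float(v)) for k, v in sorted(fields.items())
    )
    return "{}{} {} {}".format(name, tag_part, field_part, int(timestamp_s) * 10**9)


# Example values are made up for illustration.
line = to_line_protocol("ping", {"NodeId": "582"}, {"RTT": 42.5}, 1514764800)
print(line)
```

Tags are indexed in InfluxDB while fields are not, so identifiers that are used for filtering (such as the node ID) are usually best stored as tags.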

### Python packages
The notebook requires the following Python packages:
* **ricercando** - this package is bundled in the __[RICERCANDO repository](https://github.com/ivek1312/ricercando)__ and can be installed by running ```pip install -e .``` in the repository's root directory.


## Analysis flow
Please run the following cells one after another, starting with the pre-initialisation cells.

### Pre-initialisation

In [None]:
# Set to the database IP. It must be reachable from the machine where this notebook is run.
# DB_IP='192.168.27.75'
DB_IP='localhost'

In [None]:
#start Jupyter with a raised output data-rate limit so that large plots are not dropped:
#jupyter notebook --NotebookApp.iopub_data_rate_limit=10000000000

import holoviews as hv
import param, paramnb
import pandas as pd
from colorcet import cm #pip install colorcet
from holoviews.streams import RangeXY, RangeX
import numpy as np
from bokeh.models import HoverTool

import time, os

os.environ['TZ'] = 'UTC'
time.tzset()

#https://stackoverflow.com/questions/41675041/bokeh-time-series-plot-annotation-is-off-by-1-hour/41698735
#https://github.com/bokeh/bokeh/issues/5499
#https://github.com/bokeh/bokeh/issues/1135
#https://github.com/bokeh/bokeh/issues/729
#https://github.com/bokeh/bokeh/issues/1103



from holoviews.operation.timeseries import rolling, rolling_outlier_std

hv.notebook_extension('bokeh', width=100)

from ricercando import set_connection_params, all_tables, all_nodes, getdf, tables_for_node, nodes_for_table 
from ricercando.db import _CATEGORICAL_COLUMNS
set_connection_params(host=DB_IP)
#set_connection_params(host='localhost')


rtt_opts= {'Points':{'style':dict(cmap='Set3', size=2), 'plot':dict( color_index='Message', width=1400, height=400, colorbar=True, tools=['hover']) }}
spikes_colors = {'Scheduling.Task.Started':'green', 'Scheduling.Task.Stopped':'red', 'Scheduling.Task.Deploying':'orange', 'Scheduling.Task.Deployed':'blue'}
spike_opts={'Spikes':{'style':dict( line_width=2, color=hv.Palette('Set3')) }}



sec_res = np.timedelta64(2*60,'m')
min_res = np.timedelta64(1*24*60,'m')
day_res = np.timedelta64(3*24*60,'m')


#data types: if df doesn't contain these columns, fill them with appropriate NA values => 'None' if categorical, zero if continuous
values = {}
categorical = list(_CATEGORICAL_COLUMNS)

continous = ['Altitude', 'Latitude', 'Longitude', 'SatelliteCount', 'Speed', 'RTT', 'BootCounter', 'CPU_Apps', 'CPU_User', 'CumUptime', 'Swap', 
             'Uptime', 'RSCP','RSRP','RSRQ','RSSI', 'Temperature', 'IOWait', 
             'TCPCbytesAll','TCPSbytesAll','TCPDuration','TCPCRTTAVG','TCPCRTTSTD','TCPCPktsRetx','TCPCPktsOOO','TCPSPktsRetx','TCPSPktsOOO',
            'Download','Upload','RTTClient','RTTServer','Status',
            'UDPCbytesAll','UDPSbytesAll','UDPCDurat','UDPSDurat', 'TCPGoodPutUpload', 'TCPGoodPutDownload', 'UDPGoodPutUpload', 'UDPGoodPutDownload']

for val in categorical:
    values[val]='None'
    
for val in continous:
    values[val]=0

#all tables from dataframe
tables = 'ping gps modem event sensor nettest tcpcomplete udpcomplete'

#hv needs a fixed number of overlays and layouts at every zoom level; when there are no spikes, nothing is shown. Work around this by creating dummy spikes of length 0 at the centre of x_range
def dummy_spikes(x_range, Day, Month, Year):
    if x_range is None:
        center = (np.datetime64('{0}-{1}-{2} 00:00:00'.format(Year, Month, Day)))+np.timedelta64(12*60,'m')
    else:
        center = (x_range[1]-x_range[0])/2+x_range[0]
    ds = hv.Dataset(( [center, center,center,center], ['Scheduling.Task.Started', 'Scheduling.Task.Stopped', 'Scheduling.Task.Deploying', 'Scheduling.Task.Deployed'] ), kdims=['Date', 'EventType'])
    spike_opts['Spikes']['plot']=dict(spike_length=0)
         
    return ds.to(hv.Spikes).overlay().opts(spike_opts)

def load_data(x_range, Node, Day, Month, Year, Coloring, Colormap, Y='RTT', **kwargs):
    if x_range is not None:

        t_delta = (x_range[1]-x_range[0])
        if t_delta<=sec_res: #windows up to sec_res (2 h) are loaded at the finest (10 ms) resolution
            freq='10ms'
        elif t_delta<=min_res: #1 day
            freq='1m'
        else:
            freq='30m'
            
        df = getdf(tables, nodeid=Node, start_time=x_range[0] , end_time=x_range[1], freq=freq, tolerance=pd.Timedelta(seconds=600))
        events = getdf('event', nodeid=Node, start_time=x_range[0], end_time=x_range[1], freq='10ms')       


        
    else:
        df = getdf(tables, nodeid=Node, start_time='{0}-{1}-{2} 00:00:00'.format(Year, Month, Day), end_time='{0}-{1}-{2} 23:59:59'.format(Year, Month, Day), freq='1m', limit=200000, tolerance=pd.Timedelta(seconds=600))
        events = getdf('event', nodeid=Node, start_time='{0}-{1}-{2} 00:00:00'.format(Year, Month, Day), end_time='{0}-{1}-{2} 23:59:59'.format(Year, Month, Day), freq='10ms')
        



    #we must return something
    if df.empty:
        return    hv.Layout([hv.Points(pd.DataFrame(columns=['Date',Y]), kdims=['Date', Y],vdims=[]).opts(rtt_opts)  ]).cols(1)
#     df['Date'] =  df.index.to_datetime() #doesnt work with pandas 23
    df['Date'] =  pd.to_datetime(df.index)
    

    iccids = sorted(df.Iccid.dropna().unique())

    for val in categorical: #categorical columns contain NaN and can't be plotted unless NaN is mapped to a new category => 'None'
        if val in df.columns:
            df[val] = df[val].cat.add_categories("None")
        else:
            df[val] = np.nan

    for val in continous:
        if val not in df.columns:
            df[val] = np.nan

    #replace categorical NaNs with 'None' so they can be shown on the plot; otherwise a float-to-string exception occurs
    #replace continuous NaNs with zeros
    df=df.fillna(value=values)


    rtt_opts['Points']['plot']['color_index']=Coloring

    rtt_opts['Points']['style']['cmap']=Colormap

    table = [ hv.Points(df[df.Iccid==iccid], kdims=['Date', Y], vdims=categorical+continous, label=iccid).opts(rtt_opts ) for iccid in iccids]

    if not events.empty and 'EventType' in events.columns:
        events=events[events['EventType'].str.contains('Scheduling')]
        if not events.empty:
            events['Date'] = pd.to_datetime(events.index)
            ds = hv.Dataset((events.Date, events.EventType), kdims=['Date', 'EventType'])
                    
            #span the spikes over the full Y range; avoid shadowing the built-in max/min
            y_max = df[Y].max()
            y_min = df[Y].min()
            length = abs(y_max-y_min)
            
            spike_opts['Spikes']['plot']=dict(spike_length=length, position=y_min)
            overlay = ds.to(hv.Spikes).overlay().opts(spike_opts)
            table = [(plot*overlay).relabel(iccid) for plot,iccid in zip(table,iccids)]    
             

        else:
            overlay = dummy_spikes(x_range, Day, Month, Year)
            table = [(plot*overlay).relabel(iccid) for plot,iccid in zip(table,iccids)]    

    else:
        overlay = dummy_spikes(x_range, Day, Month, Year)
        table = [(plot*overlay).relabel(iccid) for plot,iccid in zip(table,iccids)]    

        
    return  hv.Layout(table).cols(1)
  
    
    
    
#replaces the HTML object when another column is selected; it is not possible to draw on the same graph with different axes
def render(obj):
    renderer = hv.renderer('bokeh')
    plot = renderer.get_plot(obj)
    size = renderer.get_size(plot)
    #return renderer.figure_data(plot), size #bokeh older than 0.12.10 and holoview older than 1.9.0
    return renderer._figure_data(plot), size

In [None]:
class DateExplorer(hv.streams.Stream):
    
    output = paramnb.view.HTML(renderer=render)
    
    Node = param.ObjectSelector(default='582', objects=nodes_for_table()['ping'], precedence=5)
    
    Day = param.ObjectSelector(default='01', objects=["%.2d" % i for i in range(1,32)], precedence=1)
    Month = param.ObjectSelector(default='01', objects=["%.2d" % i for i in range(1,13)],precedence=2)

    
    #Month = param.Integer(default=10, bounds=(1, 12),precedence=2)
    Year = param.Integer(default=2018, bounds=(2016, 2018),precedence=3)
    Coloring = param.ObjectSelector(default='Frequency', objects=continous+categorical, precedence=4)
    Y = param.ObjectSelector(default='RTT', objects=continous, precedence=4)
    Colormap = param.ObjectSelector(default=cm['linear_bmy_10_95_c71'], objects=cm.values())
    range_stream = RangeX()
    
    def retData(self):
        if self.range_stream.x_range is not None:
            return getdf(tables, nodeid=self.Node, start_time=self.range_stream.x_range[0] , end_time=self.range_stream.x_range[1], freq='10ms', tolerance=pd.Timedelta(seconds=600))
        else:
            return getdf(tables, nodeid=self.Node, start_time='{0}-{1}-{2} 00:00:00'.format(self.Year, self.Month, self.Day), end_time='{0}-{1}-{2} 23:59:59'.format(self.Year, self.Month, self.Day), 
                         freq='10ms', tolerance=pd.Timedelta(seconds=600))
            
        
    
    def event(self, **kwargs):
        if self.output is None or any(key in kwargs for key in ('Day', 'Month', 'Year', 'Node', 'Coloring', 'Y', 'Colormap')):
            #a new date invalidates the current zoom, so start from a fresh range stream
            if 'Day' in kwargs or 'Month' in kwargs or 'Year' in kwargs:
                self.range_stream=RangeX()
                
            self.output = hv.DynamicMap(hv.Callable(load_data, stream_mapping={0: [self.range_stream]}), streams=[self, self.range_stream])
            

        else:            
            super(DateExplorer, self).event( **kwargs)

class Plot(object):
    def __init__(self): 
        self.explorer = DateExplorer()
        paramnb.Widgets(self.explorer, continuous_update=True, callback=self.explorer.event, on_init=True)
    def retData(self):
        return self.explorer.retData()

## Visualisation initialisation and interaction
Running the cell below should produce a node/date/parameter selector and plots of these for all of the node's interfaces.
If you want to visualise data from multiple nodes simultaneously, simply copy the cell, rename the variable (say to ```plot2```) and run it.

### Interactive visualisation
The visualisation widget allows the user to:
* Select the date (day, month, year) for which the data will be shown. 
* Select the node whose data will be visualised.
* Select the parameter that will be shown on the Y axis.
* Select the parameter that will correspond to the coloring of the plotted points. 
* Select the colormap.

The data is initially always shown on a 24-hour plot and for all interfaces on the selected node (plot title corresponds to the interface ICCID). However, the user can zoom in to a particular region on the plot, in which case all plots are zoomed in simultaneously.
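
The initial 24-hour window follows directly from the selected day, month and year. As a minimal sketch (the helper name `day_bounds` is hypothetical and not part of this notebook), the start and end timestamps can be derived with the standard library. Unlike plain string formatting, this also rejects impossible dates such as 31 February, which the independent day and month selectors allow the user to pick.

```python
from datetime import datetime, timedelta


def day_bounds(year, month, day):
    """Return ('YYYY-MM-DD 00:00:00', 'YYYY-MM-DD 23:59:59') for one day.

    Accepts ints or zero-padded strings such as '01', as produced by the
    widget selectors. Raises ValueError for dates that do not exist.
    """
    start = datetime(int(year), int(month), int(day))
    end = start + timedelta(hours=23, minutes=59, seconds=59)
    fmt = "%Y-%m-%d %H:%M:%S"
    return start.strftime(fmt), end.strftime(fmt)


print(day_bounds(2018, "01", "01"))

try:
    day_bounds(2018, "02", "31")
except ValueError as err:
    print("invalid date:", err)
```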

By default, the time-series plot also shows the times at which experiments were deploying, deployed, started and stopped, as user experiments may affect metadata readings.

In [None]:
# Running this cell should produce a node/date/parameter selector and the corresponding plots
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
plot1 = Plot()

## Data selection and storage
Data can be selected with Lasso select or Box select on a plot. 
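Calling the ```retData()``` method of the ```Plot``` object returns a data frame for the selected node, covering the time range currently shown in the plot (or the whole selected day if the plot has not been zoomed), as in the example below.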

In [None]:
# Returns a dataframe that is held in df_selected object
df_selected = plot1.retData()

In [None]:
#find which column values correlate with a change in RTT: growth=True => why it rose, growth=False => why it dropped; middle is the RTT value at which the set is split (the default -5000 means: use the per-ICCID median)
from ricercando.significance import hyper_test
def test_columns (df, growth=True, middle=-5000):
    result = []
    
    for iccid,group in df.groupby('Iccid'):
        if middle==-5000:
            median = group['RTT'].median()
        else:
            median=middle
        if growth:
            rtt_ok = group['RTT'] > median
        else:
            rtt_ok = group['RTT'] < median
        for col in [g for g in group.columns if g not in['RTT', 'NodeID', 'Iccid', 'MCC_MNC', 'Interface', 'Error']]:
            temp = hyper_test(group[col], rtt_ok).reset_index()
            if not temp.empty:
                temp.rename( columns={temp.columns[0]: 'Variable'}, inplace=True)
                temp['Variable'] = temp.apply(lambda row: iccid+','+col+'='+str(row['Variable'][0]), axis=1)
                result.append(temp)
    temp = pd.concat(result).sort_values(by='enrichment', ascending=False)
    temp = temp[temp['count']>400]
#     return temp[temp['p-value']<=0.2]
    return temp

In [None]:
#find out why RTT has spiked
test_columns (plot1.retData())

The selected data can now be stored on the local disk and loaded into Orange using the IPython connector widget from the MONROE toolbox.

In [None]:
# Stores the df_selected dataframe to a local disk.
%store df_selected