In [None]:
# Pick up edits to local modules (e.g. the `monitoring` package) without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
# Make the `monitoring` package that lives one directory up importable.
import sys
sys.path.append("../")

import time

import pandas as pd
from azureml.core import Workspace

# Azure ML workspace loaded from the local workspace configuration; later cells
# pass `ws` to every monitoring helper.
ws = Workspace.from_config()

# The workspace's default Key Vault, used below to store/read ADX connection settings.
kv = ws.get_default_keyvault()


#### Prerequisites
If you use the provisioning feature below, you only need a workspace object `ws`.
If you want to create the cluster manually instead:

1. Create a service principal (SP) and a client secret.
2. Provision an ADX cluster and create a database.
3. Grant the SP the Contributor role on the cluster.
4. Install the following packages: `pip install --upgrade azure-mgmt-kusto azure-kusto-ingest azure-kusto-data azure-identity`



#### Provisioning resources


The following code provisions an ADX cluster and registers the necessary information in the workspace's Key Vault.

In [None]:
# Option: configure an existing (custom) ADX cluster instead of provisioning one
# with provision(). Uncomment and replace the <...> placeholders with your values.
# Do not commit real tenant / client / subscription IDs to source control.
# from monitoring import KV_SP_ID, KV_SP_KEY, KV_ADX_DB, KV_ADX_URI, KV_TENANT_ID
# tenant_id = "<your-tenant-id>"
# # Application (client) ID of the service principal
# client_id = "<your-client-id>"
# # Client secret: store it in your workspace's Key Vault under a secret named
# # exactly like your client_id; it is read back from there here.
# client_secret = kv.get_secret(client_id)
# # NOTE(review): subscription_id is not used by the lines below.
# subscription_id = "<your-subscription-id>"
#
# cluster_uri = "https://<cluster-name>.<region>.kusto.windows.net"  # URL of the ADX cluster
# # Register the connection settings in the workspace Key Vault.
# kv.set_secret(KV_SP_ID, client_id)
# kv.set_secret(KV_SP_KEY, client_secret)
# kv.set_secret(KV_ADX_DB, "<your-database-name>")
# kv.set_secret(KV_ADX_URI, cluster_uri)
# kv.set_secret(KV_TENANT_ID, tenant_id)


In [None]:
# Provision the ADX resources for this workspace (see the markdown above).
# NOTE(review): presumably unnecessary if you registered a custom cluster in the
# previous cell — confirm against monitoring.management.provision.
from monitoring.management import provision
provision(ws)

#### Test Ingestion


In [None]:
from monitoring.data_collector import Online_Collector
table_name = "isd_weather_test4"  # new dataset

# Take 10 sample rows and expose the event time as `timestamp` (replacing
# `datetime`). Built as a chain instead of mutating the head() slice in place,
# which avoids SettingWithCopyWarning and keeps the cell idempotent; `assign`
# appends `timestamp` at the end, matching the original column order.
sample_pd_data = (
    pd.read_parquet("data/test_data.parquet")
    .head(10)
    .assign(timestamp=lambda frame: frame["datetime"])
    .drop(columns=["datetime"])
)

online_collector = Online_Collector(table_name, ws=ws)


In [None]:
# Display the ADX cluster URI this collector targets (a quick configuration sanity check).
online_collector.cluster_uri

In [None]:
# Send the sample rows to `table_name` through the collector's streaming path
# (presumably ADX streaming ingestion — confirm in Online_Collector).
online_collector.stream_collect_df(sample_pd_data)

In [None]:
# Send the same sample rows through the collector's batch path.
# NOTE: this and the previous cell both write `sample_pd_data` via the same
# collector (same table), so running both presumably ingests the rows twice.
online_collector.batch_collect(sample_pd_data)

### Streaming Ingestion and Real-Time Visualization

In [None]:

# Ingest streaming data asynchronously with an internal buffering mechanism to
# lower the impact on the main scoring thread.
import random

streaming_table_name = "streaming_test"
streaming_collector = Online_Collector(streaming_table_name, ws=ws)

# Demo parameters (previously inline magic numbers).
RUN_IDS = ["r000001", "r000002", "r000003", "r000004", "r000005"]
STEPS_PER_RUN = 1000
LEARNING_RATES = ["0.001", "0.002"]
METRIC_LOW, METRIC_HIGH = 3, 50
SEED = 42

# Seeded generator so the synthetic metric values are reproducible across
# Restart & Run All (the original used the unseeded global `random`).
rng = random.Random(SEED)

# NOTE(review): buffer_time / batch_size are interpreted by
# Online_Collector.start_logging_df (presumably flush interval in seconds and
# rows per flush) — confirm.
streaming_collector.start_logging_df(buffer_time=2, batch_size=10)

for run_id in RUN_IDS:
    # NOTE(review): run_id is not written to the table, so rows from different
    # runs are indistinguishable downstream; add a "run_id" column if intended.
    for _ in range(STEPS_PER_RUN):
        for lr in LEARNING_RATES:
            # One single-row frame per event, queued individually to simulate a stream.
            df = pd.DataFrame({
                "timestamp": pd.to_datetime('today'),
                "lr": [lr],
                "metric1": [rng.uniform(METRIC_LOW, METRIC_HIGH)],
            })
            streaming_collector.stream_collect_df_queue(df)

# NOTE(review): stop_logging() is intentionally not called here; confirm the
# collector's shutdown/flush semantics before enabling it.
# streaming_collector.stop_logging()



In [None]:
from monitoring.query import RT_Visualization
# Scatter plot of metric1 against timestamp for the streaming table, grouped by
# learning rate. NOTE(review): max_records / ago presumably cap the row count
# and the look-back window — confirm in RT_Visualization.scatter.
rt_viz =RT_Visualization(streaming_table_name,ws)
rt_viz.scatter(max_records=200, ago='12h',groupby='lr', y_metric='metric1',x_metric='timestamp')

### Anomaly Detection

In [None]:
from monitoring.query import KustoQuery
query = KustoQuery(streaming_table_name, ws)

# Detection window. The original max_t ("4/23/2022 10:10:00") preceded min_t
# ("4/26/2022 10:08:45"), giving an inverted/empty range; the end time is now
# on the same day as the start.
# NOTE(review): these are fixed timestamps from a past run — the streaming cell
# above writes rows stamped with the current time, so adjust the window to
# cover data that actually exists in the table.
min_t = "4/26/2022 10:08:45"
max_t = "4/26/2022 10:10:00"
assert pd.to_datetime(min_t) < pd.to_datetime(max_t), "min_t must precede max_t"

query.anomaly_detection(min_t=min_t, max_t=max_t, step="0.5s", metric="metric1", agg="avg", ts_col="timestamp", groupby="lr", sensitivity=0.1, filter="0.001")


### Drift Query

In [None]:
# Drift-analysis helper bound to this workspace; reused by the drift query and
# the Dash explorer below.
from monitoring.drift_analysis import Drift_Analysis
drift_analysis =Drift_Analysis(ws)


In [None]:
# Compare a base window of ISDWeather against a target window, binned by 30 days.
# NOTE(review): base and target use the same table AND the same date range, so
# this compares the data with itself (a smoke test at best) — point tgt_* at the
# period you actually want to check for drift.
drift_analysis.analyze_drift(limit=10000,base_table_name = 'ISDWeather',tgt_table_name='ISDWeather', base_dt_from='2013-04-13', base_dt_to='2014-05-13', tgt_dt_from='2013-04-13', tgt_dt_to='2014-05-13', bin='30d')


In [None]:
from jupyter_dash import JupyterDash
from datetime import date
from dateutil import parser
import plotly.express as px
from dash import Dash, dcc, html, Input, Output
from dash.exceptions import PreventUpdate
import plotly.graph_objects as go

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = JupyterDash(__name__, external_stylesheets=external_stylesheets)

tables_list = drift_analysis.list_tables()
# Guard against a workspace with no tables; tables_list[0] would raise IndexError.
default_table = tables_list[0] if len(tables_list) > 0 else None

app.layout = html.Div([
    html.Div([
        # Left: table selector plus a column selector that depends on it.
        html.Div([
            dcc.Dropdown(
                tables_list,
                default_table,
                id='tables',
            ),
            dcc.Dropdown(id='columns'),
        ], style={'width': '20%', 'display': 'inline-block'}),

        # Right: date range, bounded by the selected table's time range.
        html.Div([
            dcc.DatePickerRange(id='timeline'),
        ], style={'width': '20%', 'float': 'right', 'display': 'inline-block'}),
    ]),

    dcc.Graph(id='graph'),
])


@app.callback(
    Output('columns', 'options'),
    Input('tables', 'value'))
def set_columns_options(table_name):
    """Populate the column dropdown with the attribute names of the selected table."""
    if not table_name:
        # Dropdown cleared (or no tables available): nothing to list.
        return []
    columns = drift_analysis.list_table_columns(table_name)
    # Plain Python list rather than a numpy array for the dropdown options.
    return columns['AttributeName'].tolist()


@app.callback(
    Output('columns', 'value'),
    Input('columns', 'options'))
def set_columns_value(available_options):
    """Preselect the first available column, or clear the selection when there is none."""
    if not available_options:
        return None
    return available_options[0]


@app.callback(
    [Output('timeline', 'min_date_allowed'), Output('timeline', 'max_date_allowed'),
     Output('timeline', 'initial_visible_month'), Output('timeline', 'end_date')],
    Input('tables', 'value'))
def set_timeline_range(table_name):
    """Bound the date picker by the selected table's time range, truncated to the hour.

    Renamed from a second ``set_columns_options`` definition that shadowed the
    callback above in the notebook namespace.
    """
    if not table_name:
        raise PreventUpdate
    # NOTE(review): assumes get_time_range returns (start, end) as parseable strings.
    time_range = drift_analysis.get_time_range(table_name)

    start_date = parser.parse(time_range[0]).replace(microsecond=0, second=0, minute=0)
    end_date = parser.parse(time_range[1]).replace(microsecond=0, second=0, minute=0)

    # min/max allowed dates, then open the calendar on (and preselect) the range end.
    return [start_date, end_date, end_date, end_date]


# Draft figure callback, disabled in the original.
# NOTE(review): get_timestamp_col / query are not used elsewhere in this
# notebook — confirm they exist on Drift_Analysis before enabling.
# @app.callback(
#     Output('graph', 'figure'),
#     Input('tables', 'value'),
#     Input('columns', 'value'),
#     Input('timeline', 'start_date'),
#     Input('timeline', 'end_date')
# )
# def update_figure(table, column, start_date, end_date):
#     """Plot daily counts of `column` in `table` within the selected date range."""
#     timestamp_col = drift_analysis.get_timestamp_col(table)
#     if timestamp_col != column:
#         if start_date is not None:
#             query = f"{table}| where ['{timestamp_col}'] > datetime({start_date}) and ['{timestamp_col}'] <datetime({end_date})| summarize {column} =count({column}) by bin({timestamp_col}, 1d)"
#         else:
#             query = f"{table}| where ['{timestamp_col}'] <datetime({end_date})| summarize {column} =count({column}) by bin({timestamp_col}, 1d)"
#         filtered_df = drift_analysis.query(query)
#         print(query)
#         # Time on the x-axis, counts on the y-axis (the draft had them swapped).
#         fig = go.Figure([go.Scatter(x=filtered_df[timestamp_col].values, y=filtered_df[column].values, name=f"{column}")])
#         return fig
#     raise PreventUpdate


app.run_server(mode='inline', debug=False)
