# Notebook 3: System Integration
In this notebook, we connect our taxicab prediction workload to a monitoring system that leverages importance weighting. We:

1. Walk through the bare-bones architecture of the monitoring system, which is built on top of `duckdb`
2. Create a workload with simulated delays
3. Plot metrics over time for our workload

This notebook should be completed _after_ the second notebook (`2-importance-weighting.ipynb`).

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from datetime import date, datetime, timedelta
from db import Task
from pipeline import components
from sklearn.metrics import accuracy_score

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
import seaborn as sns
import string

In [3]:
# PARAMETERS
train_start_date = date(2020, 1, 1)
train_end_date = date(2020, 1, 31)
inference_start_date = date(2020, 2, 1)
inference_end_date = date(2020, 3, 31)
cadence = 7

feature_columns = [
    "pickup_weekday",
    "pickup_hour",
    "pickup_minute",
    "work_hours",
    "passenger_count",
    "trip_distance",
    "trip_speed",
    "PULocationID",
    "DOLocationID",
    "RatecodeID",
    "congestion_surcharge",
    "loc_code_diffs",
]
label_column = "high_tip_indicator"

WINDOW_SIZE = 60 * 60 * 24 * 7 # 7 days
DELAY = 60 * 60 * 24 * 2 # 2 days
ID_LEN = 10

## Monitoring System Architecture

For our system prototype (built in Python and DuckDB), we have three main layers: interface, execution, and storage.

### Interface

The system has the following methods exposed to the client:

* `log_prediction`
* `log_feedback`
* `register_metric`
* `register_training_set`
* `compute_metrics`

Each metric consists of a name (e.g., accuracy), a function that accepts `y_true` and `y_pred`, and a list of window sizes (in seconds). To retrieve the metric time series for each window, users call `compute_metrics`. To support importance weighting, `register_training_set` defines and precomputes a binning function that maps a prediction's features to a subgroup.
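A minimal sketch of what a registered metric might look like, assuming tumbling (non-overlapping) windows. The notebook does not show how `db.Task` actually lays out its windows, and `MetricSpec`, `windowed_metric`, and `accuracy` are hypothetical names. Each window size (in seconds) yields one time series of metric values, computed here over labeled predictions only:

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

import numpy as np
import pandas as pd


@dataclass
class MetricSpec:
    """Hypothetical container for one registered metric."""

    name: str
    fn: Callable[[np.ndarray, np.ndarray], float]  # called as fn(y_true, y_pred)
    window_sizes: List[int]  # window lengths in seconds


def windowed_metric(spec: MetricSpec, labeled: pd.DataFrame) -> Dict[int, pd.Series]:
    """Evaluate spec.fn on tumbling windows of each registered size.

    `labeled` is assumed to be indexed by prediction timestamp and to have
    `y_true` and `y_pred` columns (labeled predictions only).
    """
    results = {}
    for size in spec.window_sizes:
        per_window = {}
        for window_start, group in labeled.groupby(pd.Grouper(freq=pd.offsets.Second(size))):
            if len(group) > 0:  # skip empty bins
                per_window[window_start] = spec.fn(
                    group["y_true"].to_numpy(), group["y_pred"].to_numpy()
                )
        results[size] = pd.Series(per_window, dtype=float)
    return results


def accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    # Same idea as accuracy_udf: round predicted probabilities to 0/1 first
    return float(np.mean(y_true == np.round(y_pred)))


spec = MetricSpec("accuracy", accuracy, window_sizes=[60 * 60 * 24 * 7])
labeled = pd.DataFrame(
    {"y_true": [1, 0, 1, 1], "y_pred": [0.9, 0.7, 0.8, 0.6]},
    index=pd.to_datetime(["2020-02-01", "2020-02-02", "2020-02-09", "2020-02-10"]),
)
print(windowed_metric(spec, labeled)[spec.window_sizes[0]])
```

On this toy data the first 7-day window (starting 2020-02-01) scores 0.5 and the second scores 1.0.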

### Execution

The two most complicated triggers are `log_prediction` and `log_feedback`. On `log_prediction`, the system applies the binning function to the prediction's features to compute a `subgroup_id`, increments the unlabeled-prediction counter for that `subgroup_id` in every window that contains the prediction timestamp, and writes the prediction to a predictions table. On `log_feedback`, the system looks up the corresponding prediction's timestamp and `subgroup_id`, decrements the counters for the relevant windows, and writes the feedback to a feedbacks table. Finally, on `compute_metrics`, the system joins the predictions and feedbacks tables to compute each window's metrics over labeled predictions, computes importance-weighted estimates for the still-unlabeled predictions, and merges the two results.
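The trigger logic above can be sketched in plain Python. This is a hypothetical, simplified stand-in (`MiniMonitor` is not part of `db.Task`): it assumes tumbling windows so each prediction touches exactly one counter, uses accuracy as the only metric, and takes the model's per-subgroup training accuracy as given. With importance weighting, reweighting each training example in subgroup g by p_unlabeled(g) / p_train(g) and averaging correctness reduces to the sum over g of p_unlabeled(g) * acc_train(g), which is what `compute_accuracy` uses for unlabeled predictions. Merging that estimate with the observed accuracy by weighting each part by its row count is an assumption of this sketch.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Tuple


class MiniMonitor:
    """Hypothetical in-memory sketch of the execution layer (not db.Task).

    Assumes tumbling windows: a timestamp (in seconds) falls into exactly one
    window [k * window_size, (k + 1) * window_size).
    """

    def __init__(self, window_size: int, bin_fn: Callable[[dict], Hashable],
                 train_acc: Dict[Hashable, float]):
        self.window_size = window_size
        self.bin_fn = bin_fn        # features -> subgroup_id
        self.train_acc = train_acc  # subgroup_id -> model accuracy on the training set
        self.preds: Dict[str, Tuple[int, Hashable, float]] = {}
        self.unlabeled: Dict[Tuple[int, Hashable], int] = defaultdict(int)
        self.correct: Dict[int, List[float]] = defaultdict(list)

    def _window(self, ts: int) -> int:
        return ts - ts % self.window_size

    def log_prediction(self, ts: int, identifier: str, y_pred: float, features: dict) -> None:
        subgroup_id = self.bin_fn(features)
        self.preds[identifier] = (ts, subgroup_id, y_pred)
        self.unlabeled[(self._window(ts), subgroup_id)] += 1

    def log_feedback(self, identifier: str, y_true: int) -> None:
        # Look up the prediction's timestamp and subgroup, then move it from the
        # unlabeled counter to the labeled results of its window.
        ts, subgroup_id, y_pred = self.preds[identifier]
        window = self._window(ts)
        self.unlabeled[(window, subgroup_id)] -= 1
        self.correct[window].append(float(round(y_pred) == y_true))

    def compute_accuracy(self) -> Dict[int, float]:
        windows = {w for w, _ in self.unlabeled} | set(self.correct)
        result = {}
        for w in sorted(windows):
            labeled = self.correct.get(w, [])
            counts = {g: n for (win, g), n in self.unlabeled.items() if win == w and n > 0}
            n_unlabeled = sum(counts.values())
            # Importance-weighted estimate for unlabeled rows:
            # sum over subgroups g of p_unlabeled(g) * training accuracy in g
            est = (
                sum(n / n_unlabeled * self.train_acc[g] for g, n in counts.items())
                if n_unlabeled else 0.0
            )
            # Merge: observed correct count plus expected correct count for unlabeled rows
            result[w] = (sum(labeled) + n_unlabeled * est) / (len(labeled) + n_unlabeled)
        return result


monitor = MiniMonitor(
    window_size=100,
    bin_fn=lambda f: "short" if f["trip_distance"] < 2 else "long",
    train_acc={"short": 0.9, "long": 0.6},
)
monitor.log_prediction(10, "a", 0.8, {"trip_distance": 1.0})
monitor.log_prediction(20, "b", 0.3, {"trip_distance": 5.0})
monitor.log_prediction(30, "c", 0.7, {"trip_distance": 8.0})
monitor.log_feedback("a", 1)
print(monitor.compute_accuracy())  # window 0: (1 + 2 * 0.6) / 3
```

Here one prediction is labeled and correct, and the two unlabeled "long" trips are each credited with the 0.6 training accuracy of their subgroup.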

### Storage

We maintain predictions, feedbacks, and metrics tables, plus a view that joins predictions with feedbacks. The counters for each `subgroup_id` live in Python memory. The training set is also kept in memory, though it could easily be persisted to the database.
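A hypothetical version of the storage layout (table and column names are assumptions; the real schema lives in `db` and is not shown here). It uses Python's built-in `sqlite3` as a stand-in for DuckDB so the snippet runs without extra dependencies; the `CREATE TABLE`, `CREATE VIEW`, and `LEFT JOIN` statements are standard SQL. The metrics table is omitted. The view keeps predictions that have no feedback yet, with `NULL` labels, which is what lets the execution layer separate labeled from unlabeled predictions:

```python
import sqlite3

# sqlite3 is a stand-in for DuckDB here; the DDL below is plain SQL.
con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE predictions (
        identifier  TEXT PRIMARY KEY,
        ts          TIMESTAMP,
        y_pred      DOUBLE,
        subgroup_id INTEGER
    );
    CREATE TABLE feedbacks (
        identifier TEXT PRIMARY KEY,
        ts         TIMESTAMP,
        y_true     DOUBLE
    );
    -- Every prediction, with its label if one has arrived (NULL otherwise)
    CREATE VIEW predictions_with_feedback AS
    SELECT p.identifier, p.ts AS t_pred, p.y_pred, p.subgroup_id,
           f.ts AS t_label, f.y_true
    FROM predictions AS p
    LEFT JOIN feedbacks AS f ON p.identifier = f.identifier;
    """
)

con.executemany(
    "INSERT INTO predictions VALUES (?, ?, ?, ?)",
    [("a", "2020-02-01 08:00:00", 0.8, 3), ("b", "2020-02-01 09:30:00", 0.2, 5)],
)
con.execute("INSERT INTO feedbacks VALUES (?, ?, ?)", ("a", "2020-02-03 10:00:00", 1.0))

rows = con.execute(
    "SELECT identifier, y_pred, y_true FROM predictions_with_feedback ORDER BY identifier"
).fetchall()
print(rows)  # [('a', 0.8, 1.0), ('b', 0.2, None)]
```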


### Exercise

We will create an instance of the `Task` class, register the accuracy metric, and then register the training set.


In [4]:
def accuracy_udf(labels, predictions):
    """
    Returns the accuracy of the predictions.
    """
    return accuracy_score(labels, np.round(predictions))

def get_random_string(length):
    # Build a random identifier from lowercase ASCII letters
    letters = string.ascii_lowercase
    result_str = "".join(random.choice(letters) for _ in range(length))
    return result_str

task = Task("taxi_tip_prediction")
task.register_metric("accuracy", accuracy_udf, window_sizes=[WINDOW_SIZE])

In [5]:
# Register training set

df = components.load_data(train_start_date, train_end_date)
clean_df = components.clean_data(df, train_start_date, train_end_date)
features_df = components.featurize_data(clean_df)
train_predictions, _ = components.inference(features_df, feature_columns, label_column)
task.register_training_set(
    train_predictions,
    feature_columns,
    label_column,
    "prediction",
)


## Create a Workload with Simulated Delays

We load all the inference data, clean it, and run the model on it to generate predictions. Then we simulate feedback (label) delays by sampling from an exponential distribution with a mean of 2 days (`DELAY`).
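A quick check of what this delay distribution implies, reusing the same `DELAY` and `WINDOW_SIZE` values as the parameters cell. NumPy's `scale` argument is the mean (the inverse of the rate), so delays average 2 days, the median is 2 ln 2 ≈ 1.39 days, and the share of labels that arrive within one 7-day window is 1 - e^(-3.5) ≈ 0.970:

```python
import math

import numpy as np

DELAY = 60 * 60 * 24 * 2        # mean label delay: 2 days, in seconds
WINDOW_SIZE = 60 * 60 * 24 * 7  # metric window: 7 days, in seconds

rng = np.random.default_rng(0)
delays = rng.exponential(scale=DELAY, size=200_000)  # scale = mean = 1 / rate

# For delay ~ Exp(mean=DELAY): P(delay <= t) = 1 - exp(-t / DELAY)
p_within_window = 1 - math.exp(-WINDOW_SIZE / DELAY)
empirical = float(np.mean(delays <= WINDOW_SIZE))
median_days = DELAY * math.log(2) / 86400

print(f"mean delay: {delays.mean() / 86400:.2f} days (median {median_days:.2f} days)")
print(f"P(label within 7 days): theory {p_within_window:.3f}, simulated {empirical:.3f}")
```

So roughly 3% of predictions are still unlabeled a full week after they are made, and predictions made near the end of a window are much more likely to be unlabeled when that window's metrics are computed; those are the rows the importance-weighted estimate has to cover.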

In [6]:
df = components.load_data(inference_start_date, inference_end_date)
clean_df = components.clean_data(df, inference_start_date, inference_end_date)
features_df = components.featurize_data(clean_df)
inference_predictions, _ = components.inference(features_df, feature_columns, label_column)

inference_predictions[feature_columns] = inference_predictions[feature_columns].astype(float)
inference_predictions["features"] = inference_predictions[feature_columns].apply(
    lambda r: r.to_dict(), axis=1
)



In [7]:
# Create delay column and dataframe of predictions and feedbacks

pred_and_label_df = inference_predictions[
    ["features", "tpep_pickup_datetime", "prediction", label_column]
]
pred_and_label_df = pred_and_label_df.rename(
    columns={
        "tpep_pickup_datetime": "t_pred",
        "prediction": "y_pred",
        label_column: "y_true",
    }
)
pred_and_label_df = pred_and_label_df.assign(
    delay=pd.to_timedelta(
        np.random.exponential(scale=DELAY, size=len(pred_and_label_df)),
        unit="s",
    ),
)
pred_and_label_df = pred_and_label_df.assign(
    t_label=pred_and_label_df["t_pred"] + pred_and_label_df["delay"]
)

# Assign each prediction a random identifier so its feedback can be matched to it
pred_and_label_df["identifier"] = [
    get_random_string(ID_LEN) for _ in range(len(pred_and_label_df))
]
pred_df = pred_and_label_df[
    ["t_pred", "y_pred", "identifier", "features"]
].reset_index(drop=True)
label_df = pred_and_label_df[
    ["t_label", "y_true", "identifier", "features"]
].reset_index(drop=True)
pred_df["type"] = "prediction"
label_df["type"] = "label"
pred_df.rename(columns={"t_pred": "ts", "y_pred": "value"}, inplace=True)
label_df.rename(columns={"t_label": "ts", "y_true": "value"}, inplace=True)
all_logs = pd.concat([pred_df, label_df]).reset_index(drop=True)
all_logs.set_index("ts", inplace=True)

## Integrate `Task` with workload

Here, we iterate through `all_logs` (predictions plus feedbacks/labels with simulated delays) and log each prediction and feedback to our monitoring system. Events are processed in 7-day batches, and we compute metrics after each batch.
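`pd.Grouper(freq="7D")` on the timestamp index splits the event log into consecutive 7-day bins that start at midnight of the first event's day, so the loop below logs events and computes metrics once per bin. A toy log with made-up identifiers shows how events fall into bins:

```python
import pandas as pd

# Toy event log shaped like `all_logs`: one row per prediction or label,
# indexed by event timestamp `ts`.
toy_logs = pd.DataFrame(
    {
        "type": ["prediction", "prediction", "label", "prediction", "label"],
        "identifier": ["a", "b", "a", "c", "b"],
    },
    index=pd.DatetimeIndex(
        [
            "2020-02-01 08:00", "2020-02-03 12:00", "2020-02-04 09:00",
            "2020-02-09 10:00", "2020-02-10 18:00",
        ],
        name="ts",
    ),
)

# Count events of each type per 7-day bin (same bucketing as the loop)
batch_counts = (
    toy_logs.groupby([pd.Grouper(freq="7D"), "type"]).size().unstack(fill_value=0)
)
print(batch_counts)
```

Within each bin the loop logs every prediction before any feedback, which is safe here because a label never arrives before its own prediction (the simulated delays are non-negative).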

In [8]:
metric_dfs = {}

task.clear()
for day, group_df in all_logs.groupby(pd.Grouper(freq="7D")):
    print(f"Processing day: {day}")
    preds = group_df[group_df["type"] == "prediction"].reset_index()
    feedbacks = group_df[group_df["type"] == "label"].reset_index()
    if len(preds) > 0:
        print(f"There are {len(preds)} predictions")
        # Log this batch's predictions one row at a time
        for t, identifier, pred, f in zip(
            preds["ts"].values,
            preds["identifier"].values,
            preds["value"].values,
            preds["features"].values,
        ):
            task.log_prediction(t, identifier, pred, f)
        print(f"Predictions logged for {len(preds)} rows")
    if len(feedbacks) > 0:
        print(f"There are {len(feedbacks)} feedbacks")
        for t, identifier, feedback in zip(
            feedbacks["ts"].values,
            feedbacks["identifier"].values,
            feedbacks["value"].values,
        ):
            task.log_feedback(t, identifier, feedback)
        print(f"Feedbacks logged for {len(feedbacks)} rows")
    if len(preds) > 0 or len(feedbacks) > 0:
        metrics = task.get_metrics()
        metric_dfs[day] = metrics

Processing day: 2020-01-31 00:00:00
There are 128766 predictions


KeyboardInterrupt: 

## Inefficiencies

This is very slow, mainly because we maintain O(number of subgroups × number of predictions made in 7 days) windows. We also log predictions and feedbacks one row at a time rather than in batches. How can we speed this up?
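One possible direction, sketched under stated assumptions rather than taken from the system: switch to tumbling 7-day windows so each prediction maps to exactly one `(window_start, subgroup_id)` counter, and compute those counters for a whole batch with a vectorized `groupby` instead of per-row increments and decrements. The function names are hypothetical, and flooring timestamps to `"7D"` aligns windows to the Unix epoch, so they start on Thursdays. The row-wise version is included to show that both produce the same counts:

```python
from collections import Counter

import pandas as pd

FREQ = "7D"  # assumed tumbling window length


def unlabeled_counts_batch(preds: pd.DataFrame, labeled_ids: set) -> pd.Series:
    """Count unlabeled predictions per (window_start, subgroup_id) in one pass.

    Assumes `preds` has `ts` (datetime64), `identifier`, and `subgroup_id` columns.
    """
    unlabeled = preds[~preds["identifier"].isin(labeled_ids)]
    window_start = unlabeled["ts"].dt.floor(FREQ)
    return unlabeled.groupby([window_start, unlabeled["subgroup_id"]]).size()


def unlabeled_counts_rowwise(preds: pd.DataFrame, labeled_ids: set) -> Counter:
    """Row-at-a-time equivalent: one increment per prediction, one decrement per label."""
    counts = Counter()
    for ts, identifier, subgroup_id in preds[["ts", "identifier", "subgroup_id"]].itertuples(index=False):
        key = (ts.floor(FREQ), subgroup_id)
        counts[key] += 1
        if identifier in labeled_ids:
            counts[key] -= 1
    return +counts  # unary plus drops zero counts


preds = pd.DataFrame(
    {
        "ts": pd.to_datetime(
            ["2020-02-01 08:00", "2020-02-02 09:00", "2020-02-03 10:00", "2020-02-07 11:00"]
        ),
        "identifier": ["a", "b", "c", "d"],
        "subgroup_id": [1, 2, 1, 1],
    }
)
print(unlabeled_counts_batch(preds, labeled_ids={"b"}))
```

The trade-off is semantic: tumbling windows give one metric value per week instead of a value for every window ending at each prediction, in exchange for one counter per prediction rather than many.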