# Drift detection streaming pipeline

This pipeline continuously computes the data drift of the generated mushroom data. It serves as an illustration of a very simple stream processing pipeline with a windowing function.

For use cases where the inference data is not expected to drift quickly, drift detection can of course also be implemented as a batch pipeline.

What is missing:
 * Joins
 * Data contracts
 * Event notification design (metadata, clean naming)
 * Event notification format (preferably Avro with a schema, or similar)
 * Tests
 * Feature engineering: this pipeline tests the drift of the raw data

Structure:
* The datagen simulates prediction request events and sends the corresponding event notifications to Kafka.
* These messages contain the features of a prediction request (or rather the raw data, since our simplified model does not compute any features).
* The pipeline continuously reads these notifications from Kafka and collects the requests until it has gathered an acceptable amount.
* It then compares the current data received from Kafka with the reference dataset that was used to train the mushroom model, and computes one figure per feature that quantifies the drift.
* It sends these figures to statsd, where Prometheus polls them and stores the data as a time series.
* At the end of the chain, Grafana polls this data from Prometheus to display it in a dashboard and to raise an alarm when the drift becomes too large.
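
To make the idea of "one drift figure per feature" concrete, here is a minimal sketch of a two-sample Kolmogorov-Smirnov statistic in plain Python. It only illustrates the concept: the notebook itself delegates the choice and computation of the statistical test to evidently, and the function name `ks_statistic` and the sample values are made up for this example.

```python
from bisect import bisect_right


def ks_statistic(reference: list[float], current: list[float]) -> float:
    """Largest gap between the two empirical CDFs (0 = identical, 1 = disjoint)."""
    ref_sorted = sorted(reference)
    cur_sorted = sorted(current)
    max_gap = 0.0
    # both CDFs are step functions, so the largest gap occurs at a sample point
    for x in ref_sorted + cur_sorted:
        ref_cdf = bisect_right(ref_sorted, x) / len(ref_sorted)
        cur_cdf = bisect_right(cur_sorted, x) / len(cur_sorted)
        max_gap = max(max_gap, abs(ref_cdf - cur_cdf))
    return max_gap


# hypothetical values for a single numerical feature
reference_values = [1.0, 2.0, 3.0, 4.0]
shifted_values = [3.0, 4.0, 5.0, 6.0]

print(ks_statistic(reference_values, reference_values))  # 0.0
print(ks_statistic(reference_values, shifted_values))    # 0.5
```

A value near 0 means the current window looks like the reference data for that feature, while a value near 1 means the two samples barely overlap.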

# Load reference dataset

Note that we drop the label column right here.

In [1]:
import pandas as pd

reference_df = pd.read_parquet('s3://traindata/train_raw.parquet', storage_options={"anon": False}).drop("class", axis="columns")

# Drift calculation and pushing

Functions to compute the drift per column and report it to statsd.

In [2]:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
from evidently import ColumnMapping
from loguru import logger


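def calculate_drift(df: pd.DataFrame, reference_df: pd.DataFrame) -> tuple[tuple[str, ...], tuple[dict, ...]]:
    """
    Compare one window of data against the reference data.

    Returns two tuples of equal length: the feature names and, per feature,
    a dict with the keys 'stattest_name' and 'drift_score'.
    """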

    logger.info(f"Received window of length {len(df)}")
    
    # the dataframe generated from json has an index of dtype str, which we replace by an index of ints,
    # or else evidently chokes
    df = df.reset_index(drop=True)

    # use a column mapping to make it easy for evidently to find the right metric
    column_mapping = ColumnMapping()
    column_mapping.categorical_features = ['cap-shape', 'gill-attachment', 'gill-color', 'stem-color']
    column_mapping.numerical_features = [c for c in df.columns if c not in column_mapping.categorical_features]

    # define and execute evidently standard drift report
    data_drift_dataset_report = Report(metrics=[DataDriftPreset()])
    data_drift_dataset_report.run(reference_data=reference_df, current_data=df, column_mapping=column_mapping)

    # extract a list of features and calculated drift metrics from report
    report_whole_output = data_drift_dataset_report.as_dict()
    report_just_drift = report_whole_output["metrics"][1]["result"]["drift_by_columns"]
    metrics_dict = {}
    for column_name, column_dict in report_just_drift.items():
        metrics_dict[column_name] = {k:v for k, v in column_dict.items() if k in ['stattest_name', 'drift_score']}
        metrics_dict[column_name]['stattest_name'] = metrics_dict[column_name]['stattest_name'].replace(' ', '_')
        metrics_dict[column_name]['drift_score'] = float(metrics_dict[column_name]['drift_score'])
    features, metrics = zip(*metrics_dict.items())
    
    return features, metrics
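
The extraction step at the end of `calculate_drift` can be tried in isolation. The sketch below runs the same logic on a hand-written dict that mimics the shape accessed above (`["metrics"][1]["result"]["drift_by_columns"]`). The test names and scores in `sample_report` are invented for illustration and are not real evidently output, and `extract_drift_metrics` is a hypothetical helper name.

```python
def extract_drift_metrics(report: dict) -> tuple[tuple[str, ...], tuple[dict, ...]]:
    """Keep only the test name and the drift score for every column."""
    drift_by_columns = report["metrics"][1]["result"]["drift_by_columns"]
    metrics_dict = {
        column: {
            # spaces are replaced so the name can be used in a metric path
            "stattest_name": info["stattest_name"].replace(" ", "_"),
            "drift_score": float(info["drift_score"]),
        }
        for column, info in drift_by_columns.items()
    }
    features, metrics = zip(*metrics_dict.items())
    return features, metrics


# hand-written stand-in for the report dict; all values are made up
sample_report = {
    "metrics": [
        {"result": {}},  # first entry is skipped, as in the notebook
        {
            "result": {
                "drift_by_columns": {
                    "cap-shape": {
                        "stattest_name": "hypothetical test a",
                        "drift_score": 0.42,
                        "drift_detected": False,
                    },
                    "stem-width": {
                        "stattest_name": "hypothetical test b",
                        "drift_score": 1,
                        "drift_detected": True,
                    },
                }
            }
        },
    ]
}

features, metrics = extract_drift_metrics(sample_report)
print(features)
print(metrics)
```

Note that the second element of the return value is a tuple of dicts, not of floats, which is why the caller later indexes it with `m['stattest_name']` and `m['drift_score']`.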

In [3]:
import statsd

def report_drift_to_statsd(df: pd.DataFrame, reference_df: pd.DataFrame, statsd_client: statsd.client.udp.StatsClient) -> None:
    
    # calculate metric per column
    features, metrics = calculate_drift(df, reference_df)

    # push to statsd
    prefix = "drift_metrics.mushroom.v1"
    for f, m in zip(features, metrics):
        statsd_client.gauge(f"{prefix}.{f}.{m['stattest_name']}", m['drift_score'])
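
To see what ends up on the wire, the following sketch builds the gauge lines by hand instead of sending them. In the plain-text StatsD protocol a gauge is encoded as `<name>:<value>|g`. The helper name `gauge_lines` and the sample metric are assumptions made for this illustration; the notebook itself leaves the formatting to the statsd client.

```python
def gauge_lines(features, metrics, prefix="drift_metrics.mushroom.v1"):
    """Return one StatsD gauge line per feature, without sending anything."""
    return [
        f"{prefix}.{feature}.{metric['stattest_name']}:{metric['drift_score']}|g"
        for feature, metric in zip(features, metrics)
    ]


# made-up input in the shape returned by calculate_drift
example_features = ("cap-shape",)
example_metrics = ({"stattest_name": "hypothetical_test_a", "drift_score": 0.25},)

for line in gauge_lines(example_features, example_metrics):
    print(line)  # drift_metrics.mushroom.v1.cap-shape.hypothetical_test_a:0.25|g
```

Putting the feature and the test name into the metric path yields one separate time series per feature and test.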

In [None]:
from quixstreams import Application

# create main quix object
app = Application(
    broker_address="message-broker:9092",
    # auto_offset_reset="earliest"
)

# define topic and message format
messages_topic = app.topic(name="mushroom_inference_request", value_deserializer="json")

# create a StreamingDataFrame
sdf = app.dataframe(topic=messages_topic)

from datetime import timedelta

def initializer(value: dict) -> dict:
    """
    Initialize the state for aggregation when a new window starts.

    It will prime the aggregation when the first record arrives 
    in the window.
    """
    
    # Wrap the first row in a dict keyed by a string index, which gives
    # something like the example below. We need this second level when
    # combining multiple rows in the reducer, or else we would just
    # overwrite the same values again and again.
    #
    # {'0': {'cap-diameter': 3.0,
    #        'cap-shape': 3.0,
    #        'gill-attachment': 5.0,
    #        'gill-color': 2.0,
    #        'stem-height': 0.7591098099,
    #        'stem-width': 1397.0,
    #        'stem-color': 9.0,
    #        'season': 0.9545582517}}

    return {str(k):v for k,v in enumerate([value])}


def reducer(aggregated: dict, value: dict) -> dict:
    """
    Called on every row but the first

    Reducer always receives two arguments:
    - previously aggregated value (the "aggregated" argument)
    - current value (the "value")
    It combines them into a new aggregated value and returns it.
    This aggregated value will also be returned as the result of the window.
    """

    # first add old and new (without their respective index)
    list_of_dicts = [value] + list(aggregated.values())
    
    # then re-add an incremental index
    return {str(k):v for k,v in enumerate(list_of_dicts)}


statsd_client = statsd.StatsClient('statsd', 8125)

sdf = (
    # quix lacks the functionality to define a window of a fixed size, which would be appropriate here
    # so instead as a crutch, we use a tumbling window, which works but is a bit weird
    # don't make it too short, it should be long enough that reducer is called at least once
    sdf.tumbling_window(duration_ms=timedelta(seconds=10))

    # create a "reduce" aggregation with "reducer" and "initializer" functions
    # this is the quix way to define arbitrary aggregations (standard aggregations have their convenience functions)
    .reduce(reducer=reducer, initializer=initializer)

    # emit results only for closed windows
    .final()

    # now calculate drift statistics on full windows
    .apply(lambda m: report_drift_to_statsd(pd.DataFrame(m["value"]).T, reference_df, statsd_client))
)

app.run(sdf)
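
The interplay of `initializer` and `reducer` can be checked without Kafka or quix. The sketch below replays three made-up events through copies of the two functions using `functools.reduce`, which roughly mimics what happens inside one window: the first event goes to the initializer, and every further event goes to the reducer. The event values are invented and shortened to two fields.

```python
from functools import reduce


def initializer(value: dict) -> dict:
    # the first row of a window becomes entry '0'
    return {"0": value}


def reducer(aggregated: dict, value: dict) -> dict:
    # prepend the new row to the rows collected so far, then re-index
    rows = [value] + list(aggregated.values())
    return {str(index): row for index, row in enumerate(rows)}


# three hypothetical events that fall into the same window
events = [
    {"cap-diameter": 3.0, "stem-width": 1397.0},
    {"cap-diameter": 5.0, "stem-width": 900.0},
    {"cap-diameter": 4.0, "stem-width": 1100.0},
]

window = reduce(reducer, events[1:], initializer(events[0]))
print(window)
```

Because new rows are prepended, entry `'0'` holds the most recent event. The row order does not matter for the drift calculation, and `pd.DataFrame(window).T` turns the aggregate into one row per event, which is the form `report_drift_to_statsd` receives.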


