In [1]:
from typing import Tuple

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
from pathlib import Path

import numpy as np

from sklearn.metrics import (
    auc,
    confusion_matrix,
    precision_recall_curve,
    roc_auc_score,
    roc_curve,
)

import plotly.io as pio

_parameters to be used with papermill_

In [2]:
# cell has parameters tag for papermill execution
movinet_variation: str = "movineta0"
normalize_features: str = False

# PNG
# ---
# In case you want to export to png to use in report, set to "png".
# jq + base64 decode cli: 
#   `jq -r '.cells[X] | .outputs[Y].data."image/png"' | base64 -d > fileX.png`
# 
# Default
# ---
# Default is "notebook_connected" which is interactive and works while being connected 
# to the internet and works in a browser too
plotly_renderer: str = "notebook_connected"

In [3]:
pio.renderers.default = plotly_renderer
print(f"Using variation: {movinet_variation}")
print(f"Normalize features: {normalize_features}")

Using variation: movineta0
Normalize features: False


In [4]:
# Constants
SOURCE_PATH = Path().resolve() # this template folder

## Loading Ground Truth and Experiment Results

In [5]:
# Load the ground truth sequences
tp_fp_sequences_path = (
    SOURCE_PATH.parent.parent / "data" / "handcrafted-metadata" / "tp_fp_sequences.csv"
)
# the first column being the sequence file name: e.g. "SZTEA101a_00_05_37.mov"
SEQUENCES_DF = pd.read_csv(tp_fp_sequences_path, index_col=0)
# Only keep relevant columns
SEQUENCES_DF = SEQUENCES_DF[
    [
        "Classification",
        "Duration",
        "Distance",
        "SubjectApproachType",
        "SubjectDescription",
        "Distraction",
        "Stage",
    ]
]
# Prefix the index with the path to the sequence file
SEQUENCES_DF = SEQUENCES_DF.set_index("data/sequences/" + SEQUENCES_DF.index)

In [6]:
# Load pickle results
pickle_file = SOURCE_PATH / f"{movinet_variation}.pkl"
features_df = pd.read_pickle(pickle_file)

# MoViNet produce 600 features (the 600 classes of the Kinetics-600 dataset)
FEATURES_COLUMNS_INDEXES = pd.RangeIndex.from_range(range(600))

if normalize_features:
    features = features_df[FEATURES_COLUMNS_INDEXES].to_numpy()
    features_df[FEATURES_COLUMNS_INDEXES] = (features / np.linalg.norm(features, axis=-1, keepdims=True)).tolist()

df = SEQUENCES_DF.join(features_df)

df["Clip"] = df.index
df["Alarm"] = df["Classification"] == "TP"

# For each sample, get the highest feature/signal
df["Activation"] = df[FEATURES_COLUMNS_INDEXES].max(axis=1)

In [7]:
# extract y_true and y_score
y_true = df["Alarm"].to_numpy(dtype=np.int32)
y_score = df["Activation"].to_numpy()

## Activation distribution

Hoping to see that the _activation_ signal match between being an alarm, which
intuitively should give higher signals.

For _non alarms_, it is expected/wished to have lower signals.

In [8]:
fig = px.histogram(
    df,
    x="Activation",
    color="Alarm",
    marginal="rug",
    hover_name="Clip",
    nbins=50,
)
fig.show()

## ROC with AUC

$TPR = TP / (TP + FN)$

$FPR = FP / (FP + TN)$

Goal is to have a curve as high as possible to the top left corner to extend
the area under the curve.

The diagonal represent a random sampling.

To focus on the $FN$, which are more than not expected in an alarm monitoring system,
it can be read in the curve as by following the curve from the top right corner
and expecting the curve to remain at $TPR = 1$ as much as possible as $TPR = TP / (TP)$ 
with $FN = 0$. 
When read from right to left, any drop of the $TPR$ means having more $FN$ classifications
at the given threshold.

In [9]:
# roc data computations
fpr, tpr, thresholds = roc_curve(y_true, y_score)

roc_df =  pd.DataFrame(
    {
        "False Positive Rate": fpr,
        "True Positive Rate": tpr,
    },
    columns=pd.Index(["False Positive Rate", "True Positive Rate"], name="Rate"),
    index=pd.Index(thresholds, name="Thresholds"),
)

ROC_AUC = roc_auc_score(y_true, y_score)
print(f"ROC AUC: {ROC_AUC}")

ROC AUC: 0.611304012345679


In [10]:
fig = px.line(
    roc_df,
    x="False Positive Rate",
    y="True Positive Rate",
    title=f"{movinet_variation} ROC - AUC: {ROC_AUC:.3f}",
    color_discrete_sequence=["orange"],
    range_x=[0, 1],
    range_y=[0, 1],
    width=800,
    height=800,
).add_shape(type="line", line=dict(dash="dash"), x0=0, x1=1, y0=0, y1=1)

fig.update_yaxes(scaleanchor="x", scaleratio=1)
fig.update_xaxes(constrain='domain')

fig.show()

With the given ROC above, find the threshold with $FN = 0$ and plot the confusion matrix.

Hopefully, would have a the most $TP$ and $TN$.

In [11]:
# functions for plotting confusion matrix for following cells

def get_confusion_matrix_for(threshold: float) -> np.ndarray:
    """Return a vector of the form [TP, FN, FP, TN]
    
    -- personal note --
    I prefer giving it like read the confusion matrix from left to right, top to bottom
    """
    y_pred = (df["Activation"] >= threshold).to_numpy(np.int32)

    TN, FP, FN, TP = confusion_matrix(y_true, y_pred).ravel()

    return np.array([TP, FN, FP, TN])

def get_confusion_matrix_plot(confusion_matrix: np.ndarray) -> go.Heatmap:
    """Return a plotly Heatmap for the given confusion matrix
    
    args:
        confusion_matrix: a vector of the form [TP, FN, FP, TN] 
                          or alternatively a 2x2 matrix of the form [[TP, FN], [FP, TN]]
    """
    TP, FN, FP, TN = confusion_matrix.ravel()

    # plotly has a different order for the confusion matrix, so we need to reverse it
    Z = [[FP, TN], [TP, FN]]

    X_LABELS = ["Positive (Alarm)", "Negative"]
    Y_LABELS = ["Negative", "Positive (Alarm)"]

    return go.Heatmap(
        z=Z,
        x=X_LABELS,
        y=Y_LABELS,
        texttemplate="%{z}",
        coloraxis="coloraxis"
    )

In [12]:
# among the activation values, find the threshold where FN = 0
#   -> this means select the TP instances and then find the lowest activation value
min_activation_signal = (
    df[df["Alarm"]] # select TP instances
)["Activation"].min() # find the lowest activation value

min_signal_confusion_matrix = get_confusion_matrix_for(min_activation_signal)
TP, FN, FP, TN = min_signal_confusion_matrix.ravel()

print("min activation signal, TP, FN, FP, TN")
print(f"{min_activation_signal}, {TP}, {FN}, {FP}, {TN}")

min activation signal, TP, FN, FP, TN
2.9163434505462646, 432, 0, 432, 0


In [13]:
fig = go.Figure(data=get_confusion_matrix_plot(min_signal_confusion_matrix))

fig.update_layout(title=f"Confusion Matrix for threshold {min_activation_signal:.3f}")

fig.show()

## Precision-Recall Curve

Precision $= TP / (TP + FP)$

Recall $= TPR$

In [14]:
# PR data computations
precision, recall, thresholds = precision_recall_curve(y_true, y_score)

PR_AUC = auc(recall, precision)
print(f"PR AUC: {PR_AUC}")

PR AUC: 0.6084934687045715


In [15]:
fig = px.area(
    x=recall, 
    y=precision,
    title=f"{movinet_variation} Precision-Recall Curve - AUC: {ROC_AUC:.3f}",
    labels=dict(x='Recall', y='Precision'),
    range_x=[0, 1],
    range_y=[0, 1],
    width=800, 
    height=800,
).add_shape(type="line", line=dict(dash="dash"), x0=0, x1=1, y0=1, y1=0)

fig.update_yaxes(scaleanchor="x", scaleratio=1)
fig.update_xaxes(constrain='domain')

fig.show()

## Cost Function

TODO add details on construction of the cost matrix

In [16]:
# Methods to compute cost function related data

def get_cost_matrix(cost_ratio: float) -> np.ndarray:
    """Return a cost matrix of the form:
     
        [ [0                , cost_ratio], 
          [1 - cost_ratio   , 0]            ]
          
        Following the confusion matrix convention:
        
        [ [TP, FN],
          [FP, TN] ]
          
        TP & TN = 0, as         'great' got predicted correctly
        FN = cost_ratio, as     'worst case scenario' missing an alarm!
        FP = 1 - cost_ratio, as 'well too bad' false alarm but better be too careful than 
                                not enough
    """
    return np.array([[0, cost_ratio], [1 - cost_ratio, 0]])


def compute_cost_function(cost_matrix: np.ndarray, thresholds: np.ndarray) -> np.ndarray:
    """Return the cost function for each threshold"""
    nb_thresholds = len(thresholds)

    # tile "repeats" the cost_matrix nb_thresholds times
    tiled_cost_vector = np.tile(cost_matrix.ravel(), nb_thresholds)

    # get the confusion matrix for each threshold
    confusion_vector = np.array(
        [get_confusion_matrix_for(threshold) for threshold in thresholds]
    ).ravel()

    return (
        # multiple all i element of the confusion vector with the i element of the cost vector
        np.multiply(confusion_vector, tiled_cost_vector)
        # "pack" all results by threshold
        .reshape((nb_thresholds, -1))
        # sum all the results by threshold
        .sum(axis=1)
    )


def get_cost_function_for(cost_ratio: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """For the given cost ratio, return the cost function at each threshold, the minimum of the cost function and its index"""
    cost_function = compute_cost_function(get_cost_matrix(cost_ratio), thresholds)

    # get the global minimum of the cost function
    y_min_cost_function = cost_function.min(initial=None)
    y_min_arg = np.where(cost_function == y_min_cost_function)

    return cost_function, y_min_cost_function, y_min_arg

In [17]:
# Cost Function data preparation

# 0.0. declare couple of cost ratio between FN and FP (see #get_cost_matrix method doc)
COST_RATIOS = [0.99, 0.9, 0.8, 0.7, 0.5]

# 0.1. reverse the thresholds
#      As pr scikit-learn documentation, the returned thresholds are sorted in decreasing order
thresholds = thresholds[::-1]

In [18]:
# Plotting the cost function

def get_cost_function_traces(cost_ratio: float) -> Tuple[go.Scatter, go.Scatter]:
    """Return the cost function trace and the minimum trace for the given cost ratio"""
    cost_function, y_min_cost_function, y_min_arg = get_cost_function_for(cost_ratio)

    cost_function_trace = go.Scatter(
        x=thresholds, 
        y=cost_function,
        name=f"Cost(T) with Cost Ratio = {cost_ratio}",
        mode="lines+markers",
        marker=go.scatter.Marker(size=5),
    )
    minimum_trace = go.Scatter(
        x=thresholds[y_min_arg],
        y=[y_min_cost_function],
        name=f"Minimum with Cost Ratio = {cost_ratio}",
        text=f"Min Cost(T) = {y_min_cost_function:.3f}",
        marker=go.scatter.Marker(
            symbol="x-thin",
            line=go.scatter.marker.Line(width=4),
            size=14,
            color="orange",
        ),
    )

    return cost_function_trace, minimum_trace

In [19]:
fig = go.Figure()

for cost_ratio in COST_RATIOS:
    cost_function_trace, minimum_trace = get_cost_function_traces(cost_ratio)
    fig.add_trace(cost_function_trace)
    fig.add_trace(minimum_trace)

fig.update_layout(
    title=f"{movinet_variation} Cost(T) Functions", 
    xaxis_title="Threshold", 
    yaxis_title="Cost(T)"
)
fig.update_xaxes(type="log")

fig.show()

### Confusion Matrix at global minimum

Display the confusion matrix at the various global minimum of the cost function above.

In [20]:
# Confusion Matrix data preparation

# for each cost ratio, get the global minimum and based on its index, get the corresponding threshold
min_and_threshold_per_cost_ratio = {
    cost_ratio: (y_min_cost_function, thresholds[y_min_arg][0])
    for (_, y_min_cost_function, y_min_arg), cost_ratio in zip(map(get_cost_function_for, COST_RATIOS), COST_RATIOS)
}

In [21]:
fig = make_subplots(
    rows=len(COST_RATIOS), 
    cols=1, 
    subplot_titles=tuple(
        f"Cost(T = {threshold_min:.3f}) = {y_min:.3f} Function with Cost Ratio = {cost_ratio}"
        for cost_ratio, (y_min, threshold_min) in min_and_threshold_per_cost_ratio.items()
    ),
)

for i, (cost_ratio, (y_min, threshold_min)) in enumerate(min_and_threshold_per_cost_ratio.items()):
    fig.add_trace(
        get_confusion_matrix_plot(get_confusion_matrix_for(threshold_min)),
        row=i + 1,
        col=1,
    )

    # Update axises properties
    fig.update_yaxes(title_text="Real value", row=i + 1, col=1)
    fig.update_xaxes(title_text="Predicted value", row=i + 1, col=1)

# Share the coloraxis across all heatmaps
fig.update_layout(height=200 * len(COST_RATIOS), coloraxis = {'colorscale':'viridis'})

fig.show()