# Prediction Notebook

First, declare the command-line (job) parameters as `dbutils` text widgets, each with a default value.

In [0]:
# Every job parameter is declared as a text widget: name -> default value.
# Insertion order of this dict is the order in which the widgets are created.
_widget_defaults = {
    # --- Data params ---
    "dataset": "system_1",
    "eval_start": "0",
    "eval_end": "None",
    # --- Predict params ---
    "use_cuda": "True",
    "threshold_type": "POT",
    # If threshold_type is set to POT, these are the POT params
    "use_mov_av": "False",
    "q": "0.001",
    "level": "0.99",
    "dynamic_pot": "False",
}

for _widget_name, _widget_default in _widget_defaults.items():
    dbutils.widgets.text(_widget_name, _widget_default)

Import the required modules.

In [0]:
from datetime import datetime
import numpy as np
import pandas as pd
import torch
import os

from architecture import MTAD_GAT
from model import Handler
from utils import str2bool, str2type
from utils import get_data, SlidingWindowDataset, create_data_loader, pot_threshold, get_run_id, json_to_numpy

import mlflow



Read the parameter values from the widgets and cast each one to its proper type (widget values are returned as strings).

In [0]:
# Widget values come back as strings; convert each one to the type the
# inference code expects.
_get = dbutils.widgets.get

# --- Data params ---
dataset = _get("dataset")
eval_start = int(_get("eval_start"))
# NOTE(review): presumably str2type maps "None" -> None (read to the end) — confirm.
eval_end = str2type(_get("eval_end"))

# --- Predict params ---
use_cuda = str2type(_get("use_cuda"))
threshold_type = _get("threshold_type")
# POT-only params
use_mov_av = str2type(_get("use_mov_av"))
q = float(_get("q"))
level = float(_get("level"))
dynamic_pot = str2type(_get("dynamic_pot"))

Make sure the `datasets` storage container, which the data is read from, is mounted at `/mnt/datasets`.

In [0]:
# Mount the "datasets" container of the "canopuslake" storage account at
# /mnt/datasets, unless it is already mounted (e.g. by an earlier run).
mnts = dbutils.fs.mounts()
mnt_exists = any(m.mountPoint == "/mnt/datasets" for m in mnts)

if mnt_exists:
    print("Mount already exists.")
else:
    account_name = "canopuslake"
    container = "datasets"

    # Service-principal credentials come from the Databricks secret scope.
    client_secret = dbutils.secrets.get(scope="vault_scope", key="dbricks-to-lake-secret")
    client_id = dbutils.secrets.get(scope="vault_scope", key="dbricks-to-lake-client-ID")
    tenant_id = dbutils.secrets.get(scope="vault_scope", key="dbricks-to-lake-tenant-ID")

    # OAuth client-credentials configuration passed to the mount.
    oauth_endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        "fs.azure.account.oauth2.client.endpoint": oauth_endpoint,
    }

    dbutils.fs.mount(
        source=f"abfss://{container}@{account_name}.dfs.core.windows.net/",
        mount_point="/mnt/datasets",
        extra_configs=configs,
    )

Mount already exists.


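Finally, run inference. Load the Production model (or, if there is none, the model from the latest training run), score the new data, compute the anomaly threshold, and log the predicted anomalies to MLflow.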

In [0]:
# Timestamp-based name for every inference run (named `run_name` so the
# builtin `id` is not shadowed).
run_name = datetime.now().strftime("%d%m%Y_%H%M%S")

experiment = mlflow.set_experiment(experiment_name=f"/Experiments/{dataset}_inference")
exp_id = experiment.experiment_id

with mlflow.start_run(experiment_id=exp_id, run_name=run_name):

    # ----------------------------- RESOLVE MODEL ------------------------------
    # Prefer the registered model in the Production stage. The run ID is taken
    # from the very model version that gets loaded, so the weights and the
    # training artifacts read below (config, scores, thresholds) always match.
    model_name = f"{dataset}_model"
    try:
        client = mlflow.MlflowClient()
        # An empty list (no version in Production) surfaces as IndexError here.
        prod_version = client.get_latest_versions(model_name, stages=["Production"])[0]
        model_uri = f"models:/{model_name}/{prod_version.version}"
        model = mlflow.pytorch.load_model(model_uri)
        run_id = prod_version.run_id
        print(f"Fetched {dataset}_model from Production for predictions.")
    except (mlflow.exceptions.MlflowException, IndexError):
        # No usable Production model: fall back to the model logged by the
        # latest training run.
        run_id = get_run_id("-1", f"/Experiments/{dataset}_training")
        model_uri = f"runs:/{run_id}/{dataset}_model"
        model = mlflow.pytorch.load_model(model_uri)
        print("No model found in Production stage, using model from latest run for predictions.")

    print(f"The model's run ID is: {run_id}")
    train_art_uri = mlflow.get_run(run_id).info.artifact_uri

    # Hyper-parameters stored with the training run.
    model_args = mlflow.artifacts.load_dict(train_art_uri + "/config.txt")
    window_size = model_args['window_size']
    batch_size = model_args['batch_size']

    # --------------------------- START PREDICTION -----------------------------
    # Get data from the dataset
    (x_new, _) = get_data(dataset, mode="new", start=eval_start, end=eval_end)

    # This workaround needs to happen here (not inside get_data) at the moment:
    # the last window_size timestamps from training are used as the first
    # window_size timestamps for evaluation, due to the sliding-window framework.
    x_train, _ = get_data(dataset, mode="train", start=-window_size, end=None)
    x_new = np.concatenate((x_train, x_new), axis=0)

    # Cast data into a float tensor; features are along axis 1.
    x_new = torch.from_numpy(x_new).float()
    n_features = x_new.shape[1]

    # Construct dataset from the tensor - no stride here
    new_dataset = SlidingWindowDataset(x_new, window_size)

    print("Predicting:")
    # Create the data loader - no shuffling, so scores keep their time order
    new_loader, _ = create_data_loader(new_dataset, batch_size, None, False)

    # Inference-only Handler: all training components are left unset.
    handler = Handler(
        model=model,
        optimizer=None,
        scheduler=None,
        window_size=window_size,
        n_features=n_features,
        batch_size=batch_size,
        n_epochs=None,
        patience=None,
        forecast_criterion=None,
        recon_criterion=None,
        use_cuda=use_cuda,
        print_every=None,
        gamma=model_args['gamma']
    )

    # Get new scores (inference needs to be fast, no details needed)
    print("Calculating scores on new data...")
    new_scores = handler.score(loader=new_loader, details=False)

    if threshold_type == "POT":
        # POT threshold fitted on the stored training scores and the new scores.
        print("Loading scores from data used for training...")
        train_scores = json_to_numpy(train_art_uri + "/anom_scores.json")

        if use_mov_av:
            # int() truncation yields 0 for small batch/window sizes, but pandas
            # requires span >= 1, so clamp the smoothing span.
            smoothing_window = max(1, int(batch_size * window_size * 0.05))
            train_scores = pd.DataFrame(train_scores).ewm(span=smoothing_window).mean().values.flatten()
            new_scores = pd.DataFrame(new_scores).ewm(span=smoothing_window).mean().values.flatten()

        pot_thresh = pot_threshold(train_scores, new_scores, q=q, level=level, dynamic=dynamic_pot)

        # Log the POT threshold as part of this run; nothing from training/eval
        # is overridden. np.asarray(...).tolist() converts NumPy scalars/arrays
        # into JSON-serialisable Python objects (with dynamic POT the threshold
        # may be an array — TODO confirm against pot_threshold).
        mlflow.log_dict({"POT": np.asarray(pot_thresh).tolist()}, "thresholds.json")

        threshold = pot_thresh
    else:
        # NOTE(review): every non-"POT" threshold_type falls back to the epsilon
        # threshold stored by the training run.
        thresholds = mlflow.artifacts.load_dict(train_art_uri + "/thresholds.json")
        threshold = thresholds["epsilon"]

    print(f"Predicting anomalies based on {threshold_type}-generated threshold - threshold value: {threshold}")

    # Make predictions based on threshold
    anomalies = handler.predict(new_scores, threshold)

    # ---------------------------- END PREDICTION ------------------------------

    # Log one prediction per line straight from memory: no temporary file in the
    # (possibly shared) working directory, and nothing left behind on failure.
    mlflow.log_text("".join(f"{anom}\n" for anom in anomalies), "anomalies.txt")

print("Finished.")

No model found in Production stage, using model from latest run for predictions.
The model's run ID is: 0dac450f8b2d4e5294070621318ae72b
Predicting:
The size of the dataset is: 989 sample(s).
Calculating scores on new data...


  0%|          | 0/4 [00:00<?, ?it/s] 25%|██▌       | 1/4 [00:00<00:02,  1.35it/s] 50%|█████     | 2/4 [00:00<00:00,  2.44it/s] 75%|███████▌  | 3/4 [00:01<00:00,  3.36it/s]100%|██████████| 4/4 [00:01<00:00,  4.22it/s]100%|██████████| 4/4 [00:01<00:00,  3.26it/s]


Loading scores from data used for training...
Running POT with q=0.001, level=0.99..
Initial threshold : 0.7201882004737854
Number of peaks : 283
Grimshaw maximum log-likelihood estimation ... [done]
	γ = -0.016250451967990354
	σ = 0.2116042329432291
	L = 161.10851155849537
Extreme quantile (probability = 0.001): 1.197853647367851


  0%|          | 0/989 [00:00<?, ?it/s]100%|██████████| 989/989 [00:00<00:00, 387787.85it/s]


Predicting anomalies based on POT-generated threshold - threshold value: 1.1978536473678507
Finished.
