In this notebook, we use the UpTrain package to identify data drift and out-of-distribution cases for the same orientation classification example.

In [1]:
import sys
import os
import subprocess
import zipfile
import numpy as np
import uptrain
sys.path.insert(0, '..')

from model_files import input_to_dataset_transformation, read_json, write_json, KpsDataset
from model_files import body_length_signal, plot_all_cluster

import joblib
import json
import torch

Download dataset from remote

In [2]:
data_dir = "data"
remote_url = "https://oodles-dev-training-data.s3.amazonaws.com/data.zip"
orig_training_file = 'data/training_data.json'
if not os.path.exists(data_dir):
    # Most Linux distributions have wget installed by default.
    # The command below installs wget on macOS via Homebrew.
    # subprocess.call returns the exit code rather than raising, so check it explicitly.
    wget_installed_status = subprocess.call("brew install wget", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
    if wget_installed_status == 0:
        print("Successfully installed wget")
    try:
        if not os.path.exists("data.zip"):
            download_status = subprocess.call("wget " + remote_url, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
            if download_status != 0:
                raise RuntimeError("wget exited with status " + str(download_status))
            print("Data downloaded")
        with zipfile.ZipFile("data.zip", 'r') as zip_ref:
            zip_ref.extractall("./")
        full_training_data = read_json(orig_training_file)
        np.random.seed(1)
        np.random.shuffle(full_training_data)
        reduced_training_data = full_training_data[0:1000]
        write_json(orig_training_file, reduced_training_data)
        print("Prepared Example Dataset")
        os.remove("data.zip")
    except Exception as e:
        print(e)
        print("Could not load training data")
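
The cell above shells out to `brew` and `wget`, which may not be available on every machine. As an alternative, here is a minimal sketch that downloads and unpacks the archive using only the Python standard library. The helper name `download_and_extract` and its parameters are hypothetical and not part of the notebook or of UpTrain.

```python
import os
import urllib.request
import zipfile


def download_and_extract(url, zip_path="data.zip", extract_to="./"):
    """Download a zip archive (if not already present) and unpack it."""
    if not os.path.exists(zip_path):
        # urlretrieve writes the response body to zip_path
        urllib.request.urlretrieve(url, zip_path)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(extract_to)
    # Remove the archive once its contents are extracted
    os.remove(zip_path)
```

With this helper, the download step would reduce to `download_and_extract(remote_url)`, followed by the same shuffling and truncation of the training file as above.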

In [3]:
real_world_test_cases = 'data/real_world_testing_data.json'
golden_testing_file = 'data/golden_testing_data.json'
annotation_args = {'master_file': 'data/master_annotation_data.json'}

inference_batch_size = 16

Next, we train a deep neural network on the training data

In [4]:
from model_files import get_accuracy_torch, train_model_torch, BinaryClassification
train_model_torch('data/training_data.json', 'version_0')

Training on:  data/training_data.json  which has  1000  data-points
Trained model exists. Skipping training again.


Next, we measure the model's accuracy on the golden testing dataset

In [5]:
get_accuracy_torch(golden_testing_file, 'version_0')

Evaluating on  15731  data-points


0.9184412942597419

Let's define the UpTrain config to identify data drift. We define three checks here:

1. Data drift for the input features (keypoints): The keypoints form a 34-dimensional vector (x, y for 17 body joints). We use embedding-based clustering and compute the Earth Mover's Distance to detect data whose distribution differs substantially from the reference dataset (i.e., the original training file). This check also collects the edge-case data points.

2. Data drift for only the first feature, the x-coordinate of the head keypoint: In addition to the overall input, we also want to see whether specific features in the input are drifting. To do so, we define a measurable that extracts a scalar from the input keypoint embedding.

3. Data drift for body length: Functions of the input features are often more meaningful to monitor than the raw features. Here, we use the keypoint locations to estimate the user's body length via UpTrain Signals and monitor whether its distribution shifts.
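
To make the distance used by these checks concrete, here is a minimal sketch of the Earth Mover's Distance between two one-dimensional samples, applied to the scalar at index 0 of each keypoint vector (as in the second check). It is an illustration only: the helper names and toy values are hypothetical, it assumes equal-sized samples, and it does not reproduce UpTrain's internal clustering-based computation.

```python
def scalar_from_embedding(kps_batch, idx=0):
    """Pick one coordinate (by index) out of each keypoint vector."""
    return [kps[idx] for kps in kps_batch]


def earth_movers_distance_1d(reference, production):
    """Earth Mover's Distance between two equal-sized 1-D samples.

    For equal-sized, equally weighted samples, this equals the mean
    absolute difference between the sorted values.
    """
    if len(reference) != len(production):
        raise ValueError("this sketch assumes equal-sized samples")
    ref_sorted = sorted(reference)
    prod_sorted = sorted(production)
    total = sum(abs(r - p) for r, p in zip(ref_sorted, prod_sorted))
    return total / len(ref_sorted)


# Toy 34-dimensional keypoint vectors; only the first entry (head x) varies.
reference_kps = [[x] + [0.0] * 33 for x in (0.1, 0.2, 0.3)]
production_kps = [[x] + [0.0] * 33 for x in (0.4, 0.5, 0.6)]

ref_head_x = scalar_from_embedding(reference_kps, idx=0)
prod_head_x = scalar_from_embedding(production_kps, idx=0)

# A value near zero means similar distributions; larger values indicate drift.
drift = earth_movers_distance_1d(ref_head_x, prod_head_x)
print(drift)
```

In the toy data, every production value is shifted by about 0.3 relative to the reference, so the distance comes out close to 0.3, while comparing a sample against itself gives 0.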

In [6]:
cfg = {
    "checks": [{
        'type': uptrain.Anomaly.DATA_DRIFT,
        'reference_dataset': orig_training_file,
        'is_embedding': True,
        'cluster_plot_func': plot_all_cluster,
    },
    {
        'type': uptrain.Anomaly.DATA_DRIFT,
        'reference_dataset': orig_training_file,
        "save_edge_cases": False,
        "measurable_args": {
            'type': uptrain.MeasurableType.SCALAR_FROM_EMBEDDING,
            'idx': 0,
            'extract_from': {
                'type': uptrain.MeasurableType.INPUT_FEATURE,
                'feature_name': 'kps'
            }
        },
    },
    {
        'type': uptrain.Anomaly.DATA_DRIFT,
        'reference_dataset': orig_training_file,
        'is_embedding': False,
        "save_edge_cases": False,
        "measurable_args": {
            'type': uptrain.MeasurableType.CUSTOM,
            'signal_formulae': uptrain.Signal("Body Length", body_length_signal),
        }
    }],
    "data_identifier": "id",
    "feat_name_list": ["kps"],

    # Connect training pipeline to annotate data and retrain the model
    "training_args": {
        "data_transformation_func": input_to_dataset_transformation,  
        "annotation_method": {"method": uptrain.AnnotationMethod.MASTER_FILE, "args": annotation_args}, 
        "training_func": train_model_torch, 
        "fold_name": "uptrain_smart_data__data_drift",
        "orig_training_file": orig_training_file,
        "cluster_plot_func": plot_all_cluster
    },

    # Retrain once 100 edge cases are collected
    "retrain_after": 100,

    # Connect evaluation pipeline to test retrained model against original model
    "evaluation_args": {
        "inference_func": get_accuracy_torch,
        "golden_testing_dataset": golden_testing_file,
        "metrics_to_check": ['accuracy']
    }

}
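
The third check passes `body_length_signal` from `model_files` to `uptrain.Signal`; its implementation is not shown in this notebook. As a rough idea of what such a derived feature could look like, here is a hypothetical sketch that estimates body length as the distance from the head to the midpoint of the ankles. It assumes the keypoints are flattened as `[x0, y0, x1, y1, ...]` and follow the COCO joint order (joint 0 is the head, joints 15 and 16 are the ankles). Both are assumptions, and the actual signal may be computed differently.

```python
import math

# Assumption: COCO joint order, with joint 0 the head and joints 15/16 the ankles.
HEAD, LEFT_ANKLE, RIGHT_ANKLE = 0, 15, 16


def joint_xy(kps, joint_idx):
    """Return (x, y) of one joint from a flat [x0, y0, x1, y1, ...] vector."""
    return kps[2 * joint_idx], kps[2 * joint_idx + 1]


def approx_body_length(kps):
    """Hypothetical body-length proxy: distance from head to the ankle midpoint."""
    head_x, head_y = joint_xy(kps, HEAD)
    left_x, left_y = joint_xy(kps, LEFT_ANKLE)
    right_x, right_y = joint_xy(kps, RIGHT_ANKLE)
    mid_x, mid_y = (left_x + right_x) / 2.0, (left_y + right_y) / 2.0
    return math.hypot(mid_x - head_x, mid_y - head_y)


# Toy 34-dimensional vector: head at (0, 0), ankles at (-1, 4) and (1, 4).
toy_kps = [0.0] * 34
toy_kps[2 * LEFT_ANKLE], toy_kps[2 * LEFT_ANKLE + 1] = -1.0, 4.0
toy_kps[2 * RIGHT_ANKLE], toy_kps[2 * RIGHT_ANKLE + 1] = 1.0, 4.0
print(approx_body_length(toy_kps))
```

Monitoring a scalar like this is often easier to interpret than monitoring all 34 raw coordinates, since a shift in its distribution maps directly to a physical change in the input, such as users standing closer to or farther from the camera.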

In [7]:
framework_torch = uptrain.Framework(cfg)

model_dir = 'trained_models_torch/'
model_save_name = 'version_0'
real_world_dataset = KpsDataset(
    real_world_test_cases, batch_size=inference_batch_size, is_test=True
)
model = BinaryClassification()
model.load_state_dict(torch.load(model_dir + model_save_name))
model.eval()

for i,elem in enumerate(real_world_dataset):

    # Do model prediction
    inputs = {"data": {"kps": elem[0]["kps"]}, "id": elem[0]["id"]}
    x_test = torch.tensor(inputs["data"]["kps"]).type(torch.float)
    test_logits = model(x_test).squeeze() 
    preds = torch.round(torch.sigmoid(test_logits)).detach().numpy()

    # Log model inputs and outputs to the uptrain Framework to monitor data drift
    idens = framework_torch.log(inputs=inputs, outputs=preds)

    # Retrain only once
    if framework_torch.version > 1:
        break

Deleting the folder:  uptrain_logs
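
The prediction step in the loop above applies a sigmoid to the logits and rounds the result to obtain binary labels. Here is a minimal pure-Python sketch of the same thresholding, using lists instead of tensors; the helper names are hypothetical. Up to floating-point precision, a label of 1 corresponds to a positive logit.

```python
import math


def sigmoid(logit):
    """Logistic function mapping a raw logit to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-logit))


def logits_to_labels(logits):
    """Assign label 1 when the predicted probability exceeds 0.5, else 0."""
    return [1 if sigmoid(z) > 0.5 else 0 for z in logits]


example_logits = [-2.0, -0.1, 0.0, 0.1, 3.0]
labels = logits_to_labels(example_logits)
print(labels)
```

A probability of exactly 0.5 maps to label 0 here, which matches `torch.round`, since it rounds halves to the nearest even integer.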


In [8]:
!tensorboard --logdir uptrain_logs

TensorFlow installation not found - running with reduced feature set.
Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.11.0 at http://localhost:6006/ (Press CTRL+C to quit)
^C
