# Viam Anomaly Detector Jupyter Training Script

In [None]:
# Install required packages
# The leading "!" makes the notebook run this line as a shell command, so the
# dependencies listed in requirements.txt are installed with pip3.

!pip3 install -r requirements.txt

In [None]:
############################################################################################################
# Training script for an Isolation Forest model using data from app.viam.com
############################################################################################################

import numpy as np
from onnx import ModelProto

import pandas as pd

from skl2onnx.common.data_types import FloatTensorType
from skl2onnx import to_onnx
from sklearn.ensemble import IsolationForest

import os
import bson
from dotenv import load_dotenv
from viam.app.viam_client import ViamClient
from viam.rpc.dial import DialOptions
import logging



In [None]:
# Set up / config
# Load environment variables (e.g. API_KEY / API_KEY_ID read by connect()) from a .env file.
load_dotenv()
# File name of the ONNX model: written after training and read back by inference().
model_name = "isolation_forest.onnx"
# CSV file written by save_to_csv() and read by loadCSV().
data_file = "data.csv"


In [None]:
############################################################################################################
# Get data from app.viam.com
############################################################################################################


async def connect() -> ViamClient:
    """Connect to the Viam cloud API using API-key credentials.

    The credentials are read from the ``API_KEY`` and ``API_KEY_ID``
    environment variables.

    Returns:
        The client created by ``ViamClient.create_from_dial_options``.

    Raises:
        ValueError: If ``API_KEY`` or ``API_KEY_ID`` is unset or empty.
    """
    credentials = {
        "API_KEY": os.environ.get("API_KEY"),
        "API_KEY_ID": os.environ.get("API_KEY_ID"),
    }
    # Fail early with an actionable message instead of handing ``None`` to the SDK.
    missing = [name for name, value in credentials.items() if not value]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    dial_options = DialOptions.with_api_key(
        api_key=credentials["API_KEY"], api_key_id=credentials["API_KEY_ID"]
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def download(limit: int = 10000) -> pd.DataFrame:
    """Download the training data from app.viam.com.

    Runs an MQL aggregation pipeline that selects the readings of the
    ``fake-sensor`` component and projects them to ``timestamp`` / ``value``.

    The organization is taken from the ``ORGANIZATION_ID`` environment
    variable; when it is unset or empty the previously hard-coded
    organization id is used.

    Args:
        limit: Maximum number of readings to fetch (the pipeline's ``$limit``).

    Returns:
        A DataFrame built from the query result, with the ``timestamp``
        column converted by ``pd.to_datetime``.
    """
    logging.info("Downloading data...")
    viam_client = await connect()
    try:
        # Query the data set using MongoDB Query Language (MQL)
        tabular_data = await viam_client.data_client.tabular_data_by_mql(
            organization_id=os.getenv("ORGANIZATION_ID")
            or "96b696a0-51b9-403b-ae0d-63753923652f",
            # The MQL aggregation pipeline extracts and preprocesses the data
            # TODO: Set this to suit your data! https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
            mql_binary=[
                bson.dumps(
                    {
                        "$match": {
                            "component_name": "fake-sensor",
                        }
                    }
                ),
                bson.dumps(
                    {
                        "$project": {
                            "timestamp": {"$dateToString": {"date": "$time_received"}},
                            "value": "$data.readings.a",
                        }
                    }
                ),
                bson.dumps({"$limit": limit}),
            ],
        )
    finally:
        # Always release the connection, even when the query fails.
        viam_client.close()

    df = pd.DataFrame(tabular_data)
    # TODO: timestamp conversion shouldn't be necessary once the Viam SDK date type in the MQL API is fixed
    # Also remove the $dateToString projection in the MQL pipeline above
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    logging.info(f"{len(df)} reading(s) downloaded")
    return df


In [None]:
############################################################################################################
# Save to CSV
############################################################################################################

def save_to_csv(tabular_data):
    """Write *tabular_data* as a DataFrame to the CSV file named by ``data_file``.

    The DataFrame index is not written to the file.
    """
    pd.DataFrame(tabular_data).to_csv(data_file, index=False)

In [None]:
############################################################################################################
# Load from CSV
############################################################################################################

def loadCSV():
    """Read the CSV file named by ``data_file`` and return it as a DataFrame.

    The ``timestamp`` column is parsed as dates while reading.
    """
    print("Loading CSV...")
    loaded = pd.read_csv(data_file, parse_dates=["timestamp"])
    print("CSV Loaded")
    return loaded

In [None]:
############################################################################################################
# Feature Engineering
############################################################################################################


def featureEng(df: pd.DataFrame) -> pd.DataFrame:
    """Resample the readings into 5-minute buckets and derive model features.

    Args:
        df: Readings with a datetime ``timestamp`` column and a numeric
            ``value`` column. Non-numeric columns are ignored by the resample.
            The input frame is not modified.

    Returns:
        A new DataFrame with one row per 5-minute bucket holding the mean
        ``value`` plus the calendar columns ``Weekday``, ``Hour``, ``Day``,
        ``Month``, ``Year`` and ``Month_day``, the previous bucket's value
        (``Lag``) and a rolling mean over up to 7 buckets (``Rolling_Mean``).
        Rows containing any NaN (empty buckets, and rows without a previous
        value) are dropped.
    """
    logging.info("Feature Engineering...")
    # TODO: Push down to the backend -> Agg. pipeline
    # Other bucket sizes (e.g. "h", "D" or "W") can be used instead of "5min".
    # numeric_only=True keeps stray non-numeric columns from breaking mean().
    df_sampled = (
        df.set_index("timestamp")
        .resample("5min")
        .mean(numeric_only=True)
        .reset_index()
    )

    timestamps = df_sampled["timestamp"].dt
    # day_name() yields English names regardless of the process locale, so the
    # values always match the categories below (weekdays are not used in the model).
    df_sampled["Weekday"] = pd.Categorical(
        timestamps.day_name(),
        categories=[
            "Monday",
            "Tuesday",
            "Wednesday",
            "Thursday",
            "Friday",
            "Saturday",
            "Sunday",
        ],
    )
    df_sampled["Hour"] = timestamps.hour
    df_sampled["Day"] = timestamps.weekday
    df_sampled["Month"] = timestamps.month
    df_sampled["Year"] = timestamps.year
    df_sampled["Month_day"] = timestamps.day
    # Lag is the previous bucket's value; the first row has none and is dropped below.
    df_sampled["Lag"] = df_sampled["value"].shift(1)
    df_sampled["Rolling_Mean"] = df_sampled["value"].rolling(7, min_periods=1).mean()

    df_sampled = df_sampled.dropna()
    logging.info("Feature Engineering Completed")
    return df_sampled



In [None]:

############################################################################################################
# Model Training
############################################################################################################


def fit_isolation_forest(
    model_data: pd.DataFrame,
) -> ModelProto:
    """Fit an Isolation Forest on the engineered features and convert it to ONNX.

    Args:
        model_data: DataFrame containing a ``timestamp`` column and the
            feature columns listed in ``feature_columns`` below. Rows with
            NaN in any of these columns are dropped before fitting.

    Returns:
        The converted ONNX model, with an input named ``observation`` and
        outputs named ``label`` and ``scores``.
    """
    logging.info("Fitting Isolation Forest...")
    # Single source of truth for the model inputs: the ONNX input width below
    # is derived from this list so the two cannot drift apart.
    feature_columns = [
        "value",
        "Hour",
        "Day",
        "Month_day",
        "Month",
        "Rolling_Mean",
        "Lag",
    ]
    features = (
        model_data[[*feature_columns, "timestamp"]].set_index("timestamp").dropna()
    )

    forest = IsolationForest(
        # Parameter tuning:
        # https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html#sklearn.ensemble.IsolationForest
        random_state=0,
        contamination="auto",
        n_estimators=100,
        max_samples="auto",
    )
    forest.fit(features)

    return to_onnx(
        forest,
        features.to_numpy().astype(np.float32),
        target_opset={"": 15, "ai.onnx.ml": 3},
        initial_types=[
            ("observation", FloatTensorType([None, len(feature_columns)]))
        ],
        final_types=[
            ("label", FloatTensorType([None, 1])),
            ("scores", FloatTensorType([None, 1])),
        ],
    )


In [None]:
############################################################################################################
# Model inference for validation purposes
############################################################################################################

# https://onnxruntime.ai/docs/api/python/tutorial.html

from onnxruntime import InferenceSession, get_available_providers

def inference(df: pd.DataFrame):
    """Run the saved ONNX model on *df* and return its predictions.

    The model is loaded from the file named by ``model_name``. The feature
    columns are selected from *df*, indexed by ``timestamp`` and stripped of
    rows containing NaN before being fed to the model as float32.

    Returns:
        A DataFrame with the columns ``outlier`` and ``score``, built from
        the first and second model outputs respectively.
    """
    print("Inference...")
    selected_columns = [
        "value",
        "Hour",
        "Day",
        "Month_day",
        "Month",
        "Rolling_Mean",
        "Lag",
        "timestamp",
    ]
    features = df[selected_columns].set_index("timestamp").dropna()

    session = InferenceSession(model_name, providers=get_available_providers())
    observations = features.to_numpy(dtype=np.float32)

    # The input/output names are read from the model itself and printed for reference.
    input_name = session.get_inputs()[0].name
    output_name = session.get_outputs()[0].name
    print("Input Name:", input_name)
    print("Output Name:", output_name)

    # Passing None as the output list requests every model output.
    outputs = session.run(None, {input_name: observations})

    stacked = np.column_stack([outputs[0], outputs[1]])
    predictions = pd.DataFrame(stacked, columns=["outlier", "score"])
    print("Inference Completed")
    return predictions

In [None]:
# Download the data
# download() is a coroutine; the top-level `await` relies on the notebook's running event loop.
training_data = await download()
print(training_data)

In [None]:
# Feature Engineering
# Turn the downloaded readings into the feature DataFrame used for training and inference below.
df = featureEng(training_data)

In [None]:
# Train the model
# Fits the Isolation Forest on the feature DataFrame; `onx` is the converted ONNX model.
onx = fit_isolation_forest(df)

In [None]:
# Save the model to disk
# Serialize the ONNX model and write the bytes to `model_name`, overwriting any existing file.
with open(model_name, "wb") as f:
    f.write(onx.SerializeToString())

In [None]:
# Run inference on the training data
# inference() loads the model from the `model_name` file, so the model must have been saved first.

inference(df)