### Use Antagonist to train a symptom detection model 

#### Reproducibility

In [3]:
# Torch
import torch
torch.manual_seed(0)
torch.use_deterministic_algorithms(True)

# Python
import random
random.seed(0)

# Numpy
import numpy as np
np.random.seed(0)

#### Dataset preparation

Note: the dataset needs to be downloaded using the script `download_SMD_dataset.sh` in the `scripts/antagonist-ml` folder.

In [4]:
from utils import SMD

In [5]:
db = SMD(dataset_folder=r"D:\antagonist\data\ServerMachineDataset")

In [6]:
dataframes_train, _ = db.read_dataset(group_name="Group 1", train=True, retrieve_labels=True)
dataframes, files = db.read_dataset(group_name="Group 1", train=False, retrieve_labels=True)


In [7]:
service_idx = 0
df,service_id = dataframes[service_idx], files[service_idx]
df,labels = df[df.columns[:-2].tolist()+['timestamp']], df[['label']]
df_train = dataframes_train[service_idx]
df_train = df_train[df_train.columns[:-1].tolist()+['timestamp']]

In [8]:
network_incidents = db.get_interpretation_labels(service_id)

#### Train model utils

In [9]:
import pandas as pd
from auto_encoder import Vanilla_AE
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score

In [10]:
n_inputs = df.shape[1] - 1
layer_sizes = [8, 4, 2]
lr = 0.005
batch_size = 32
epochs = 40
validation_split = 0.2
early_stopping = True
patience = 3
Q = 0.99  # residual cut

In [11]:
def train_model(df: pd.DataFrame):
    ae = Vanilla_AE(n_inputs=n_inputs, layer_sizes=layer_sizes)

    # Get data but the timestamp
    X_train = df.values[:, :-1]

    # scaler init and fitting
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)

    # model fitting
    ae.fit(
        X_train_scaled,
        early_stopping=early_stopping,
        validation_split=validation_split,
        epochs=epochs,
        lr=lr,
        batch_size=batch_size,
        verbose=0,
        shuffle=True,
        patience=patience,
        delta=0.001,
    )

    # results predicting
    residuals_train = (
        pd.DataFrame(X_train_scaled - ae.predict(X_train_scaled)).abs()
    )
    threshold = residuals_train.quantile(Q,axis=0) * 5 / 2

    return scaler, ae, threshold 

In [12]:
def find_consecutive_true_np(arr):
    result = []
    for i in range(arr.shape[1]):
        s = arr[:, i]
        m = np.r_[False, s, False]
        idx = np.flatnonzero(m[1:] != m[:-1])
        result.append(list(zip(idx[::2], idx[1::2])) )
    return result

def predict(df, scaler, ae, threshold, aggregate=False ):

    X_hat = scaler.transform(df.values[:,:-1])
    residuals_full_df = X_hat - ae.predict(X_hat)
    residuals_full_df = pd.DataFrame(residuals_full_df).abs()
    
    symptoms = (residuals_full_df > threshold).values

    return symptoms.any(axis=1) if aggregate else symptoms

In [13]:
def eval_model(df, labels,  scaler, ae, threshold ):

    X_hat = scaler.transform(df.values[:,:-1])
    residuals_full_df = X_hat - ae.predict(X_hat)
    residuals_full_df = pd.DataFrame(residuals_full_df).abs()
    residuals_full_df['outlier'] = (residuals_full_df > threshold).any(axis=1).astype(int).values

    return f1_score(labels['label'].values,residuals_full_df['outlier'].values, average='binary')

#### Iterative process simulation

Every day the model is retrained with the new data and new labels

In [14]:
import datetime
from collections import defaultdict

In [15]:
start_date = datetime.datetime.fromtimestamp(df['timestamp'].astype('int64').min()/10**9)
end_date = datetime.datetime.fromtimestamp(df['timestamp'].dt.ceil('D').astype('int64').max()/10**9)

In [16]:
scaler, ae, threshold = None, None, None
previous_day = None

symptoms_predictions = dict()

for current_day in pd.date_range(start=start_date, end=end_date, freq="D"):
    current_day = datetime.datetime.fromtimestamp(current_day.timestamp())
    df_today = df.loc[df["timestamp"] < current_day.ctime()]

    if df_today.shape[0] == 0:
        # first day
        continue

    # Predict symptoms for the current day
    if scaler is not None and previous_day is not None:
        df_pred = df.loc[
            (df["timestamp"] >= previous_day.ctime())
            & (df["timestamp"] < current_day.ctime())
        ]
        y_pred = predict(df_pred, scaler, ae, threshold, aggregate=False)

        intervals = find_consecutive_true_np(y_pred)
        for metric_id, symptoms in enumerate(intervals):
            if len(symptoms) > 0:
                symptoms_predictions[current_day] = defaultdict(list)
            for symp in symptoms:
                symptoms_predictions[current_day][metric_id].append(
                    [
                        df_pred["timestamp"].iloc[symp[0]].timestamp(),
                        df_pred["timestamp"].iloc[symp[1] - 1].timestamp(),
                    ]
                )

        break

    # Retrain the model on the available data removing anomalies (simulating human validation)
    mask_train = labels[: df_today.shape[0]]["label"] == 0
    df_today = pd.concat([df_train, df_today[mask_train]], ignore_index=True)
    scaler, ae, threshold = train_model(df_today)
    previous_day = current_day

Early stopping at epoch 7


In [17]:
# Evaluation on the overall test set
f1 = eval_model(df, labels, scaler, ae, threshold)
print(f"F1 score: {round(f1,4)}")

F1 score: 0.2942


### Convert to objects

In [18]:
# Aggregate symptoms in network incidents according to the OR rule
incidents_predictions = {}
for model_date, symptoms in symptoms_predictions.items():
    print(model_date.strftime('%Y-%m-%d'))

    # aggregate overlapping symptoms coming from different metrics
    day_symptoms = [
        (metric_id, symptom[0], symptom[1]) for metric_id, symptoms_list in symptoms.items() for symptom in symptoms_list 
    ]

    # sort by starting timestamp
    day_symptoms.sort(key=lambda x: x[1])

    if len(day_symptoms) == 0:
        continue


    # create a list of incident in the form [(start_timestamp, end_timestamp, [symptom1, symptom2]),...]
    start = day_symptoms[0][1] 
    end = day_symptoms[0][2]
    network_incidents = [(start, end, [day_symptoms[0]])]
    for symptom in day_symptoms[1:]:
        # if overlapping add to the current incident, new incident otherwise
        if symptom[1] <= end:
            network_incidents[-1][2].append(symptom)
            end = max(end, symptom[2])
            network_incidents[-1][1] = end
        else:
            start = symptom[1]
            end = symptom[2]
            network_incidents.append((start, end, [symptom]))

    incidents_predictions[model_date] = network_incidents

2020-01-03


In [20]:
import sys 
sys.path.append("..")
from antagonist_ml.service import store_network_anomalies_labels, store_network_symptom_labels

In [21]:
for model_date, network_incidents in incidents_predictions.items():
    for network_incident in network_incidents:

        ni_uuid = store_network_anomalies_labels(
            author_name="ae_continual_learning",
            author_type="algorithm",
            author_version=int(model_date.timestamp()),
            description=model_date.strftime("%Y-%m-%d"),
            start=network_incident[0],
            end=network_incident[1],
            state="incident-potential",
            version=1
        )


        for symptom in network_incident[2]:
            store_network_symptom_labels(
                author_name="ae_continual_learning",
                author_type="algorithm",
                author_version=int(model_date.timestamp()),
                confidence=1.0,
                description=symptom[0],
                start=symptom[1],
                end=symptom[2],
                version=1,
                tags={
                    "machine":service_id,
                    "metric_id":symptom[0]
                },
                network_anomaly_uuid=ni_uuid
            )

{"author": {"author_type": "algorithm", "name": "ae_continual_learning", "version": 1578009600}, "description": "2020-01-03", "start": "2020-01-02T00:45:00", "end": "2020-01-02T00:46:00", "id": "aaabe381-4b4f-4f02-94bc-ba683ccca5f6", "state": "incident-potential", "version": 1}
{"confidence-score": 1.0, "description": 33, "start-time": "2020-01-02T00:45:00", "end-time": "2020-01-02T00:46:00", "event-id": "cf6b733c-7cc1-400b-895d-dda3c2ff2849", "id": "fa9f7969-072e-4baa-966d-71ebab1cfc42", "source-name": "ae_continual_learning_1578009600", "source-type": "algorithm", "tags": {"machine": "machine-1-1.txt", "metric_id": 33}}
{"symptom-id": "fa9f7969-072e-4baa-966d-71ebab1cfc42", "incident-id": "aaabe381-4b4f-4f02-94bc-ba683ccca5f6"}
{"author": {"author_type": "algorithm", "name": "ae_continual_learning", "version": 1578009600}, "description": "2020-01-03", "start": "2020-01-02T00:50:00", "end": "2020-01-02T00:50:00", "id": "f2d1b180-dc79-4346-b844-161edfafb48b", "state": "incident-potenti