In [1]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import os

import pickle

from tmu.models.autoencoder.autoencoder import TMAutoEncoder
from src.lib.care import calculate_care_score

2025-04-01 20:55:03,286 - tmu.clause_bank.clause_bank_cuda - ERROR - No module named 'pycuda'
Traceback (most recent call last):
  File "/Users/kjellhaaland/Documents/GitHub/uia-master-thesis/.venv/lib/python3.12/site-packages/tmu/clause_bank/clause_bank_cuda.py", line 41, in <module>
    from pycuda._driver import Device, Context
ModuleNotFoundError: No module named 'pycuda'


In [15]:
bits = 3

In [16]:
# Create folder figures if it does not exist
os.makedirs("figures", exist_ok=True)

In [17]:
def load_test_dataset(farm, event_id):
    X = np.loadtxt(f"./data_test/X_{farm}_{event_id}.txt", dtype=np.uint32)
    X = np.array(X).astype(np.uint32)

    # Take the last 4000 rows
    X = X[-10000:]

    return X


def load_thresh_dataset(farm, event_id):
    X = np.loadtxt(f"./data_train/X_{farm}_{event_id}.txt", dtype=np.uint32)
    X = np.array(X).astype(np.uint32)

    # Take the first 5000 rows
    X = X[:10000]

    return X


def load_test_labels(farm, event_id):
    # Load dataframe from file
    df = pd.read_csv(f"./data_test/y_{farm}_{event_id}.csv")

    labels = df['label'].values
    status_ids = df['status_type_id'].values
    train_test = df['train_test'].values

    # Take the first 3000 rows
    labels = labels[-10000:]
    status_ids = status_ids[-10000:]
    train_test = train_test[-10000:]

    return np.array(labels).astype(np.uint32), np.array(status_ids).astype(np.uint32), train_test


def load_test_label(farm, event_id):
    event_info = pd.read_csv(f"../../../data/care_to_compare/Wind Farm {farm}/event_info.csv", delimiter=';')

    metadata = event_info[event_info['event_id'] == event_id]

    event_label = metadata["event_label"].values[0]

    return False if event_label == "anomaly" else True

In [18]:
def calculate_accuracy(labels, predictions):
    # Calculate the accuracy
    accuracy = np.sum(labels == predictions) / len(labels)

    return accuracy


def calculate_threshold(X, pred, percentile=95):
    losses = [huber_loss(X[i], pred[i]) for i in range(len(X))]

    # Set the threshold as the lowest 1% of the losses
    threshold = np.percentile(losses, percentile)

    return threshold


In [19]:
def load_model(filename) -> TMAutoEncoder:
    with open(filename, "rb") as f:
        model = pickle.load(f)

    return model

In [20]:
def hamming_loss(pred, X_test):
    """
    Computes the Hamming loss between predicted and ground truth binary arrays.

    Parameters:
    - pred (numpy array): Binary predictions of shape (n_samples, n_bits).
    - X_test (numpy array): Ground truth binary values of shape (n_samples, n_bits).

    Returns:
    - float: Hamming loss (fraction of incorrect bits).
    """
    assert pred.shape == X_test.shape, "Shapes of pred and X_test must match"

    # Compute the number of differing bits
    incorrect_bits = np.sum(pred != X_test)

    # Total number of bits
    total_bits = np.prod(X_test.shape)

    # Hamming loss is the fraction of incorrect bits
    return incorrect_bits / total_bits


def binary_to_decimal(arr, bit_length):
    # Split the array into chunks of bit_length
    numbers = [int("".join(map(str, arr[i:i + bit_length])), 2) for i in range(0, len(arr), bit_length)]
    return numbers


def mse_loss(pred, X_test):
    # Reconstruct the original values (5 bits)
    p = binary_to_decimal(pred, bits)
    x = binary_to_decimal(X_test, bits)

    # Compute the MSE
    mse = np.mean((np.array(p) - np.array(x)) ** 2)
    return mse


def huber_loss(pred, X_test, delta=1.0):
    # Reconstruct the original values (5 bits)
    p = binary_to_decimal(pred, bits)
    x = binary_to_decimal(X_test, bits)

    # Compute the Huber loss
    loss = np.where(np.abs(np.array(p) - np.array(x)) < delta, 0.5 * ((np.array(p) - np.array(x)) ** 2),
                    delta * (np.abs(np.array(p) - np.array(x)) - 0.5 * delta))

    return np.mean(loss)


def kl_divergence(pred, X_test):
    # Reconstruct the original values (5 bits)
    p = binary_to_decimal(pred, bits)
    x = binary_to_decimal(X_test, bits)

    # Compute the KL divergence
    kl = np.sum(np.array(x) * np.log(np.array(x) / np.array(p)))
    return kl


In [21]:
def reconstruction_accuracy(X, pred):
    correct = np.sum(X == pred)
    accuracy = correct / len(X)
    return accuracy


def plot_mse(X, y, pred, name, threshold):
    # Compute MSE for each row
    mse_per_row = [huber_loss(X[i], pred[i]) for i in range(len(X))]

    # Plot SNS plot of all MSE values
    plt.figure(figsize=(8, 4))
    sns.histplot(mse_per_row, bins=50, kde=True, color='b')

    # Add a threshold line
    plt.axvline(threshold, color='r', linestyle='--')

    # Save the plot
    plt.savefig(f"./figures/plot_reconstruction_acc_{name}.png")

    plt.close()


def plot_predictions(X, y, z, pred, p, name, threshold):
    x = np.arange(0, len(X))  # Time or index
    r = [huber_loss(X[i], pred[i]) for i in range(len(X))]

    anomalies = np.array([1 if (p[i] == 1 and z[i] == 0) else 0 for i in range(len(X))])

    y_mapped = np.where(y == 0, -0.2, -0.1)
    a_mapped = np.where(anomalies == 0, -0.4, -0.3)
    p_mapped = np.where(p == 0, -0.6, -0.5)
    z_mapped = np.where(np.logical_or(z == 0, z == 2), -0.8, -0.7)

    # Create a figure with two subplots (1 row, 2 columns)
    fig, axes = plt.subplots(2, 1, figsize=(12, 10))

    # First chart
    # If z is 0 or 2, then the color is orange, if y[i] is 1, then the color is red, otherwise blue
    colors = ['red' if y[i] == 1 else 'blue' if z[i] == 0 or z[i] == 2 else 'orange' for i in range(len(z))]

    axes[0].scatter(x, r, label="Reconstruction Loss", c=colors, alpha=0.7)
    axes[0].axhline(threshold, color='red', linestyle='--', label="Threshold")

    axes[0].set_title("Reconstruction Loss")
    axes[0].set_xlabel("Time")
    axes[0].set_ylabel("Reconstruction Loss")
    axes[0].legend()

    # Second chart
    axes[1].plot(x, y_mapped, label="Actual Anomalies (y)", color='red', linestyle='-', linewidth=2)
    axes[1].plot(x, a_mapped, label="Detected Anomalies (a)", color='purple', linestyle='-', linewidth=2)
    axes[1].plot(x, p_mapped, label="Predicted Anomalies (p)", color='green', linestyle='-', linewidth=2)
    axes[1].plot(x, z_mapped, label="Status Type Id (z)", color='orange', linestyle='-', linewidth=2)

    axes[1].axhline(y=-0.1, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.2, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.3, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.4, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.5, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.6, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.7, color='black', linestyle='dotted', linewidth=1, alpha=0.5)
    axes[1].axhline(y=-0.8, color='black', linestyle='dotted', linewidth=1, alpha=0.5)

    axes[1].set_ylim(-0.9, 0)

    axes[1].set_title("Anomalies")
    axes[1].legend()

    # Adjust layout
    plt.tight_layout()
    plt.grid(True)

    # plt.show()

    # Save the plot
    plt.savefig(f"./figures/plot_detections_{name}.png")

    plt.close()


def get_predictions(X, pred, y, z, name, threshold):
    # For each row in pred, if the MSE is greater than the threshold, then it is an anomaly
    losses = [huber_loss(X[i], pred[i]) for i in range(len(X))]

    X_predictions = np.array([1 if losses[i] > threshold else 0 for i in range(len(X))])

    # Accuracy
    accuracy = calculate_accuracy(y, X_predictions)

    plot_predictions(X, y, z, pred, X_predictions, name, threshold)

    return X_predictions, accuracy

In [25]:
farm = "B"
test_datasets = [34, 7, ]  #53, 74, 86, 82, 27, 19, 77, 83, 52, 21, 2, 23, 87, ]

tm_autoencoder = load_model("models/latest_14.pkl")

# Create a dataframe with status_type_id;label;prediction
reconstructions = []

for dataset in test_datasets:
    X = load_test_dataset(farm, dataset)

    pred = tm_autoencoder.predict(X)
    reconstructions.append({'farm': farm, 'dataset': dataset, 'X': X, 'pred': pred})

    print(f"Done with {dataset}")

Done with 34
Done with 7


In [26]:
def run_prediction(farm, dataset, X, pred):
    labels, status_ids, train_test = load_test_labels(farm, dataset)
    is_normal = load_test_label(farm, dataset)

    predictions, accuracy = get_predictions(X, pred, labels, status_ids, f"{farm}_{dataset}", threshold)

    return labels, status_ids, train_test, is_normal, predictions, accuracy

In [28]:
X_thresh = load_thresh_dataset("B", 83)
X_thresh_pred = tm_autoencoder.predict(X_thresh)

threshold = calculate_threshold(X_thresh, X_thresh_pred)

print(f"Threshold: {threshold}")

# Create a dataframe with status_type_id;label;prediction
elements = []

for element in reconstructions:
    res = run_prediction(element['farm'], element['dataset'], element['X'], element['pred'])

    result_df = pd.DataFrame({
        'status_type_id': res[1],
        'label': res[0],
        'prediction': res[4],
        'train_test': res[2],
    })

    print(f"Done with {set}. Accuracy: {res[5]}")

    elements.append({'dataset': set, 'normal': res[3], 'data': result_df, 'accuracy': res[5]})

Threshold: 0.17261904761904762
Done with 87. Accuracy: 0.6695
Done with 87. Accuracy: 0.4732


In [29]:
# Safe the results to results.pkl
with open("results_full_99.pkl", "wb") as f:
    pickle.dump(elements, f)

In [30]:
score = calculate_care_score(elements)
print(score)

0
