In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tslearn.preprocessing import TimeSeriesResampler, TimeSeriesScalerMeanVariance
import random
from typing import List, Tuple

from oscillogram_classification.cam import gen_heatmap_dictionary, plot_heatmaps_as_overlay

In [None]:
def load_signals(path: str) -> Tuple[List[int], List[List[float]]]:
    # dataframe containing all signals from the dataset + labels in col 0
    df = pd.read_csv(path, delimiter='\t', header=None, na_values=['-∞', '∞'])
    labels = df.iloc[:, 0].tolist()
    samples = df.iloc[:, 1:].values.tolist()
    return labels, samples

def z_normalize_time_series(series: List[float]) -> np.ndarray:
    return (series - np.mean(series)) / np.std(series)

def plot_signals(signals: np.ndarray, figsize: Tuple[int, int]) -> None:
    fig, axs = plt.subplots(len(signals), figsize=figsize)
    for signal_idx, signal in enumerate(signals):
        axs[signal_idx].plot(signal)
        axs[signal_idx].set_title("sample " + str(signal_idx))
    plt.tight_layout()
    plt.savefig("data_vis.svg", format="svg", bbox_inches='tight')
    plt.show()
    
def resample(signals: np.ndarray, znorm: bool, target_len: int) -> np.ndarray:
    assert signals.shape == (362, 1250)
    print("target len", target_len)
    # reshape to (n_ts, sz, d)
    signals = signals.reshape((signals.shape[0], signals.shape[1], 1))
    # downsample
    signals_processed = TimeSeriesResampler(sz=target_len).fit_transform(signals)
    if znorm:
        # z-normalize each sample individually
        signals_processed = TimeSeriesScalerMeanVariance().fit_transform(signals_processed)
        
    assert signals_processed.shape == (362, target_len, 1)
    return signals_processed.squeeze(-1)

In [None]:
def save_signals(path: str, labels: List[int], samples: List[List[float]]):
    df = pd.DataFrame(samples)
    df.insert(0, "label", labels)  # insert labels as first col
    df.to_csv(path, sep='\t', header=False, index=False)

In [None]:
# creating a binary classification dataset for EOGVerticalSignal -- first basic approach (!)

train_data = "EOGVerticalSignal/EOGVerticalSignal_TRAIN.tsv"
train_labels, train_samples = load_signals(train_data)
train_labels = [i - 1 for i in train_labels]
# adjust the labels -- turn into binary classification problem
#   - subsume 0-10 as regular (0) and 11 as anomaly (1)
train_labels = [0 if label <= 10 else 1 for label in train_labels]
save_signals('EOGVerticalSignal/EOGVerticalSignal_TRAIN_BINARY.tsv', train_labels, train_samples)

test_data = "EOGVerticalSignal/EOGVerticalSignal_TEST.tsv"
test_labels, test_samples = load_signals(test_data)
test_labels = [i - 1 for i in test_labels]
# adjust the labels -- turn into binary classification problem
#   - subsume 0-10 as regular (0) and 11 as anomaly (1)
test_labels = [0 if label <= 10 else 1 for label in test_labels]
save_signals('EOGVerticalSignal/EOGVerticalSignal_TEST_BINARY.tsv', test_labels, test_samples)

In [None]:
# creating a binary classification dataset for EOGVerticalSignal -- improved label subsumption (!)

train_data = "EOGVerticalSignal/EOGVerticalSignal_TRAIN.tsv"
train_labels, train_samples = load_signals(train_data)
train_labels = [i - 1 for i in train_labels]
# adjust the labels -- turn into binary classification problem
#   - subsume 0-5 as regular (0) and 6-11 as anomaly (1)
train_labels = [0 if label <= 5 else 1 for label in train_labels]
save_signals('EOGVerticalSignal/EOGVerticalSignal_TRAIN_BINARY.tsv', train_labels, train_samples)

test_data = "EOGVerticalSignal/EOGVerticalSignal_TEST.tsv"
test_labels, test_samples = load_signals(test_data)
test_labels = [i - 1 for i in test_labels]
# adjust the labels -- turn into binary classification problem
#   - subsume 0-5 as regular (0) and 6-11 as anomaly (1)
test_labels = [0 if label <= 5 else 1 for label in test_labels]
save_signals('EOGVerticalSignal/EOGVerticalSignal_TEST_BINARY.tsv', test_labels, test_samples)

In [None]:
import os

# train_data = "Lightning7/Lightning7_TRAIN.tsv"
# test_data = "Lightning7/Lightning7_TEST.tsv"

#train_data = "Plane/Plane_TRAIN.tsv"
#test_data = "Plane/Plane_TEST.tsv"

#train_data = "MelbournePedestrian/MelbournePedestrian_TRAIN.tsv"
#test_data = "MelbournePedestrian/MelbournePedestrian_TEST.tsv"

#train_data = "SemgHandSubjectCh2/SemgHandSubjectCh2_TRAIN.tsv"
#test_data = "SemgHandSubjectCh2/SemgHandSubjectCh2_TEST.tsv"

#train_data = "EthanolLevel/EthanolLevel_TRAIN.tsv"
#test_data = "EthanolLevel/EthanolLevel_TEST.tsv"

#train_data = "EOGVerticalSignal/EOGVerticalSignal_TRAIN.tsv"
multiclass_test_data = "EOGVerticalSignal/EOGVerticalSignal_TEST.tsv"
train_data = "EOGVerticalSignal/EOGVerticalSignal_TRAIN_BINARY.tsv"
test_data = "EOGVerticalSignal/EOGVerticalSignal_TEST_BINARY.tsv"

original_labels, _ = load_signals(multiclass_test_data)
original_labels = [i - 1 for i in original_labels]
print("ori:", original_labels)

train_labels, train_signals = load_signals(train_data)
print("#samples (train):", len(train_signals))
print("sample len:", len(train_signals[0]))
print("#classes (train):", len(np.unique(train_labels)))

test_labels, test_signals = load_signals(test_data)
print("\n#samples (test):", len(test_signals))
print("sample len:", len(test_signals[0]))
print("#classes (test):", len(np.unique(test_labels)))

if not 0 in train_labels:
    train_labels = [i - 1 for i in train_labels]
    test_labels = [i - 1 for i in test_labels]

print(np.unique(train_labels))

print(
    "train\tclass 0:", train_labels.count(0),
    "\tclass 1:", train_labels.count(1),
    "\tclass 2:", train_labels.count(2),
    "\tclass 3:", train_labels.count(3),
    "\tclass 4:", train_labels.count(4),
    "\tclass 5:", train_labels.count(5),
    "\tclass 6:", train_labels.count(6),
)
print(
    "test\tclass 0:", test_labels.count(0),
    "\tclass 1:", test_labels.count(1),
    "\tclass 2:", test_labels.count(2),
    "\tclass 3:", test_labels.count(3),
    "\tclass 4:", test_labels.count(4),
    "\tclass 5:", test_labels.count(5),
    "\tclass 6:", test_labels.count(6),
)

In [None]:
# downsampling - True -> znorm

target_len = 1250 # 1250

train_signals = np.array(train_signals)
test_signals = np.array(test_signals)

assert train_signals.shape == (362, 1250)
assert test_signals.shape == (362, 1250)

train_signals = resample(train_signals, True, target_len)
test_signals = resample(test_signals, True, target_len)

print(train_signals.shape)
print(test_signals.shape)

In [None]:
# vis - one sample for each class

unique_labels = np.unique(train_labels)

one_sample_each = []
for label in unique_labels:
    for i in range(len(train_labels)):
        if train_labels[i] == label:
            one_sample_each.append(train_signals[i])
            break

plot_signals(one_sample_each, figsize=(10, len(one_sample_each)))

In [None]:
# vis - all samples for one class

label = 1
all_of_class = []
for i in range(len(train_labels)):
    if train_labels[i] == label:
        all_of_class.append(train_signals[i])

plot_signals(all_of_class, figsize=(10, len(all_of_class)))

In [None]:
# vis - "avg" or most representative sample for each class

from tslearn.barycenters import dtw_barycenter_averaging
from tslearn.metrics import cdist_dtw

unique_labels = np.unique(train_labels)

avg_sample_each = []
for label in unique_labels:
    curr_label_signals = []
    for i in range(len(train_labels)):
        if train_labels[i] == label:
            curr_label_signals.append(z_normalize_time_series(train_signals[i]))
    curr_label_signals = np.array(curr_label_signals)
    print(curr_label_signals.shape)

    # FIRST APPROACH
    # avg_sample_each.append(np.mean(curr_label_signals, axis=0))
    # --> not reasonable

    # SECOND APPROACH
    # tslearn expects shape (n_ts, sz, d)
    # data = curr_label_signals[:, :, np.newaxis]
    # dba_mean = dtw_barycenter_averaging(data).squeeze().tolist()
    # avg_sample_each.append(dba_mean)
    # --> not reasonable

    # THIRD APPROACH
    data = curr_label_signals[:, :, np.newaxis]
    # pairwise DTW dist matrix
    dtw_matrix = cdist_dtw(data)
    
    # sample with the smallest avg DTW distance to others
    medoid_idx = np.argmin(dtw_matrix.mean(axis=1))
    avg_sample_each.append(curr_label_signals[medoid_idx])

plot_signals(avg_sample_each, figsize=(10, len(avg_sample_each)))

## Training with z-normalized data

In [None]:
import keras

# deactivate tensorflow logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf

print("before:", train_signals.shape)
print("before:", test_signals.shape)

num_samples = train_signals.shape[0]
sample_len = train_signals.shape[1]

In [None]:
# univariate -- only if raW
print(train_signals.shape)

train_signals = train_signals[:, :, np.newaxis]
test_signals = test_signals[:, :, np.newaxis]

plt.plot(train_signals[0])
plt.title("First Sig")
plt.show()

In [None]:
print("after:", train_signals.shape)
print("after:", test_signals.shape)

num_classes = len(np.unique(train_labels))
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)

## Build model

- FCN
- hyperparameters (`kernel_size, filters, usage of BatchNorm`) found using `KerasTuner`

In [None]:
def build_model(input_shape: np.ndarray) -> keras.models.Model:
    input_layer = keras.layers.Input(input_shape)

    conv1 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(input_layer)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.ReLU()(conv1)

    conv2 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv1)
    conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.ReLU()(conv2)

    conv3 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv2)
    conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.ReLU()(conv3)

    gap = keras.layers.GlobalAveragePooling1D()(conv3)

    output_layer = keras.layers.Dense(num_classes, activation="softmax")(gap)

    return keras.models.Model(inputs=input_layer, outputs=output_layer)


model = build_model(input_shape=train_signals.shape[1:])
keras.utils.plot_model(model, show_shapes=True)

- predefined ResNet

In [None]:
from oscillogram_classification import models

model = models.create_resnet_model(input_shape=train_signals.shape[1:], num_classes=num_classes)
keras.utils.plot_model(model, show_shapes=True)

## Train model

In [None]:
# there should be no model, otherwise retraining!
assert not os.path.isfile("best_model.keras")

epochs = 500
batch_size = 32

callbacks = [
    keras.callbacks.ModelCheckpoint(
        "best_model.keras", save_best_only=True, monitor="val_loss"
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001
    ),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1)
]

model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)

history = model.fit(
    train_signals,
    train_labels,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_split=0.2,
    verbose=1,
)

In [None]:
# eval model on test data

model = keras.models.load_model("best_model.keras")
test_loss, test_acc = model.evaluate(test_signals, test_labels)

print("test acc.:", test_acc)
print("test loss:", test_loss)

In [None]:
# plot training and validation loss

metric = "sparse_categorical_accuracy"
plt.figure()
plt.plot(history.history[metric])
plt.plot(history.history["val_" + metric])
plt.title("model " + metric)
plt.ylabel(metric, fontsize="large")
plt.xlabel("epoch", fontsize="large")
plt.legend(["train", "val"], loc="best")
plt.show()
plt.close()

### GradCAM++ on univariate data

In [None]:
method = "tf-keras-gradcam++"

random_index = random.randint(0, len(test_signals) - 1)
net_input = test_signals[random_index]
assert net_input.shape[1] == 1
ground_truth = test_labels[random_index]
prediction = model.predict(np.array([net_input]))
heatmaps = gen_heatmap_dictionary(method, np.array(net_input), model, prediction)

In [None]:
plot_heatmaps_as_overlay(heatmaps, net_input, 'test_plot', test_time_values.squeeze()[random_index].tolist())

## tsai training

In [None]:
from tsai.all import *
import sklearn.metrics as skm
my_setup()

In [None]:
def generate_tsai_dataset(train_signals: np.ndarray, train_labels: np.ndarray) -> TSDataLoaders:
    # randomly split the indices of the training samples into two sets (train (80%) and validation (20%))
    # 'splits' contains a tuple of lists ([train_indices], [validation_indices])
    #    - stratify=True -> split the data in such a way that each class's proportion in the train and validation
    #      datasets is approximately the same as the proportion in the original dataset
    #    - random_state is the seed
    splits = get_splits(train_labels, valid_size=.2, stratify=True, random_state=23, shuffle=True)
    print(splits)
    print("--> currently, the above plot wrongly labels 'Valid' as 'Test'")

    # define transformations:
    #    - None -> no transformation to the input (X)
    #    - Categorize() -> convert labels into categorical format; converts the labels to integers
    # my labels are already ints, but I'll leave it here as a more general case
    tfms  = [None, [Categorize()]]

    # creates tensors to train on, e.g.,
    #    dsets[0]: (TSTensor(vars:5, len:500, device=cpu, dtype=torch.float32), TensorCategory(0))
    dsets = TSDatasets(train_signals, train_labels, tfms=tfms, splits=splits, inplace=True)

    print("#train samples:", len(dsets.train))
    print("#valid samples:", len(dsets.valid))

    # data loaders: loading data in batches; batch size 64 for training and 128 for validation
    #    - TSStandardize: batch normalization
    #    - num_workers: 0 -> data loaded in main process
    dls = TSDataLoaders.from_dsets(
        dsets.train, dsets.valid, bs=[16, 32], batch_tfms=[TSStandardize()], num_workers=0
    )
    # vis a batch
    dls.show_batch(nrows=3, ncols=3, sharey=True)
    return dls

def train_tsai_model(dls: TSDataLoaders, model: XCM) -> Learner:
    # learner encapsulates the data, the model, and other details related to the training process
    # -- loss_func=CrossEntropyLossFlat()
    learn = Learner(dls, model, metrics=accuracy)

    # saves curr state of learner (model + weights) to a file named stage0
    learn.save('stage0')

    # load state of model
    learn.load('stage0')

    # training over range of learning rates -- find suitable LR (or LR range)
    #    - learning rate range where the loss decreases most effectively
    learn.lr_find()

    # 150 -> num of epochs
    #    - involves varying the learning rate in a specific way during training
    #    - the cyclical nature helps in faster convergence, avoids getting stuck in local minima,
    #      and sometimes achieves better overall performance
    #    - it provides a balance between exploring the loss landscape (with higher learning rates)
    #    - and exploiting known good areas of the landscape (with lower learning rates)
    learn.fit_one_cycle(300, lr_max=1e-3)

    learn.save('stage1')
    return learn

def test_tsai_model(test_signals: np.ndarray, test_labels: np.ndarray, learn: Learner) -> np.float64:
    # labeled test data
    test_ds = TSDatasets(test_signals, test_labels, tfms=[None, [Categorize()]])
    test_dl = dls.valid.new(test_ds)

    test_probas, test_targets, test_preds = learn.get_preds(
        dl=test_dl, with_decoded=True, save_preds=None, save_targs=None
    )    
    return skm.accuracy_score(test_targets, test_preds)

In [None]:
# tsai expects the data in a diff format: (samples, variables, length)

# variables = 1 for univariate datasets and >1 for multivariate

train_signals = train_signals.reshape(train_signals.shape[0], 1, train_signals.shape[1])
test_signals = test_signals.reshape(test_signals.shape[0], 1, test_signals.shape[1])

print(train_signals.shape)
print(test_signals.shape)

In [None]:
dls = generate_tsai_dataset(train_signals, train_labels)

## Select model

In [None]:
######################################################
### models trained on normalized version of RAW TS ###
######################################################

# creating InceptionTime (is a CNN) model (vars: 5 (5 channels), c: 2 (2 classes))
# model = InceptionTime(dls.vars, dls.c)

# TODO: InceptionTimePlus

# TODO: XceptionTime

# TODO: XceptionTimePlus

# TODO: OmniScaleCNN

# creating XCM
# eXplainable Convolutional neural network for Multivariate time series classification (XCM)
# model = XCM(dls.vars, dls.c, dls.len)

# creating XCMPlus
# eXplainable Convolutional neural network for Multivariate time series classification (XCM)
model = XCMPlus(dls.vars, dls.c, dls.len)

# creating FCN (CNN model)
# model = FCN(dls.vars, dls.c)

# TODO: FCNPlus

# creating ResNet (CNN)
# model = ResNet(dls.vars, dls.c)

# TODO: ResNetPlus

# TODO: XResNet1d

# TODO: XResNet1dPlus

# TODO: ResCNN

# TODO: TCN

# creating RNN
# model = RNN(dls.vars, dls.c)

# creating RNNPlus (RNN model + including a feature extractor to the RNN network)
# model = RNNPlus(dls.vars, dls.c)

# TODO: RNNAttention

# creating GRU (RNN model)
# model = GRU(dls.vars, dls.c)

# creating GRUPlus (RNN model + including a feature extractor to the RNN network)
# model = GRUPlus(dls.vars, dls.c)

# creating GRUAttention (RNN model + attention)
# model = GRUAttention(dls.vars, dls.c, seq_len=500)

# creating LSTM (RNN model)
# model = LSTM(dls.vars, dls.c)

# creating LSTMPlus (RNN model + including a feature extractor to the RNN network)
# model = LSTMPlus(dls.vars, dls.c)

# creating LSTMAttention (RNN model + attention)
# model = LSTMAttention(dls.vars, dls.c, seq_len=500)

# creating TSSequencerPlus
# model = TSSequencerPlus(dls.vars, dls.c, seq_len=500)

# creating TransformerModel
# model = TransformerModel(dls.vars, dls.c)

# TODO: TST

# TODO: TSTPlus

# TODO: TSPerceiver

# TODO: TSiT

# TODO: PatchTST

# TODO: ROCKETs category

# TODO: Wavelet-based NNs category

# TODO: Hybrid models category

# TODO: Tabular models category

#########################################
### models trained on feature vectors ###
#########################################

# TODO: extract + select features, i.e., generate feature vectors

# TODO: MLP

# TODO: gMLP

## Training

In [None]:
# there should be no model, otherwise retraining!
assert not os.path.isdir("export")
assert not os.path.isdir("models")

learn = train_tsai_model(dls, model)

In [None]:
# losses -> loss development over all epochs for 'train' and 'valid'
# final losses ->  zoomed-in view of the final epochs, focusing on loss values towards the end of training
# accuracy -> validation accuracy of the model
learn.recorder.plot_metrics()

In [None]:
learn.save_all(path='export', dls_fname='dls', model_fname='model', learner_fname='learner')

In [None]:
learn.show_results(nrows=3, ncols=3)

In [None]:
learn.show_probas()

In [None]:
interp = ClassificationInterpretation.from_learner(learn)
interp.plot_confusion_matrix()

In [None]:
# interested in cases where the model made incorrect predictions at least 3 times
confusions = interp.most_confused(min_val=3)
for actual_class, pred_class, times in confusions:
    print("pred:", pred_class)
    print("actual:", actual_class)
    print(times, "times")

## Inference on additional data

In [None]:
test_acc = test_tsai_model(test_signals, test_labels, learn)
print("test accuracy:", test_acc)

## GradCAM for XCM and XCMPlus

In [None]:
assert type(model) in [tsai.models.XCMPlus.XCMPlus, tsai.models.XCM.XCM]

xb, yb = dls.one_batch()

print(yb[0])
print(xb[0])

model.show_gradcam(xb[0], yb[0], figsize=(10, 3))

In [None]:
# as the built-in gradcam method creates plots that are sometimes unreadable, it is better to
# visualize it with the methods from oscillogram_classification.cam

# determine number of signals to be used for saliency map gen
num_test_samples = len(test_signals)

# convert test data to CUDA tensors
xb = torch.tensor(test_signals[:num_test_samples], dtype=torch.float32).to('cuda:0')
yb = torch.tensor(test_labels[:num_test_samples], dtype=torch.float32).to('cuda:0')

input_data, probabilities, targets, predictions = learn.get_X_preds(xb.cpu(), yb.cpu(), with_input=True)
predictions = predictions.strip('][').split(', ')

In [None]:
import math

def gen_heatmaps_overlay_side_by_side_new(cams: Dict, signals: np.ndarray, title: str, time_vals: List[float]) -> None:
    """
    Generates the class activation map (heatmap) side-by-side plot - time series as overlay.
    This time we actually have many-to-many, i.e., one heatmap for each signal.

    :param cams: dictionary containing the class activation maps to be visualized (+ method names)
    :param signals: list of signals
    :param title: window title, e.g., recorded vehicle component and classification result
    :param time_vals: time values to be visualized on the x-axis
    """
    cols = 8
    rows = math.ceil(len(cams) / float(cols))
    plt.rcParams["figure.figsize"] = cols * 5, rows * 3
    fig, axes = plt.subplots(nrows=rows, ncols=cols, sharex=True, sharey=True)
    
    fig.canvas.set_window_title(title)
    # bounding box in data coordinates that the image will fill (left, right, bottom, top)
    min_vals = []
    max_vals = []
    for sig in signals:
        min_vals.append(np.min(sig))
        max_vals.append(np.max(sig))
    extent = [0, time_vals[-1], np.floor(np.min(min_vals)), np.ceil(np.max(max_vals))]
    frequency = round(len(signals[0]) / time_vals[-1], 2)
    fig.text(0.5, -0.035, "time (s), %d Hz" % frequency, ha="center", va="center", fontsize=18)

    if len(cams) == 1 or rows == 1:
        axes = [axes]

    for j in range(rows):
        axes[j][0].set_ylabel("norm. voltage (V)", fontsize=18)
        for i in range(cols):
            axes[j][i].set_xlim(extent[0], extent[1])
            axes[j][i].title.set_text(list(cams.keys())[j * cols + i])
            axes[j][i].title.set_fontsize(18)
            axes[j][i].tick_params(axis='x', labelsize=12)
            axes[j][i].tick_params(axis='y', labelsize=12)

            # heatmap
            axes[j][i].imshow(
                cams[list(cams.keys())[j * cols + i]][np.newaxis, :], cmap="plasma", aspect="auto", alpha=.75, extent=extent
            )
            # data
            axes[j][i].plot(time_vals, signals[j * cols + i], '#000000')

            # i is not allowed to go off limits in the extra row
            if j * cols + i == len(cams) - 1:
                break
            
    plt.tight_layout(pad=1.4)
    fig.savefig(title + '.png', dpi=300, transparent=False)

## Gen Saliency Maps

In [None]:
# random_index = random.randint(0, len(xb) - 1)

all_attr_maps = {}

for idx in range(len(xb)):
    if type(model) == tsai.models.XCMPlus.XCMPlus:
        att_maps = get_attribution_map(
            model,
            [model.backbone.conv2dblock, model.backbone.conv1dblock],
            xb[idx],
            detach=True,
            apply_relu=True
        )
    else:  # XCM
        att_maps = get_attribution_map(
            model,
            [model.conv2dblock, model.conv1dblock],
            xb[idx],
            detach=True,
            apply_relu=True
        )
    att_maps[0] = (att_maps[0] - att_maps[0].min()) / (att_maps[0].max() - att_maps[0].min())
    att_maps[1] = (att_maps[1] - att_maps[1].min()) / (att_maps[1].max() - att_maps[1].min())

    all_attr_maps[idx] = att_maps
    print("Ground truth: ", int(yb[idx]), " Prediction: ", predictions[idx])

var_attr_maps = {"var. attr. map " + str(i) + "(gt: " + str(int(yb[i])) + ", pr: " + predictions[i] + ", or:" + str(original_labels[i]) + ")": all_attr_maps[i][0].cpu().numpy()[0] for i in range(len(xb))}
time_attr_maps = {"time attr. map " + str(i): all_attr_maps[i][1].cpu().numpy()[0] for i in range(len(xb))}

In [None]:
filtered_var_attr_maps = {}

original_indices_of_used_saliency_maps = []

# filter out the NaN heatmaps
for i, k in enumerate(var_attr_maps.keys()):
    if not math.isnan(var_attr_maps[k][0]):
        original_indices_of_used_saliency_maps.append(i)
        filtered_var_attr_maps[k] = var_attr_maps[k]

saliency_maps = []
gt_labels = []
# 1D saliency maps
for k in filtered_var_attr_maps.keys():
    saliency_maps.append(filtered_var_attr_maps[k])
    gt_labels.append(int(k.split(",")[-1].split(":")[-1].replace(")", "")))

saliency_maps = np.array(saliency_maps)
print("number of heatmaps to cluster:", len(saliency_maps))

In [None]:
from tslearn.clustering import TimeSeriesKMeans
from kneed import KneeLocator
import joblib

def determine_k_with_elbow(saliency_maps: np.ndarray) -> int:
    """
    Performs Euclidean k-means clustering with the provided saliency maps over a range of k values.
    The aim is to use the elbow method to find the optimal k.

    :param saliency_maps: saliency maps to cluster
    :return number of clusters (k)
    """
    inertias = []
    k_values = range(3, 30)
    
    for k in k_values:
        km = TimeSeriesKMeans(
            n_clusters=k,
            n_init=N_INIT,
            max_iter=MAX_ITER,
            verbose=False,
            random_state=SEED
        )
        km.fit(saliency_maps)
        inertias.append(km.inertia_)  # inertia = sum of distances to closest centroid

    knee = KneeLocator(k_values, inertias, curve='convex', direction='decreasing')
    optimal_k = knee.knee
    print(f"optimal number of clusters: {optimal_k}")
    
    plt.plot(k_values, inertias, marker='o')
    plt.xlabel("number of clusters")
    plt.ylabel("inertia")
    plt.title("Elbow Method")
    plt.show()
    return optimal_k

def perform_euclidean_k_means_clustering(
    saliency_maps: np.ndarray, k: int, gt_labels: np.ndarray, fig: Figure
) -> Tuple[Dict, List[List[float]]]:
    """
    Performs Euclidean k-means clustering with the provided saliency maps.

    :param saliency_maps: saliency maps to cluster
    :param k: number of clusters
    :param gt_labels: ground truth labels of the corresponding input signal
    :param fig: matplotlib figure to be extended
    :return (ground truth labels per cluster, cluster centroids)
    """
    print("Euclidean k-means")
    euclidean_km = TimeSeriesKMeans(
        n_clusters=k,
        n_init=N_INIT,
        max_iter=MAX_ITER,
        verbose=True,
        random_state=SEED
    )
    pred_labels = euclidean_km.fit_predict(saliency_maps)
    centroids = euclidean_km.cluster_centers_
    ground_truth = plot_results(1, "Euclidean $k$-means", euclidean_km, np.array(saliency_maps), gt_labels, pred_labels, fig, k)
    #joblib.dump((euclidean_km, pred_labels, ground_truth), 'trained_models/euclidean_km.pkl')  # save model
    return ground_truth, centroids

def perform_dba_k_means_clustering(
    saliency_maps: np.ndarray, k: int, gt_labels: np.ndarray, fig: Figure, model_target: str
) -> Tuple[Dict, List[List[float]], List[int]]:
    """
    Performs DBA k-means clustering with the provided saliency maps.

    :param saliency_maps: saliency maps to cluster
    :param k: number of clusters
    :param gt_labels: ground truth labels of the corresponding input signal
    :param fig: matplotlib figure to be extended
    :param model_target: type of input the model is applied to, i.e., 'saliency', 'input' or 'multivariate'
    :return (ground truth labels per cluster, cluster centroids, pred_labels)
    """
    print("DBA k-means")
    dba_km = TimeSeriesKMeans(
        n_clusters=k,
        n_init=N_INIT,
        max_iter=MAX_ITER,
        metric="dtw",
        verbose=False,
        max_iter_barycenter=MAX_ITER_BARYCENTER,
        random_state=SEED
    )
    pred_labels = dba_km.fit_predict(saliency_maps)
    centroids = dba_km.cluster_centers_
    ground_truth_per_cluster = plot_results(1 + k, "DBA $k$-means", dba_km, np.array(saliency_maps), gt_labels, pred_labels, fig, k)
    joblib.dump(
        (dba_km, pred_labels, ground_truth_per_cluster, centroids, multivariate_test_signals_filtered),
        'trained_models/dba_km_' + model_target + '.pkl'
    )  # save model to file
    return ground_truth_per_cluster, centroids, pred_labels

def perform_soft_dtw_k_means_clustering(
    saliency_maps: np.ndarray, k: int, gt_labels: np.ndarray, fig: Figure
) -> Tuple[Dict, List[List[float]]]:
    """
    Performs Soft-DTW k-means clustering with the provided saliency maps.

    :param saliency_maps: saliency maps to cluster
    :param k: number of clusters
    :param gt_labels: ground truth labels of the corresponding input signal
    :param fig: matplotlib figure to be extended
    :return (ground truth labels per cluster, cluster centroids)
    """
    print("Soft-DTW k-means")
    sdtw_km = TimeSeriesKMeans(
        n_clusters=k,
        n_init=N_INIT,
        max_iter=MAX_ITER,
        metric="softdtw",
        metric_params={"gamma": .01},
        verbose=True,
        max_iter_barycenter=MAX_ITER_BARYCENTER,
        random_state=SEED
    )
    pred_labels = sdtw_km.fit_predict(saliency_maps)
    centroids = sdtw_km.cluster_centers_
    ground_truth = plot_results(
        1 + 2 * k, "Soft-DTW $k$-means", sdtw_km, np.array(saliency_maps), gt_labels, pred_labels, fig, k
    )
    return ground_truth, centroids
    # joblib.dump((sdtw_km, pred_labels, ground_truth), 'trained_models/sdtw_km.pkl')  # save model to file

def plot_results(
        offset: int, title: str, clustering: TimeSeriesKMeans, saliency_maps: np.ndarray, gt_labels: np.ndarray,
        pred_labels: np.ndarray, fig: plt.Figure, k: int
) -> Dict:
    """
    Plots the results of the clustering procedure.

    :param offset: y-offset for the data to be displayed
    :param title: plot title
    :param clustering: clustering results
    :param saliency_maps: saliency maps that were clustered
    :param gt_labels: ground truth labels
    :param pred_labels: predictions (cluster assignments)
    :param fig: figure to add plot to
    :param k: number of clusters
    :return: ground truth dictionary
    """
    print("#########################################################################################")
    print("results for", title)
    print("#########################################################################################")
    ground_truth_dict = evaluate_performance(gt_labels, pred_labels, k)
    for y in range(k):
        ax = fig.add_subplot(3, k, y + offset)
        for x in saliency_maps[pred_labels == y]:
            # TODO: impl multivariate case
            ax.plot(x.ravel(), "k-", alpha=.2)
        ax.plot(clustering.cluster_centers_[y].ravel(), "r-")
        # ax.set_xlim(0, x_train.shape[1])
        # ax.set_ylim(6, 15)
        ax.text(0.55, 0.85, "Cluster %d" % y, transform=fig.gca().transAxes)
        if y == 0:
            plt.title(title)
    return ground_truth_dict

def evaluate_performance(ground_truth: np.ndarray, predictions: np.ndarray, k: int) -> Dict:
    """
    Evaluates the clustering performance.

    :param ground_truth: ground truth labels
    :param predictions: predicted labels (clusters)
    :param k: number of clusters
    :return: ground truth dictionary
    """
    cluster_dict = {i: 0 for i in range(k)}
    ground_truth_per_cluster = {i: [] for i in range(k)}

    for i in range(len(predictions)):
        cluster_dict[predictions[i]] += 1
        ground_truth_per_cluster[predictions[i]].append(ground_truth[i])

    # ideal would be (n, n, n, n, n) - equally distributed
    print("cluster distribution:", list(cluster_dict.values()))

    # each cluster should contain heatmaps with identical labels, you don't know which one, but it must be identical
    print("ground truth per cluster:")
    for val in ground_truth_per_cluster.values():
        print("\t-", val, "\n")
    return ground_truth_per_cluster

## Clustering Config

In [None]:
N_INIT = 20 # 50  # ensures stability; avoids bad local minima - default in many packages is ~10
MAX_ITER = 500  # more than sufficient for convergence in most cases
SEED = 42
MAX_ITER_BARYCENTER = 300 # 500  # might drop this to ~100–200 for short sequences

In [None]:
# loading already 'trained' model
import joblib

dba_km, pred_labels, gt_labels_per_cluster_multivariate, centroids_multivariate, multivariate_test_signals_filtered = joblib.load(
    'trained_models/dba_km_multivariate_RAW_LEN500_EQUALSUBSUMPTION.pkl'
)

## Clustering for Saliency Maps

In [None]:
k = determine_k_with_elbow(saliency_maps)

num_classes_ground_truth = len(np.unique(gt_labels))
NUM_CLASSES = num_classes_ground_truth
print("number of classes to cluster (ground truth):", num_classes_ground_truth)
print("determined k:", k)
cluster_fig = plt.figure(figsize=(5 * k, 8))

# e.g., (n_samples, 500)
assert saliency_maps.shape == (len(saliency_maps), 500)

# perform_euclidean_k_means_clustering(saliency_maps, k, gt_labels, cluster_fig)
gt_labels_per_cluster_saliency, centroids_saliency, pred_labels = perform_dba_k_means_clustering(
    saliency_maps, k, gt_labels, cluster_fig, "saliency"
)
# perform_soft_dtw_k_means_clustering(saliency_maps, k, gt_labels, cluster_fig)

cluster_fig.tight_layout()
plt.show()

## Clustering for Input Samples

In [None]:
test_signals_filtered = np.array([test_signals[i][0] for i in range(len(test_signals)) if i in original_indices_of_used_saliency_maps])
ori_labels_filtered = [original_labels[i] for i in range(len(original_labels)) if i in original_indices_of_used_saliency_maps]

k = determine_k_with_elbow(test_signals_filtered)

num_classes_ground_truth = len(np.unique(original_labels))
print("number of classes to cluster (ground truth):", num_classes_ground_truth)
print("determined k:", k)
cluster_fig = plt.figure(figsize=(5 * k, 8))

# e.g., (n_samples, 500)
assert test_signals_filtered.shape == (len(original_indices_of_used_saliency_maps), 500)

# perform_euclidean_k_means_clustering(test_signals_filtered, k, ori_labels_filtered, cluster_fig)
gt_labels_per_cluster_input, centroids_input, pred_labels = perform_dba_k_means_clustering(
    test_signals_filtered, k, ori_labels_filtered, cluster_fig, "input"
)
# perform_soft_dtw_k_means_clustering(test_signals_filtered, k, ori_labels_filtered, cluster_fig)

cluster_fig.tight_layout()
plt.show()

## Multivariate Clustering (Input Samples + Saliency Maps)

In [None]:
multivariate_test_signals_filtered = np.array(
    [[test_signals[i][0], list(var_attr_maps.values())[i]] for i in range(len(test_signals)) if i in original_indices_of_used_saliency_maps]
)
ori_labels_filtered = [original_labels[i] for i in range(len(original_labels)) if i in original_indices_of_used_saliency_maps]

print("shape before:", multivariate_test_signals_filtered.shape)
multivariate_test_signals_filtered = multivariate_test_signals_filtered.transpose(0, 2, 1)
print("shape after:", multivariate_test_signals_filtered.shape)

k = determine_k_with_elbow(multivariate_test_signals_filtered)

num_classes_ground_truth = len(np.unique(original_labels))
print("number of classes to cluster (ground truth):", num_classes_ground_truth)
print("determined k:", k)
cluster_fig = plt.figure(figsize=(5 * k, 8))

# multivariate, e.g., (n_samples, seq_len, n_vars)
assert multivariate_test_signals_filtered.shape == (len(original_indices_of_used_saliency_maps), 1250, 2)

# perform_euclidean_k_means_clustering(test_signals_filtered, k, ori_labels_filtered, cluster_fig)
gt_labels_per_cluster_multivariate, centroids_multivariate, pred_labels = perform_dba_k_means_clustering(
    multivariate_test_signals_filtered, k, ori_labels_filtered, cluster_fig, "multivariate"
)
# perform_soft_dtw_k_means_clustering(test_signals_filtered, k, ori_labels_filtered, cluster_fig)

cluster_fig.tight_layout()
plt.show()

## Plotting Centroids

In [None]:
print(len(centroids_multivariate))
print(centroids_multivariate.shape)

heatmaps_final = centroids_multivariate[:, :, 1]
signals_final = centroids_multivariate[:, :, 0]

assert len(heatmaps_final) == len(signals_final)

centroid_heats = {"centroid " + str(i): heatmaps_final[i] for i in range(len(heatmaps_final))}

gen_heatmaps_overlay_side_by_side_new(
    centroid_heats,
    signals_final,
    'var_attr',
    np.array(range(len(list(heatmaps_final)[0])))
)

## Quantitative Metrics -- Ground Truth Eval

### (for demonstrating the superiority of heatmap-aided clustering)

In [None]:
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, v_measure_score, confusion_matrix

def purity_score(y_true, y_pred):
    contingency_matrix = confusion_matrix(y_true, y_pred)
    return np.sum(np.max(contingency_matrix, axis=0)) / np.sum(contingency_matrix)

def quantitative_eval_clustering(gt_labels_per_cluster: List[List[int]]) -> Tuple[float, float, float, float]:
    """
    Quantitative evaluation of the clustering results.

    :param gt_labels_per_cluster ground truth labels of the samples assigned to each cluster, i.e., clustering results + gt label
    :return tuple of three metrics: (ari, nmi, v, purity)
    """
    true_labels = []
    cluster_assignments = []

    # e.g., clusters[
    #   [1, 1, 1, 2, 3, 4, 5, 5, 5],
    #   [2, 2, 2, 2, 3, 4, 2, 2, 2]
    # ]
    # -->
    # true labels: [1, 1, 1, 2, 3, 4, 5, 5, 5, 2, 2, 2, 2, 3, 4, 2, 2, 2]
    # cluster_ass: [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1]

    for cluster_id, label_list in enumerate(gt_labels_per_cluster):
        for label in label_list:
            true_labels.append(label)
            cluster_assignments.append(cluster_id)

    assert len(true_labels) == len(cluster_assignments)

    # true_labels: List[int], e.g., [0, 0, 1, 3, 4, 4, 4, 5, 6, 2, 2, 2, 1]
    #     - this is the ground truth class of each sample
    # cluster_assignments: List[int], e.g., same
    #     - assigns an ID to each cluster and simply counts how many samples are assigned to it
    ari = adjusted_rand_score(true_labels, cluster_assignments)
    nmi = normalized_mutual_info_score(true_labels, cluster_assignments)
    v = v_measure_score(true_labels, cluster_assignments)
    purity = purity_score(true_labels, cluster_assignments)

    return ari, nmi, v, purity

## Quantitative Metrics -- Without Ground Truth

### (for demonstrating the actual process in practice)

In [None]:
from tslearn.metrics import dtw_path_from_metric
from scipy.stats import entropy
from itertools import combinations

SAMPLE_LEN = 1250

def intra_cluster_dtw_distances(clusters, centroids):  # works for multivariate
    intra_dists = []
    for i, cluster in enumerate(clusters):
        # default metric "euclidean"
        dists = [dtw_path_from_metric(ts, centroids[i]) for ts in cluster]
        dists = [d[1] for d in dists]
        intra_dists.append(round(np.median(dists), 2))
    return intra_dists

def inter_cluster_dtw_distances(centroids):  # works for multivariate
    distance_grid = [[0 for i in range(len(centroids))] for _ in range(len(centroids))]
    for c1, c2 in combinations(range(len(centroids)), 2):
        # default metric "euclidean"
        _, dist = dtw_path_from_metric(centroids[c1], centroids[c2])
        dist = round(dist, 2)
        distance_grid[c1][c2] = dist
        distance_grid[c2][c1] = dist
    return distance_grid

def dtw_silhouette_score(clusters):
    all_series = [ts for cluster in clusters for ts in cluster]
    labels = []
    for i, cluster in enumerate(clusters):
        labels.extend([i] * len(cluster))

    scores = []
    for idx, ts in enumerate(all_series):
        label = labels[idx]
        a = np.mean([dtw_path_from_metric(ts, other)[1] for other in clusters[label] if not np.array_equal(ts, other)])

        b = np.inf
        for j, cluster in enumerate(clusters):
            if j == label:
                continue
            dist = np.mean([dtw_path_from_metric(ts, other)[1] for other in cluster])
            b = min(b, dist)

        s = (b - a) / max(a, b) if max(a, b) != 0 else 0
        scores.append(s)

    return round(np.mean(scores), 2)

def cluster_size_entropy(clusters):
    sizes = np.array([len(c) for c in clusters])
    probs = sizes / np.sum(sizes)
    return round(entropy(probs), 2)

def total_variance(clusters):
    variances = []
    for i, cluster in enumerate(clusters):
        stacked = np.stack(cluster)
        # global scalar variance over all features and all samples — compact summary of cluster spread
        variances.append(round(np.mean(np.var(stacked)), 2))
    return variances

def cluster_variance_across_samples(clusters):
    variances = []
    for i, cluster in enumerate(clusters):
        cluster = np.array(cluster)
        assert cluster.shape == (len(clusters[i]), SAMPLE_LEN, 2)
        # for each time step and variable: how much do samples vary? i.e., cluster spread
        variances.append(round(np.mean(np.var(cluster, axis=0)), 2))
    return variances

def cluster_variance_across_time_steps(clusters):
    variances = []
    for i, cluster in enumerate(clusters):
        cluster = np.array(cluster)
        assert cluster.shape == (len(clusters[i]), SAMPLE_LEN, 2)
        # for each sample and variable: how much does the sample fluctuate over time?
        variances.append(round(np.mean(np.var(cluster, axis=1)), 2))
    return variances

def cluster_variance_across_variables(clusters):
    variances = []
    for i, cluster in enumerate(clusters):
        cluster = np.array(cluster)
        assert cluster.shape == (len(clusters[i]), SAMPLE_LEN, 2)
        # for each sample and time step: how different are the variables at that moment?
        variances.append(round(np.mean(np.var(cluster, axis=2)), 2))
    return variances

def intra_class_variance(clusters, centroids):
    variances = []
    for i, cluster in enumerate(clusters):
        cluster = np.array(cluster)
        assert cluster.shape == (len(clusters[i]), SAMPLE_LEN, 2)
        assert centroids[i].shape == (SAMPLE_LEN, 2)
        # squared distances of each signal to centroid
        squared_diffs = (cluster - centroids[i]) ** 2
        # sum over time and variables for each signal
        squared_dists = np.sum(squared_diffs, axis=(1, 2))
        # avg over all signals
        variances.append(round(np.mean(squared_dists), 2))
    return variances

def cov_mat(clusters):
    variances = []
    for i, cluster in enumerate(clusters):
        cluster = np.array(cluster)
        assert cluster.shape == (len(clusters[i]), SAMPLE_LEN, 2)
        X_flat = cluster.reshape(-1, 2)
        cov_matrix = np.cov(X_flat, rowvar=False)
        scalar_covariance = np.mean(cov_matrix)
        variances.append(round(scalar_covariance, 2))
    return variances


clusters = [[] for _ in range(len(centroids_multivariate))]
for i, label in enumerate(pred_labels):
    clusters[label].append(multivariate_test_signals_filtered[i])

intra = intra_cluster_dtw_distances(clusters, centroids_multivariate)
inter = inter_cluster_dtw_distances(centroids_multivariate)
sil_score = dtw_silhouette_score(clusters)
entropy_score = cluster_size_entropy(clusters)

tv = total_variance(clusters)
cvas = cluster_variance_across_samples(clusters)
cvat = cluster_variance_across_time_steps(clusters)
cvav = cluster_variance_across_variables(clusters)
icv = intra_class_variance(clusters, centroids_multivariate)
cm = cov_mat(clusters)

print("intra:", intra)
print("\ninter:", inter)
print("\nsil_score:", sil_score)
print("\nentropy:", entropy_score, "max possible:", math.log(len(centroids_multivariate), 2))
print("\ntotal_variance:", tv)
print("\ncluster_variance_across_samples:", cvas)
print("\ncluster_variance_across_time_steps:", cvat)
print("\ncluster_variance_across_variables:", cvav)
print("\nintra_class_variance:", icv)
print("\ncov_mat:", cm)

In [None]:
# filter things

# when using znorm data
#INTRA_MAX = 110
#VAR_MAX = 1

# when using raw data
INTRA_MAX = 10000
VAR_MAX = 25000

sizes = np.array([len(c) for c in clusters])
inter_means = [np.median(i) for i in inter]

# the smaller the values, the better
all_var = [tv[i] + icv[i] / 1000 + cvat[i] + cvav[i] + cvas[i] + cm[i] for i in range(len(tv))]

# the smaller this value, the better
intra_inter_frac = [round(intra[i] / inter_means[i], 2) for i in range(len(inter))]

# the smaller, the better
punishing_val = [(intra_inter_frac[i] + all_var[i]) for i in range(len(inter))]
print(punishing_val)

# thresholds
intra_thresh = np.percentile(intra, 50)  # keep clusters below median intra
var_thresh = np.percentile(tv, 50)  # keep clusters below median variance
size_thresh = 5  # avoid tiny clusters

selected = []
total_valuations = []
for i in range(len(intra)):
    each_inter_class_dist_larger_than_intra = True
    violation_cnt = 0
    for j in range(len(inter[i])):
        if i != j and inter[i][j] <= intra[i]:
            each_inter_class_dist_larger_than_intra = False
            violation_cnt += 1
            print("inter class distance violation (cluster " + str(i) + ") - inter:", inter[i][j], "intra:", intra[i])
    # relative
    # if intra[i] < intra_thresh and variances[i] < var_thresh and sizes[i] >= size_thresh:
    # absolute
    if intra[i] < INTRA_MAX and tv[i] < VAR_MAX and sizes[i] >= size_thresh and each_inter_class_dist_larger_than_intra and inter_means[i] > intra[i]:
        selected.append(i)

    total_valuations.append(round(all_var[i] * intra_inter_frac[i] + violation_cnt, 2))

print("selected clusters:", selected)
print("total valuations:", total_valuations)

filtered_clusters = [clusters[i] for i in selected]
filtered_centroids = [centroids_multivariate[i] for i in selected]

cluster_fig = plt.figure(figsize=(5 * len(selected), 8))

for idx, y in enumerate(selected):
    ax = cluster_fig.add_subplot(3, len(selected), idx + 1 + len(selected))
    cluster = np.array(clusters[y])
    print("cluster shape:", cluster.shape)

    # it is still multivariate (!)
    for i in range(cluster.shape[0]):  # iterate over multivariate samples in cluster
        ax.plot(cluster[i, :, 0], "r-", alpha=.2)  # signal
        ax.plot(cluster[i, :, 1], "b-", alpha=.2)  # heatmap

    from matplotlib.lines import Line2D
    legend_elements = [
        Line2D([0], [0], color='r', lw=2, label='signal'),
        Line2D([0], [0], color='b', lw=2, label='heatmap')
    ]
    ax.legend(handles=legend_elements, loc='lower right')
    ax.plot(centroids_multivariate[y, :, 0].ravel(), "r-")
    ax.plot(centroids_multivariate[y, :, 1].ravel(), "b-")
    ax.text(0.55, 0.85, "cluster %d" % y, transform=cluster_fig.gca().transAxes)
    if y == 0:
        plt.title("title")

plt.savefig("filtered_clusters.png")

centroid_heats_filtered = {"centroid " + str(i): heatmaps_final[i] for i in selected}

signals_final_filtered = [signals_final[i] for i in selected]

gen_heatmaps_overlay_side_by_side_new(
    centroid_heats_filtered,
    signals_final_filtered,
    'filtered_centroids',
    np.array(range(len(list(heatmaps_final)[0])))
)

## to be provided to LLM

centroid_only_fig = plt.figure(figsize=(5 * len(selected), 8))

for idx, y in enumerate(selected):
    ax = centroid_only_fig.add_subplot(3, len(selected), idx + 1 + len(selected))

    ax.plot(signals_final[y], "r-")
    ax.text(0.55, 0.85, "centroid %d" % y, transform=centroid_only_fig.gca().transAxes)

plt.savefig("centroids4llm.png")

## to be provided to LLM

filtered_centroid_signals = []
for i in selected:
    filtered_centroid_signals.append(signals_final[i])
np.save("centroids4llm.npy", np.array(filtered_centroid_signals))

In [None]:
## informative for own interpretation

print("filtered clusters:")
for i in selected:
    counts = np.bincount(gt_labels_per_cluster_multivariate[i])
    dominant_class = np.argmax(counts)
    print(i, "(dominant", dominant_class, "):", gt_labels_per_cluster_multivariate[i], "\n")

print("------------------------")
print("all clusters:")
for k in gt_labels_per_cluster_multivariate.keys():
    print(k, ":", gt_labels_per_cluster_multivariate[k], "\n")

## Eval the 3 Cases - Comparison

In [None]:
# ARI: 1.0 -> perfect match; 0.0 -> random labeling; < 0.0 -> worse than random
# NMI: 1.0 -> perfect correlation; 0.0 -> no mutual information
# V-Measure (a.k.a. conditional entropy): 1.0 -> perfect clustering; 0.0 -> worst case

ari, nmi, v, purity = quantitative_eval_clustering(list(gt_labels_per_cluster_saliency.values()))
print("for saliency maps:\t" + f"ARI: {ari:.3f}, NMI: {nmi:.3f}, V-measure: {v:.3f}, Purity: {purity:.3f}")

ari, nmi, v, purity = quantitative_eval_clustering(list(gt_labels_per_cluster_input.values()))
print("for input samples:\t" + f"ARI: {ari:.3f}, NMI: {nmi:.3f}, V-measure: {v:.3f}, Purity: {purity:.3f}")

# for both in combination, i.e., multivariate
ari, nmi, v, purity = quantitative_eval_clustering(list(gt_labels_per_cluster_multivariate.values()))
print("for multivariate:\t" + f"ARI: {ari:.3f}, NMI: {nmi:.3f}, V-measure: {v:.3f}, Purity: {purity:.3f}")

In [None]:
# example case

# assumed k is 2; assumed ground truth num of classes is 3
# num of samples: 6

test_clustering_res = [
    [0, 1, 2],
    [1, 0, 2]
]
print(test_clustering_res)

ari, nmi, v, purity = quantitative_eval_clustering(test_clustering_res)
print("for saliency maps:\t" + f"ARI: {ari:.3f}, NMI: {nmi:.3f}, V-measure: {v:.3f}, Purity: {purity:.3f}")

## Plot Saliency Maps

In [None]:
assert xb.cpu().numpy()[0].flatten().shape == var_attr_maps[list(var_attr_maps.keys())[0]].shape

gen_heatmaps_overlay_side_by_side_new(
    var_attr_maps,
    [xb.cpu().numpy()[i].flatten() for i in range(len(xb))],
    'var_attr',
    np.array(range(len(xb[0].cpu().numpy()[0])))
)
gen_heatmaps_overlay_side_by_side_new(
    time_attr_maps,
    [xb.cpu().numpy()[i].flatten() for i in range(len(xb))],
    'time_attr',
    np.array(range(len(xb[0].cpu().numpy()[0])))
)

## Cross-validation for tsai training

In [None]:
k = 10
train_test_splits = get_splits(
    np.concatenate((train_labels, test_labels), axis=0),
    n_splits=k,
    valid_size=.2,
    stratify=True,
    random_state=23,
    shuffle=True
)

In [None]:
all_signals = np.concatenate((train_signals, test_signals), axis=0)
all_labels = np.concatenate((train_labels, test_labels), axis=0)
test_accuracies = []

for train_test_split in train_test_splits:
    train_split_signals = all_signals[train_test_split[0]]
    test_split_signals = all_signals[train_test_split[1]]
    train_split_labels = all_labels[train_test_split[0]]
    test_split_labels = all_labels[train_test_split[1]]

    # the training data will be further split into train and validation
    dls = generate_tsai_dataset(train_split_signals, train_split_labels)

    model = TransformerModel(dls.vars, dls.c)
    learn = train_tsai_model(dls, model)
    learn.save_all(path='export', dls_fname='dls', model_fname='model', learner_fname='learner')
    test_acc = test_tsai_model(test_split_signals, test_split_labels, learn)
    test_accuracies.append(test_acc)

In [None]:
print(test_accuracies)
print("Mean accuracy over all folds: ", np.mean(test_accuracies))

## Load and apply already trained torch model

In [None]:
import torch

model_path = "../data/Lambdasonde.pth"
model = torch.load(model_path)
# ensure model is in evaluation mode
model.eval()

tensors = torch.from_numpy(test_signals).float()

# iterate over test signals
for idx in range(tensors.shape[0]):
    # assumes model outputs logits for a multi-class classification problem
    logits = model(tensors[[idx]])
    # convert logits to probabilities using softmax
    probabilities = torch.softmax(logits, dim=1)
    print(probabilities)
    first_class = float(probabilities[0][0])
    second_class = float(probabilities[0][1])

    if first_class < second_class:
        print("pred POS \t ground truth:", test_titles[idx])
    else:
        print("pred NEG \t ground truth:", test_titles[idx])