In [None]:
import pandas
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tslearn.preprocessing import TimeSeriesResampler
import random

from oscillogram_classification.cam import gen_heatmap_dictionary, plot_heatmaps_as_overlay

In [None]:
def load_measurement(filename: str) -> pandas.DataFrame:
    df = pandas.read_csv(filename, delimiter=";", na_values=["-∞", "∞"])
    df = df[1:].apply(lambda x: x.str.replace(",", "."))
    df = df.astype(float).dropna()
    return df

def z_normalize_time_series(series):
    return (series - np.mean(series)) / np.std(series)

def plot_signals_with_channels(signals, colors, channel_titles, signal_titles, figsize):
    fig, axs = plt.subplots(len(signals), len(colors), figsize=figsize)
    for signal_idx, signal in enumerate(signals):
        for channel_idx, channel in enumerate(signal):
            axs[signal_idx, channel_idx].plot(channel, color=colors[channel_idx])
            if signal_idx == 0:
                axs[signal_idx, channel_idx].set_title(channel_titles[channel_idx])
            if channel_idx == 0:
                axs[signal_idx, channel_idx].set_ylabel(signal_titles[signal_idx])
    plt.tight_layout()
    # plt.savefig("data_vis.svg", format="svg", bbox_inches='tight')
    plt.show()
    
def resample(signals: np.ndarray, znorm: bool) -> np.ndarray:
    target_len = 500 # int(np.average([len(chan) for signal in signals for chan in signal ]))
    print("target len", target_len)
    for i in range(len(signals)):
        for j in range(len(signals[i])):
            sig_arr = np.array(signals[i][j])
            sig_arr = sig_arr.reshape((1, len(signals[i][j]), 1))  # n_ts, sz, d
            signals[i][j] = TimeSeriesResampler(sz=target_len).fit_transform(sig_arr).tolist()[0]
            
            # z-normalization
            if znorm:
                signals[i][j] = z_normalize_time_series(signals[i][j])
            
    return np.array(signals)

In [None]:
def gen_dataset_from_csv(csv_files, dir_path):
    signals = []
    labels = []
    signal_titles = []
    time_values = []
    for sample in csv_files:
        sig = load_measurement(dir_path + sample)
        signals.append([sig[channel_name] for channel_name in sig.columns.tolist() if not channel_name == "Zeit"])
        signal_titles.append(str(len(signals)) + "_" + sample)
        time_values.append([sig["Zeit"]])
        # 0 -> neg, 1 -> pos
        if "POS" in sample:
            labels.append(1)
        elif "NEG" in sample:
            labels.append(0)
    return signals, labels, signal_titles, time_values

import os
train_data = "../data/samples/Messwoche_06112023-10112023/train/"
test_data = "../data/samples/Messwoche_06112023-10112023/test/"
train_csv_files = [f for f in os.listdir(train_data) if f.endswith('.csv')]
test_csv_files = [f for f in os.listdir(test_data) if f.endswith('.csv')]

train_signals, train_labels, train_titles, train_time_values = gen_dataset_from_csv(train_csv_files, train_data)
test_signals, test_labels, test_titles, test_time_values = gen_dataset_from_csv(test_csv_files, test_data)

print("pos train:", train_labels.count(1), "neg train:", train_labels.count(0), "prop:", train_labels.count(1) / train_labels.count(0))
print("pos test:", test_labels.count(1), "neg test:", test_labels.count(0), "prop:", test_labels.count(1) / test_labels.count(0))

In [None]:
# downsampling

# True -> znorm
train_signals = resample(train_signals, True)
test_signals = resample(test_signals, True)
train_time_values = resample(train_time_values, True)
test_time_values = resample(test_time_values, True)

In [None]:
channel_titles = ["Kanal A - Heizung (Masse)", "Kanal B - Heizung (Plus)", "Kanal C - Signal (+)", "Kanal D - Signal (-)", 
                  "Kanal E - 5V", "Kanal F - Temperatur Signal", "Kanal G - Masse", "Kanal H - Druck Signal"] if \
    "Messwoche_KW16" in train_data else ['Lambdasonde', 'Luftmassenmesser', 'Differenzdrucksensor', 'Abgastemperatur', 'Nockenwellendrehzahlsensor']
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#fff700', '#00fbff', '#ff68d1']
colors = colors[:len(channel_titles)]
plot_signals_with_channels(train_signals, colors, channel_titles, train_titles, figsize=(20, 3 * len(train_signals)))

## Training with z-normalized data

In [None]:
import keras
import numpy as np
import matplotlib.pyplot as plt

import os
# deactivate tensorflow logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import tensorflow as tf

print("before:", train_signals.shape)
print("before:", test_signals.shape)

num_samples = train_signals.shape[0]
sample_len = train_signals.shape[2]
num_chan = train_signals.shape[1]

In [None]:
# multivariate - using all channels

# shape[0] samples, sample length shape[2], shape[1] channels
train_signals = train_signals.reshape(train_signals.shape[0], train_signals.shape[2], train_signals.shape[1])
test_signals = test_signals.reshape(test_signals.shape[0], test_signals.shape[2], test_signals.shape[1])

assert len(train_signals) == num_samples
assert len(train_signals[0]) == sample_len
assert len(train_signals[0][0]) == num_chan

In [None]:
# univariate - using only "Lambdasonde"
train_signals = train_signals[:, 0, :]
test_signals = test_signals[:, 0, :]

print(train_signals.shape)
plt.plot(train_signals[0])
plt.title("Lambdasonde")
plt.show()

In [None]:
# univariate - using only "Luftmassenmesser"
train_signals = train_signals[:, 1, :]
test_signals = test_signals[:, 1, :]

print(train_signals.shape)
plt.plot(train_signals[0])
plt.title("Luftmassenmesser")
plt.show()

In [None]:
# univariate - using only "Differenzdrucksensor"
train_signals = train_signals[:, 2, :]
test_signals = test_signals[:, 2, :]

print(train_signals.shape)
plt.plot(train_signals[0])
plt.title("Differenzdrucksensor")
plt.show()

In [None]:
# univariate - using only "Abgastemperatur"
train_signals = train_signals[:, 3, :]
test_signals = test_signals[:, 3, :]

print(train_signals.shape)
plt.plot(train_signals[0])
plt.title("Abgastemperatur")
plt.show()

In [None]:
# univariate - using only "Nockenwellendrehzahlsensor"
train_signals = train_signals[:, 4, :]
test_signals = test_signals[:, 4, :]

print(train_signals.shape)
plt.plot(train_signals[0])
plt.title("Nockenwellendrehzahlsensor")
plt.show()

In [None]:
print("after:", train_signals.shape)
print("after:", test_signals.shape)

num_classes = len(np.unique(train_labels))
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)

## Build model

- FCN
- hyperparameters (`kernel_size, filters, usage of BatchNorm`) found using `KerasTuner`

In [None]:
def build_model(input_shape):
    input_layer = keras.layers.Input(input_shape)

    conv1 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(input_layer)
    conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.ReLU()(conv1)

    conv2 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv1)
    conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.ReLU()(conv2)

    conv3 = keras.layers.Conv1D(filters=64, kernel_size=3, padding="same")(conv2)
    conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.ReLU()(conv3)

    gap = keras.layers.GlobalAveragePooling1D()(conv3)

    output_layer = keras.layers.Dense(num_classes, activation="softmax")(gap)

    return keras.models.Model(inputs=input_layer, outputs=output_layer)


model = build_model(input_shape=train_signals.shape[1:])
keras.utils.plot_model(model, show_shapes=True)

In [None]:
from oscillogram_classification import models

model = models.create_resnet_model(input_shape=train_signals.shape[1:],num_classes=2)
keras.utils.plot_model(model, show_shapes=True)

In [None]:
# train model

# there should be no model, otherwise retraining!
assert not os.path.isfile("best_model.keras")

epochs = 500
batch_size = 32

callbacks = [
    keras.callbacks.ModelCheckpoint(
        "best_model.keras", save_best_only=True, monitor="val_loss"
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001
    ),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1)
]

model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)

history = model.fit(
    train_signals,
    train_labels,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_split=0.2,
    verbose=1,
)

In [None]:
# eval model on test data

model = keras.models.load_model("best_model.keras")
test_loss, test_acc = model.evaluate(test_signals, test_labels)

print("test acc.:", test_acc)
print("test loss", test_loss)

In [None]:
# plot training and validation loss

metric = "sparse_categorical_accuracy"
plt.figure()
plt.plot(history.history[metric])
plt.plot(history.history["val_" + metric])
plt.title("model " + metric)
plt.ylabel(metric, fontsize="large")
plt.xlabel("epoch", fontsize="large")
plt.legend(["train", "val"], loc="best")
plt.show()
plt.close()

### Grad-CAM on univariate data

In [None]:
method = "tf-keras-gradcam++"

random_index = random.randint(0, len(test_signals)-1)
net_input = test_signals[random_index]
assert net_input.shape[1] == 1
ground_truth = test_labels[random_index]
prediction = model.predict(np.array([net_input]))
heatmaps = gen_heatmap_dictionary(method, np.array(net_input), model, prediction)

In [None]:
plot_heatmaps_as_overlay(heatmaps, net_input, 'test_plot', test_time_values.squeeze()[random_index].tolist())

## tsai Training

In [None]:
from tsai.all import *
import sklearn.metrics as skm
my_setup()

In [None]:
def generate_tsai_dataset(train_signals, train_labels):

    # randomly split the indices of the training samples into two sets (train (80%) and validation (20%))
    # 'splits' contains a tuple of lists ([train_indices], [validation_indices])
    #    - stratify=True -> split the data in such a way that each class's proportion in the train and validation
    #      datasets is approximately the same as the proportion in the original dataset
    #    - random_state is the seed
    splits = get_splits(train_labels, valid_size=.2, stratify=True, random_state=23, shuffle=True)
    print(splits)
    print("--> currently, the above plot wrongly labels 'Valid' as 'Test'")

    # define transformations:
    #    - None -> no transformation to the input (X)
    #    - Categorize() -> convert labels into categorical format; converts the labels to integers
    # my labels are already ints, but I'll leave it here as a more general case
    tfms  = [None, [Categorize()]]
    
    # creates tensors to train on, e.g., 
    #     dsets[0]: (TSTensor(vars:5, len:500, device=cpu, dtype=torch.float32), TensorCategory(0))
    dsets = TSDatasets(train_signals, train_labels, tfms=tfms, splits=splits, inplace=True)
    
    print("#train samples:", len(dsets.train))
    print("#valid samples:", len(dsets.valid))
    
    # data loaders: loading data in batches; batch size 64 for training and 128 for validation
    #    - TSStandardize: batch normalization
    #    - num_workers: 0 -> data loaded in main process
    dls = TSDataLoaders.from_dsets(
        dsets.train, dsets.valid, bs=[64, 128], batch_tfms=[TSStandardize()], num_workers=0
    )
    
    # vis a batch
    dls.show_batch(nrows=3, ncols=3, sharey=True)

    return dls

def train_tsai_model():
    # learner encapsulates the data, the model, and other details related to the training process
    learn = Learner(dls, model, metrics=accuracy)
    
    # saves curr state of learner (model + weights) to a file named stage0
    learn.save('stage0')
    
    # load state of model
    learn.load('stage0')
    
    # training over range of learning rates -- find suitable LR (or LR range)
    #   - learning rate range where the loss decreases most effectively
    learn.lr_find()
    
    # 150 -> num of epochs
    #    - involves varying the learning rate in a specific way during training
    #    - the cyclical nature helps in faster convergence, avoids getting stuck in local minima,
    #      and sometimes achieves better overall performance
    #    - it provides a balance between exploring the loss landscape (with higher learning rates)
    #    - and exploiting known good areas of the landscape (with lower learning rates)
    learn.fit_one_cycle(150, lr_max=1e-3)
    
    learn.save('stage1')
    return learn

# labeled test data

def test_tsai_model(test_signals, test_labels):

    test_ds = TSDatasets(test_signals, test_labels, tfms=[None, [Categorize()]])
    test_dl = dls.valid.new(test_ds)
    
    test_probas, test_targets, test_preds = learn.get_preds(
        dl=test_dl, with_decoded=True, save_preds=None, save_targs=None
    )

    return skm.accuracy_score(test_targets, test_preds)
    
    

In [None]:
# tsai expects the data in a diff format: (samples, variables, length)

# variables = 1 for univariate datasets and >1 for multivariate

train_signals = train_signals.reshape(train_signals.shape[0], train_signals.shape[2], train_signals.shape[1])
test_signals = test_signals.reshape(test_signals.shape[0], test_signals.shape[2], test_signals.shape[1])

print(train_signals.shape)
print(test_signals.shape)

In [None]:
dls = generate_tsai_dataset(train_signals, train_labels)

## Select Model

In [None]:
######################################################
### models trained on normalized version of RAW TS ###
######################################################

# creating InceptionTime (is a CNN) model (vars: 5 (5 channels), c: 2 (2 classes))
# model = InceptionTime(dls.vars, dls.c)

# TODO: InceptionTimePlus

# TODO: XceptionTime

# TODO: XceptionTimePlus

# TODO: OmniScaleCNN

# TODO: XCM

# creating XCMPlus
model = XCMPlus(dls.vars, dls.c, dls.len)

# create FCN (CNN model)
# model = FCN(dls.vars, dls.c)

# TODO: FCNPlus

# creating ResNet (CNN)
# model = ResNet(dls.vars, dls.c)

# TODO: ResNetPlus

# TODO: XResNet1d

# TODO: XResNet1dPlus

# TODO: ResCNN

# TODO: TCN

# creating RNN
# model = RNN(dls.vars, dls.c)

# creating RNNPlus (RNN model + including a feature extractor to the RNN network)
# model = RNNPlus(dls.vars, dls.c)

# TODO: RNNAttention

# creating GRU (RNN model)
# model = GRU(dls.vars, dls.c)

# creating GRUPlus (RNN model + including a feature extractor to the RNN network)
# model = GRUPlus(dls.vars, dls.c)

# creating GRUAttention (RNN model + attention)
#model = GRUAttention(dls.vars, dls.c, seq_len=500)

# creating LSTM (RNN model)
# model = LSTM(dls.vars, dls.c)

# creating LSTMPlus (RNN model + including a feature extractor to the RNN network)
# model = LSTMPlus(dls.vars, dls.c)

# creating LSTMAttention (RNN model + attention)
# model = LSTMAttention(dls.vars, dls.c, seq_len=500)

# model = TSSequencerPlus(dls.vars, dls.c, seq_len=500)

# model = TransformerModel(dls.vars, dls.c)

# TODO: TST

# TODO: TSTPlus

# TODO: TSPerceiver

# TODO: TSiT

# TODO: PatchTST

# TODO: ROCKETs category

# TODO: Wavelet-based NNs category

# TODO: Hybrid models category

# TODO: Tabular models category

#########################################
### models trained on feature vectors ###
#########################################

# TODO: extract + select features, i.e., generate feature vectors

# TODO: MLP

# TODO: gMLP

## Training

In [None]:
learn = train_tsai_model()

In [None]:
# losses -> loss development over all epochs for 'train' and 'valid'
# final losses ->  zoomed-in view of the final epochs, focusing on loss values towards the end of training
# accuracy -> validation accuracy of the model
learn.recorder.plot_metrics()

In [None]:
learn.save_all(path='export', dls_fname='dls', model_fname='model', learner_fname='learner')

In [None]:
learn.show_results(nrows=3, ncols=3)

In [None]:
learn.show_probas()

In [None]:
interp = ClassificationInterpretation.from_learner(learn)
interp.plot_confusion_matrix()

In [None]:
# intested in cases where the model made incorrect predictions at least 3 times
confusions = interp.most_confused(min_val=3)
for actual_class, pred_class, times in confusions:
    print("pred:", pred_class)
    print("actual:", actual_class)
    print(times, "times")

## Inference on additional test data

In [None]:
test_acc = test_tsai_model(test_signals, test_labels)
print("test accuracy:", test_acc)

## gradcam for XCMPlus

In [None]:
assert type(model) == tsai.models.XCMPlus.XCMPlus

xb, yb = dls.one_batch()
model.show_gradcam(xb[0], yb[0], figsize=(12,1))

In [None]:
# as the built-in gradcam method creates plots that are sometimes unreadable, it is better to visualize it with the
# methods from oscillogram_classification.cam

input, probabilities, targets, predictions = learn.get_X_preds(xb, yb, with_input = True)
predictions = predictions.strip('][').split(', ')

In [None]:
random_index = random.randint(0, len(xb)-1)

att_maps = get_attribution_map(model, [model.backbone.conv2dblock, model.backbone.conv1dblock], xb[random_index], detach=True, apply_relu=True)
att_maps[0] = (att_maps[0] - att_maps[0].min()) / (att_maps[0].max() - att_maps[0].min())
att_maps[1] = (att_maps[1] - att_maps[1].min()) / (att_maps[1].max() - att_maps[1].min())

print("Ground truth: ", int(yb[random_index]), " Prediction: ", predictions[random_index])

for i in range(input.vars):
    plot_heatmaps_as_overlay({"Variables attribution map":att_maps[0].numpy()[i]},  xb[random_index,i].numpy(), 'test_plot', range(len(xb[random_index,i])))
    plot_heatmaps_as_overlay({"Time attribution map":att_maps[1].numpy()[i]},  xb[random_index,i].numpy(), 'test_plot', range(len(xb[random_index,i])))

## Cross-validation for tsai training

In [None]:
k = 10

train_test_splits = get_splits(np.concatenate((train_labels, test_labels), axis=0), n_splits = k, valid_size=.2,  stratify=True, random_state=23, shuffle=True)

In [None]:
all_signals = np.concatenate((train_signals, test_signals), axis = 0)
all_labels = np.concatenate((train_labels, test_labels), axis = 0)

test_accuracies = []

for train_test_split in train_test_splits:

    train_split_signals = all_signals[train_test_split[0]]
    test_split_signals = all_signals[train_test_split[1]]
    train_split_labels = all_labels[train_test_split[0]]
    test_split_labels = all_labels[train_test_split[1]]

    # the training data will be further split into train and validation
    dls = generate_tsai_dataset(train_split_signals, train_split_labels)

    model = ResNet(dls.vars, dls.c)
    learn = train_tsai_model()
    learn.save_all(path='export', dls_fname='dls', model_fname='model', learner_fname='learner')
    test_acc = test_tsai_model(test_split_signals, test_split_labels)
    test_accuracies.append(test_acc)

In [None]:
print(test_accuracies)
print("Mean accuracy over all folds: ", np.mean(test_accuracies)) 