In [None]:
# Mount the user's Google Drive into the Colab filesystem at /content/drive;
# the CSV and model paths used later in this notebook live under that mount.
from google.colab import drive
drive.mount('/content/drive')

# Data Loading

In [None]:
import pandas as pd

# Folder on the mounted Drive that holds every CSV read by this notebook.
data_dir = '/content/drive/MyDrive/Colab Notebooks/THESIS/data_oct9'


def read_recording(name):
    """Load `<data_dir>/<name>.csv` into a DataFrame."""
    return pd.read_csv(f'{data_dir}/{name}.csv')


# Training recordings 1..10, one DataFrame per file.
# Fix: df2 used to be read from 1.csv, so recording 1 entered the training set
# twice and recording 2 was never loaded.
# NOTE(review): assumes 2.csv exists next to the other recordings - confirm.
(df1, df2, df3, df4, df5,
 df6, df7, df8, df9, df10) = (read_recording(number) for number in range(1, 11))

# Held-out splits, each stored in its own file.
df_validation = read_recording('validation')
df_test = read_recording('test')

## Data preparation


In [None]:
# Outlier removal: discard the row ranges / rows flagged as outliers in three
# of the training recordings. The frames are rebound to their trimmed versions.
# NOTE(review): the row limits 3500-5500 and 47000 are hand-picked values whose
# origin is not visible here - confirm against the raw plots.

# Recording 4: remove the rows labelled 3500..5499.
rows_to_remove_4 = range(3500, 5500)
df4 = df4.drop(index=rows_to_remove_4)

# Recording 6: remove every row labelled 47000 or above.
rows_to_remove_6 = range(47000, len(df6))
df6 = df6.drop(index=rows_to_remove_6)

# Recording 9: keep only the samples whose 'contacts' value is 0.
no_contact_mask = df9['contacts'] == 0
df9 = df9[no_contact_mask]

# Data Splitting


In [None]:
# Stack the ten training recordings into a single frame; ignore_index=True
# gives the result a fresh 0..n-1 row index.
training_recordings = [df1, df2, df3, df4, df5, df6, df7, df8, df9, df10]
df_train = pd.concat(training_recordings, ignore_index=True)

# Report (rows, columns) for each split.
for caption, frame in (
    ("Training test:", df_train),
    ("Validation test:", df_validation),
    ("Test test:", df_test),
):
    print(caption, frame.shape)

## Data Visualization

In [None]:
# Plot every column of the training frame in its own subplot on a 50x50 figure.
df_train.plot(subplots=True, figsize=(50, 50))

In [None]:
# Plot every column of the validation frame in its own subplot on a 50x50 figure.
df_validation.plot(subplots=True, figsize=(50, 50))

In [None]:
# Plot every column of the test frame in its own subplot on a 50x50 figure.
df_test.plot(subplots=True, figsize=(50, 50))

## Training Inputs (X and y)

In [None]:
# Columns that are not model inputs: the sensor timestamp and the target itself.
non_feature_columns = ['timestamps_sensor', 'contacts']

# Training split: features (X) and the 'contacts' target (y).
y_train = df_train['contacts']
X_train = df_train.drop(columns=non_feature_columns)
print("Training: ", X_train.shape)

# Validation split.
y_validation = df_validation['contacts']
X_validation = df_validation.drop(columns=non_feature_columns)
print("Validation: ", X_validation.shape)

# Test split.
y_test = df_test['contacts']
X_test = df_test.drop(columns=non_feature_columns)
print("Test: ", X_test.shape)

# Data Scaling

In [None]:
# Scale data between -1 and 1
import numpy as np

# Per-column extrema are taken from the TRAINING split only and reused for the
# validation and test splits, so no statistics of the held-out data are used.
max_train = X_train.max()
min_train = X_train.min()

print("max_train = ", list(max_train.values))
print("min_train = ", list(min_train.values), "\n")

# A column that is constant in the training split has max == min; dividing by
# that zero range would fill the column with NaN/inf. Use a range of 1 instead,
# which maps such a column to -1 in the training split.
range_train = (max_train - min_train).replace(0, 1)


def scale_with_train_stats(features):
    """Min-max scale `features` column-wise with the training extrema.

    Training values land in [-1, 1]; validation/test values can fall outside
    that interval when they exceed the training extrema.
    """
    return (features - min_train) / range_train * 2 - 1


X_train_scaled = scale_with_train_stats(X_train)
X_validation_scaled = scale_with_train_stats(X_validation)
X_test_scaled = scale_with_train_stats(X_test)

print("Training: ", X_train_scaled.shape)
# Fix: this line used to print the validation shape under the label "Training: ".
print("Validation: ", X_validation_scaled.shape)
print("Test: ", X_test_scaled.shape)

# Sliding Windows


## Dataset Generator (Train and Validation)

In [None]:
import numpy as np

def window_generator(data, labels, window_size=100, step=10, strategy='center'):
    n_samples = len(data)
    for i in range(0, n_samples - window_size + 1, step):
        window = data[i:i+window_size]
        window_labels = labels[i:i+window_size]

        # label definition with different strategies
        if strategy == 'center':
            label = window_labels[window_size // 2]
        elif strategy == 'max':
            label = np.max(window_labels)
        elif strategy == 'mode':
            label = np.bincount(window_labels).argmax()
        elif strategy == 'probability':
            label = np.mean(window_labels)
        else:
            raise ValueError(f"Invalid strategy: {strategy}")

        yield window.astype(np.float32), np.float32(label)    # yield returns one window at time -> should not saturate RAM

In [None]:
import tensorflow as tf

# Windowing / batching configuration shared by the train, validation and test sets.
window_size = 100
step = 10
strategy = 'probability'
batch_size = 64


def count_windows(n_samples):
    """Number of windows `window_generator` yields for `n_samples` rows.

    Mirrors the generator's own `range(0, n_samples - window_size + 1, step)`.
    The previous floor division `(n - window_size + 1) // step` came up one
    window short whenever `(n - window_size + 1)` was not a multiple of `step`.
    """
    return len(range(0, n_samples - window_size + 1, step))


train_dim = count_windows(len(X_train_scaled))
validation_dim = count_windows(len(X_validation_scaled))


def make_window_dataset(features, targets):
    """Wrap `window_generator` over (features, targets) in a tf.data.Dataset.

    Each element is a float32 tensor of shape (window_size, n_features) and a
    float32 scalar label.
    """
    return tf.data.Dataset.from_generator(
        lambda: window_generator(features, targets, window_size, step, strategy),
        output_signature=(
            tf.TensorSpec(shape=(window_size, features.shape[1]), dtype=tf.float32),  # X
            tf.TensorSpec(shape=(), dtype=tf.float32)                                 # y
        )
    )


# Training stream: shuffle with a buffer as large as the window count,
# reshuffled on every pass, then batch, repeat indefinitely and prefetch.
train_windows = make_window_dataset(X_train_scaled, y_train)
train_windows = train_windows.shuffle(buffer_size=train_dim, reshuffle_each_iteration=True)
train_windows = train_windows.batch(batch_size).repeat().prefetch(tf.data.AUTOTUNE)

# Validation stream: no shuffle on the validation set!
validation_windows = make_window_dataset(X_validation_scaled, y_validation)
validation_windows = validation_windows.batch(batch_size).repeat().prefetch(tf.data.AUTOTUNE)

## Static Dataset (Test)

In [None]:
import numpy as np

def create_windows(data, labels, window_size=100, step=10, strategy='center'):
    """Materialise every sliding window of `data` together with its label.

    Parameters
    ----------
    data : 2-D array-like of shape (n_samples, n_features); a NumPy array or
        a pandas DataFrame.
    labels : array-like with one label per row of `data` (array or Series).
    window_size : number of consecutive rows in each window.
    step : number of rows between the starts of consecutive windows.
    strategy : how the per-sample labels of a window become one label:
        'center'      -> label of the sample at offset window_size // 2
        'max'         -> largest label in the window
        'mode'        -> most frequent label (labels are cast to integers)
        'probability' -> mean of the labels in the window

    Returns
    -------
    X : float32 array, shape (n_windows, window_size, n_features)
    y : float32 array, shape (n_windows,)
    idx : int64 array, shape (n_windows,), row position of each window's centre

    Raises
    ------
    ValueError : when a window is produced and `strategy` is not one of the
        four names above.
    """
    # Plain NumPy arrays make every slice/index below positional, so pandas
    # inputs no longer hit label-based lookups for the 'center' strategy.
    data = np.asarray(data)
    labels = np.asarray(labels)

    X, y, idx = [], [], []
    n_samples, n_features = data.shape

    for i in range(0, n_samples - window_size + 1, step):
        window = data[i:i+window_size]
        window_labels = labels[i:i+window_size]
        index = i + window_size // 2

        if strategy == 'center':
            label = window_labels[window_size // 2]
        elif strategy == 'max':
            label = np.max(window_labels)
        elif strategy == 'mode':
            # np.bincount only accepts integer input; labels read from CSV may be floats.
            label = np.bincount(window_labels.astype(np.int64)).argmax()
        elif strategy == 'probability':
            label = np.mean(window_labels)
        else:
            raise ValueError(f"Invalid strategy: {strategy}")

        X.append(window)
        y.append(label)
        idx.append(index)

    return (
        # The reshape is a no-op when windows exist and keeps the 3-D shape
        # (0, window_size, n_features) when the input is shorter than one window.
        np.array(X, dtype=np.float32).reshape(-1, window_size, n_features),
        np.array(y, dtype=np.float32),
        np.array(idx, dtype=np.int64)
    )

In [None]:
# Window the scaled test split with the same window_size / step / strategy as
# the training and validation generators. Unlike those, the test windows are
# materialised as arrays, together with the row position of each window centre.
test_window_options = dict(window_size=window_size, step=step, strategy=strategy)
X_test_windows, y_test_windows, index_test = create_windows(
    X_test_scaled.values,
    y_test.values,
    **test_window_options
)

print("Test test:", X_test_windows.shape, y_test_windows.shape)

# Model

In [None]:
# Clear Keras' global state left over from a previous session, so re-running
# the model cells below starts from a clean slate.
import tensorflow as tf
tf.keras.backend.clear_session()

In [None]:
from tensorflow.keras import Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, BatchNormalization, LSTM, Dense, Dropout
from tensorflow.keras.metrics import Recall, Precision, AUC

# The input geometry is read off the windowed test array:
# axis 1 is the window size, axis 2 the number of features per sample.
time_steps, n_features = X_test_windows.shape[1:3]

# Layers are added one by one, in order: input -> LSTM -> dense head.
model = Sequential()
model.add(Input(shape=(time_steps, n_features)))

# Recurrent encoder: only the last LSTM output is passed on.
model.add(LSTM(128, return_sequences=False, dropout=0.3))

# Dense head ending in one sigmoid unit, i.e. an output in [0, 1].
model.add(Dense(32, activation='relu'))
model.add(Dropout(0.3))
model.add(Dense(1, activation='sigmoid'))

# NOTE(review): with the 'probability' window strategy the targets are
# fractions in [0, 1], not hard 0/1 labels - confirm that accuracy, Recall and
# Precision are meaningful against such targets.
model.compile(
    loss='mean_squared_error',
    optimizer='adam',
    metrics=['accuracy', Recall(), Precision()]
)

print("Input dimensions: ", model.input_shape)
model.summary()

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# File on Drive that receives the best model seen during training.
best_model_path = "/content/drive/MyDrive/Colab Notebooks/THESIS/Saved Models oct12/best_model_.keras"

# Stop once val_loss has not improved for 10 epochs (limits overfitting) and
# roll the model back to the weights of its best epoch.
early_stop = EarlyStopping(
    restore_best_weights=True,
    patience=10,
    monitor='val_loss'
)

# Write the model to disk only when val_loss reaches a new minimum.
checkpoint = ModelCheckpoint(
    best_model_path,
    mode='min',
    save_best_only=True,
    monitor='val_loss'
)

## Training

In [None]:
import math

# Both datasets repeat forever, so Keras must be told how many batches make up
# one pass: window count divided by batch size, rounded up.
train_steps = math.ceil(train_dim / batch_size)
validation_steps_per_epoch = math.ceil(validation_dim / batch_size)

# Train for at most 50 epochs; the callbacks handle early stopping and
# saving the best model.
history = model.fit(
    train_windows,
    epochs=50,
    steps_per_epoch=train_steps,
    validation_data=validation_windows,
    validation_steps=validation_steps_per_epoch,
    callbacks=[early_stop, checkpoint]
)

## Evaluation

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf

# Decision threshold applied to both the predicted and the true window scores.
thr = 0.5

# Fix: `saved_model` was never defined anywhere in the notebook, so this cell
# raised NameError on a fresh kernel. Load the best checkpoint written by the
# ModelCheckpoint callback; the path must stay identical to the one given to
# that callback. compile=False is enough because the model is only used for
# prediction here.
saved_model = tf.keras.models.load_model(
    "/content/drive/MyDrive/Colab Notebooks/THESIS/Saved Models oct12/best_model_.keras",
    compile=False
)

# Predicted score per test window, then hard 0/1 decisions.
y_pred_prob = saved_model.predict(X_test_windows).flatten()
y_pred = (y_pred_prob > thr).astype(int)

# Binarise the window targets with the same threshold. Stored under a new name:
# overwriting `y_test` (the pandas Series of per-sample labels) with a NumPy
# array broke any re-run of the cell that calls `y_test.values`.
y_true = (y_test_windows > thr).astype(int)

print(classification_report(y_true, y_pred))
print("CONFUSION MATRIX:\n", confusion_matrix(y_true, y_pred))

## Visualization

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(50, 30))

# One stacked panel per accelerometer axis: (subplot position, column, colour).
panels = (
    (311, 'acc_x', 'blue'),
    (312, 'acc_y', 'red'),
    (313, 'acc_z', 'green'),
)

for position, column, colour in panels:
    plt.subplot(position)
    # Raw test signal, drawn faintly in the background.
    plt.plot(df_test[column], color=colour, alpha=0.3)
    # Window target and model output, both placed at each window's centre row.
    plt.plot(index_test, y_test_windows, color='gray', label='true')
    plt.plot(index_test, y_pred_prob, color='black', label='pred')
    plt.legend()

plt.show()

# ONNX Conversion

In [None]:
pip install "numpy<2.0"

In [None]:
!pip install tf2onnx onnx

In [None]:
import tensorflow as tf
import tf2onnx

# NOTE(review): the training cell's ModelCheckpoint writes "best_model_.keras",
# while this cell loads "best_model_1.keras" - confirm that the file was
# renamed on purpose and that this is the model meant for export.
# force the use of the CPU
with tf.device("/cpu:0"):
    model = tf.keras.models.load_model(
        "/content/drive/MyDrive/Colab Notebooks/THESIS/Saved Models oct12/best_model_1.keras",
        compile=False
    )

# Re-wrap the loaded model as a functional Model over the same inputs/outputs.
model_func = tf.keras.Model(inputs=model.inputs, outputs=model.outputs)

# Read the window length and feature count from the loaded model instead of
# hard-coding 100 and 19, so the ONNX input signature cannot drift from the
# model that is actually being converted. input_shape is (batch, time, features).
time_steps, n_features = model.input_shape[1:]
spec = (tf.TensorSpec((None, time_steps, n_features), tf.float32, name="input"),)

# Convert and write the .onnx file next to the Keras checkpoint.
output_path = "/content/drive/MyDrive/Colab Notebooks/THESIS/Saved Models oct12/best_model_1_cpu.onnx"
model_proto, _ = tf2onnx.convert.from_keras(
    model_func,
    input_signature=spec,
    output_path=output_path,
    opset=13
)

print("Model converted to ONNX format (CPU-compatibile) and saved in:", output_path)