In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset CSV paths used further
# down in this notebook all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install tensorflow_model_optimization
!pip install keras_tuner

In [3]:
# ============================================================
# Imports & setup
# ============================================================

import sys
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import LearningRateScheduler
from scipy.signal import stft
from collections import Counter
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, classification_report

# Optional: path to local modules.
# Guarded so that re-running this cell does not keep appending duplicates
# of the same entry to sys.path.
_local_module_dir = '/content/drive/MyDrive/deep_learning_quantized'
if _local_module_dir not in sys.path:
    sys.path.append(_local_module_dir)

In [4]:
# ============================================================
# Configuration
# ============================================================

# Hyper-parameters shared by the cells below.
params = dict(
    seed=256,               # seeds NumPy, TensorFlow and the CV shuffling
    batch_size=32,          # mini-batch size of the main training run
    epochs=30,              # epochs of the main training run
    fine_tune_epochs=5,     # epochs of the fine-tuning run
    learning_rate=1e-3,     # initial learning rate of the main training run
    fine_tune_lr=1e-4,      # learning rate of the fine-tuning run
    step_size=3,            # LR decay period in epochs (see lr_schedule);
                            # unrelated to the module-level `step_size`
                            # used for downsampling
    gamma=0.85,             # multiplicative LR decay factor
    window_size=512,        # samples per sliding window
    overlap=512 * 7 // 8,   # 448 samples shared by consecutive windows
    num_bits=8,             # bit width for input and weight quantization
    num_classes=None,       # filled in once the window labels are known
)

# Seed both random number generators from the single configured value.
np.random.seed(params["seed"])
tf.random.set_seed(params["seed"])

In [5]:
# Choose dataset and the biomedical signal modality to be used

# dataset_id: 1 -> multimodal_data.csv, 2 -> combined_data.csv,
#             any other value -> output_dataset.csv
dataset_id = 1
# data_used: 1 = PPG, 2 = ACC (3 axes), 3 = ECG, 4 = EDA, 5 = EMG,
#            any other value = PPG + ACC combined
data_used = 1

In [6]:
# ============================================================
# Data loading
# ============================================================

def read_data_from_csv(csv_file_name):
    """Load a dataset CSV and split it into features, labels and subject ids.

    The file is read with ``pandas.read_csv``. Columns are split by position:
    everything except the last two columns is treated as features, the
    second-to-last column as the label and the last column as the subject id.

    Args:
        csv_file_name: Path of the CSV file to read.

    Returns:
        Tuple ``(x_data, y_data, subj)`` holding the ``.values`` of the
        feature columns (2-D), the label column and the subject column.
    """
    frame = pd.read_csv(csv_file_name)

    feature_block = frame.iloc[:, :-2]
    label_column = frame.iloc[:, -2]
    subject_column = frame.iloc[:, -1]

    return feature_block.values, label_column.values, subject_column.values

# Per-dataset settings, in the order:
#   (CSV path, class names, average labels per window?, fs, step_size)
# `fs` is the sampling rate handed to the STFT (after dividing by step_size)
# and `step_size` is the downsampling stride applied before windowing.
_DATASET_SETTINGS = {
    1: (
        '/content/drive/MyDrive/deep_learning_quantized/multimodal_data.csv',
        ('low', 'medium', 'high'),
        True,
        64,
        16,
    ),
    2: (
        '/content/drive/MyDrive/deep_learning_quantized/combined_data.csv',
        ('rest', 'squat', 'step'),
        False,
        400,
        64,
    ),
}

# Any dataset_id other than 1 or 2 falls back to this entry.
_FALLBACK_DATASET_SETTINGS = (
    '/content/drive/MyDrive/deep_learning_quantized/output_dataset.csv',
    ('baseline', 'stress', 'amusement', 'meditation'),
    False,
    64,
    16,
)

csv_file_name, labels, avg, fs, step_size = _DATASET_SETTINGS.get(
    dataset_id, _FALLBACK_DATASET_SETTINGS
)

features, targets, subj_data = read_data_from_csv(csv_file_name)

In [None]:
# Biomedical signal (modality) selection
# NOTE: `features` is overwritten here, so re-running this cell without
# reloading the data slices the already-reduced array a second time.

# Modalities stored in one column: data_used -> column index.
_SINGLE_COLUMN_MODALITY = {
    1: 0,  # PPG only
    3: 4,  # ECG only
    4: 5,  # EDA only
    5: 6,  # EMG only
}

if data_used in _SINGLE_COLUMN_MODALITY:
    # Re-add a channel axis so the result stays 2-D: (N, 1).
    features = features[:, _SINGLE_COLUMN_MODALITY[data_used]].reshape(-1, 1)
elif data_used == 2:
    # ACC only (3-axis accelerometer)
    features = features[:, 1:4]
else:
    # Combined PPG + ACC (multimodal baseline)
    features = features[:, 0:4]

print("Selected feature shape:", features.shape)
print("Targets shape:", targets.shape)

# features: (N, C) where C depends on selected modality
# targets:  (N,)

In [8]:
# ============================================================
# Sliding window segmentation
# ============================================================

def map_values(val):
    """
    Convert a continuous affective score into a discrete class index.

    Used for the AffectiveROAD dataset, whose labels are continuous values
    (e.g., stress intensity). The score is compared against two thresholds:

        val < 0.4           -> 0 (low)
        0.4 <= val < 0.75   -> 1 (medium)
        otherwise           -> 2 (high)
    """
    # Exclusive upper bounds of the "low" and "medium" classes, in class order.
    for class_id, upper_bound in enumerate((0.4, 0.75)):
        if val < upper_bound:
            return class_id
    # Nothing matched: the score belongs to the top class.
    return 2


def apply_sliding_window(features, targets, subj_data,
                         window_size, overlap, avg=False):
    """
    Segment the time series into overlapping sliding windows.

    Windows start at 0, hop, 2*hop, ... with ``hop = window_size - overlap``
    and every full window is considered, including the one that ends exactly
    on the last sample.

    Label assignment strategy:
    ---------------------------
    - avg = True:
        Used for datasets with continuous labels (e.g., AffectiveROAD).
        Labels within a window are averaged and mapped to discrete classes
        using `map_values`.

    - avg = False:
        Used for datasets with discrete class labels.
        The window label is determined by majority voting.

    Subject consistency:
    --------------------
    Windows spanning multiple subjects are discarded to avoid
    subject leakage.

    Args:
        features: Sequence of samples, indexed along the first axis.
        targets: Per-sample labels, same length as `features`.
        subj_data: Per-sample subject ids, same length as `features`.
        window_size: Number of samples per window (must be positive).
        overlap: Number of samples shared by consecutive windows
            (must be smaller than `window_size`).
        avg: Selects the label assignment strategy described above.

    Returns:
        Tuple ``(X, y)`` where ``X`` stacks the kept windows along a new
        first axis and ``y`` is the 1-D array of window labels. When no
        window is kept, ``X`` has shape ``(0, window_size, ...)``.

    Raises:
        ValueError: If `window_size` is not positive or if `overlap` is not
            smaller than `window_size` (the window would never advance).
    """
    if window_size <= 0:
        raise ValueError(f"window_size must be positive, got {window_size}")

    hop = window_size - overlap
    if hop <= 0:
        raise ValueError(
            "overlap must be smaller than window_size "
            f"(window_size={window_size}, overlap={overlap})"
        )

    sliding_X_data = []
    sliding_y_data = []

    # range() excludes its stop value, so "+ 1" keeps the last start position
    # whose window still fits completely inside the data.
    for start in range(0, len(features) - window_size + 1, hop):
        stop = start + window_size

        # Ensure all samples in the window belong to the same subject
        if len(np.unique(subj_data[start:stop])) != 1:
            continue

        window_y = targets[start:stop]
        if avg:
            # Continuous labels -> average + thresholding
            y_label = map_values(np.mean(window_y))
        else:
            # Discrete labels -> majority vote
            y_label = Counter(window_y).most_common(1)[0][0]

        sliding_X_data.append(features[start:stop])
        sliding_y_data.append(y_label)

    if sliding_X_data:
        X_out = np.array(sliding_X_data)
    else:
        # Keep the window/channel dimensions even when nothing was kept so
        # that downstream shape handling does not break on an empty result.
        feature_array = np.asarray(features)
        X_out = np.empty((0, window_size) + feature_array.shape[1:],
                         dtype=feature_array.dtype)

    return X_out, np.array(sliding_y_data).reshape(-1,)

In [27]:
# ------------------------------------------------------------
# Signal downsampling and segmentation
# ------------------------------------------------------------

# To reduce temporal redundancy and computational cost, keep only every
# `step_size`-th sample of the signals, labels and subject ids. This is
# plain striding; no filtering is applied beforehand.
features_ds, targets_ds, subj_ds = (
    series[::step_size] for series in (features, targets, subj_data)
)

# Sliding window segmentation of the downsampled recording
X_win, y_win = apply_sliding_window(
    features=features_ds,
    targets=targets_ds,
    subj_data=subj_ds,
    window_size=params["window_size"],
    overlap=params["overlap"],
    avg=avg,
)

# Typed copies of the windows and labels. Note that `X_data` is reassigned
# later in the notebook to the quantized STFT features.
X_data, y_data = X_win.astype(np.float32), y_win.astype(np.uint8)

print("X_data shape:", X_data.shape)
print("y_data shape:", y_data.shape)

# Number of distinct labels that actually occur in the windows.
params["num_classes"] = len(np.unique(y_data))

X_data shape: (2444, 512, 1)
y_data shape: (2444,)


In [28]:
def compute_stft_features(X, fs=64, nperseg=32):
    """Turn windowed signals into normalized real/imaginary STFT maps.

    For every sample and every channel the STFT is computed with
    ``scipy.signal.stft``; its real and imaginary parts become two separate
    feature planes, so ``C`` input channels give ``2 * C`` output planes
    (ordered real, imag, real, imag, ...).

    The first frequency row (the DC bin) and the first time frame are then
    dropped, and each sample is standardized per plane to zero mean and
    unit standard deviation.

    Args:
        X: Iterable of 2-D samples indexed as ``sample[time, channel]``.
        fs: Sampling frequency passed to ``scipy.signal.stft``.
        nperseg: Segment length passed to ``scipy.signal.stft``.

    Returns:
        ``float32`` array indexed as ``[sample, frequency, time, plane]``.
    """

    def _complex_planes(sample):
        # Real and imaginary STFT parts of every channel, stacked last.
        planes = []
        for ch in range(sample.shape[1]):
            _, _, spectrum = stft(sample[:, ch], fs=fs, nperseg=nperseg)
            planes.extend((np.real(spectrum), np.imag(spectrum)))
        return np.stack(planes, axis=-1)

    stft_out = np.array([_complex_planes(sample) for sample in X])

    # Drop frequency row 0 (DC) and time frame 0.
    stft_out = stft_out[:, 1:, 1:]

    # Standardize every sample independently, one plane at a time.
    for idx, spec in enumerate(stft_out):
        center = spec.mean(axis=(0, 1), keepdims=True)
        spread = spec.std(axis=(0, 1), keepdims=True)
        # The small constant avoids division by zero for constant planes.
        stft_out[idx] = (spec - center) / (spread + 1e-8)

    return stft_out.astype(np.float32)


# The windows were built from a signal downsampled by `step_size`, so the
# rate handed to the STFT is the original rate divided by that stride.
X_stft = compute_stft_features(
    X_win,
    fs=fs // step_size,
    nperseg=32,
)

In [29]:
# ============================================================
# Uniform int8 quantization (input-side)
# ============================================================

def quantize_array(x, num_bits=8):
    """Symmetric uniform quantization with one scale for the whole array.

    The scale maps the largest absolute value of ``x`` to
    ``qmax = 2**(num_bits - 1) - 1``. Values are divided by the scale,
    rounded, clipped to ``[-qmax - 1, qmax]`` and returned as integers.

    Args:
        x: Array-like of numeric values.
        num_bits: Bit width of the signed integer grid (2 to 32).

    Returns:
        Integer array with the shape of ``x``. The dtype is the narrowest of
        ``int8`` / ``int16`` / ``int32`` that can hold ``num_bits`` bits, so
        the default 8-bit case still returns ``int8``.

    Raises:
        ValueError: If ``num_bits`` is outside the supported 2..32 range.
    """
    if not 2 <= num_bits <= 32:
        raise ValueError(f"num_bits must be between 2 and 32, got {num_bits}")

    qmax = (2 ** (num_bits - 1)) - 1

    # Always casting to int8 silently wrapped around for wider bit widths.
    if num_bits <= 8:
        out_dtype = np.int8
    elif num_bits <= 16:
        out_dtype = np.int16
    else:
        out_dtype = np.int32

    # Keep the input precision for the 8-bit path (unchanged behavior); use
    # float64 for wider grids so that qmax itself is exactly representable.
    values = np.asarray(x) if num_bits <= 8 else np.asarray(x, dtype=np.float64)

    if values.size == 0:
        # np.max raises on empty input; an empty result is the natural answer.
        return np.zeros(values.shape, dtype=out_dtype)

    peak = np.max(np.abs(values))
    # An all-zero array has no meaningful scale; 1.0 keeps it all-zero.
    scale = peak / qmax if peak != 0 else 1.0

    x_q = np.clip(np.round(values / scale), -qmax - 1, qmax)
    return x_q.astype(out_dtype)


# Quantize the STFT features with a single scale computed over the whole
# array. This rebinds `X_data`, which earlier held the float32 windows.
X_data = quantize_array(
    X_stft,
    num_bits=params["num_bits"],
)

In [30]:
# ============================================================
# Model definition
# ============================================================

def create_functional_resnet(input_shape, num_classes):
    """Build a small CNN with a single residual bottleneck block.

    Layout:
        1. Two stride-2 3x3 convolutions (8 and 16 filters).
        2. One residual branch: 1x1 conv (32 filters) -> 3x3 depthwise conv
           -> 1x1 conv (16 filters), added back onto the output of step 1.
        3. Global average pooling, dropout (0.5) and a softmax classifier.

    Every convolution uses a ReLU activation and is followed by batch
    normalization.

    Args:
        input_shape: Shape of one input sample (without the batch axis).
        num_classes: Number of units of the softmax output layer.

    Returns:
        An uncompiled ``tf.keras`` functional ``Model``.
    """

    def conv_bn(tensor, filters, kernel_size, **conv_kwargs):
        # Convolution with ReLU, followed by batch normalization.
        tensor = layers.Conv2D(
            filters, kernel_size, activation="relu", **conv_kwargs
        )(tensor)
        return layers.BatchNormalization()(tensor)

    inputs = layers.Input(shape=input_shape)

    # Stem: two stride-2 convolutions.
    stem = conv_bn(inputs, 8, 3, strides=2, padding="same")
    stem = conv_bn(stem, 16, 3, strides=2, padding="same")

    # Residual branch: expand -> depthwise -> project back to 16 channels so
    # that it can be added to the stem output.
    branch = conv_bn(stem, 32, 1)
    branch = layers.DepthwiseConv2D(3, padding="same", activation="relu")(branch)
    branch = layers.BatchNormalization()(branch)
    branch = conv_bn(branch, 16, 1)

    merged = layers.Add()([stem, branch])

    # Classification head.
    pooled = layers.GlobalAveragePooling2D()(merged)
    pooled = layers.Dropout(0.5)(pooled)
    outputs = layers.Dense(num_classes, activation="softmax")(pooled)

    return models.Model(inputs, outputs)

In [31]:
# ============================================================
# Weight quantization (post-training)
# ============================================================

def quantize_tensor(x, num_bits):
    """Round a tensor onto a symmetric integer grid and scale it back.

    One scale is derived from the largest absolute value of ``x``. The tensor
    is divided by it, rounded, clipped to ``[-qmax - 1, qmax]`` with
    ``qmax = 2**(num_bits - 1) - 1``, and multiplied by the scale again, so
    the result holds quantized values in the original numeric range.

    Args:
        x: Tensor to quantize.
        num_bits: Bit width of the signed integer grid.

    Returns:
        Tensor with the shape of ``x``.
    """
    upper = (2 ** (num_bits - 1)) - 1
    lower = -upper - 1

    peak = tf.reduce_max(tf.abs(x))
    step = peak / upper
    # An all-zero tensor would give a zero step; fall back to 1.0 instead.
    step = tf.where(step == 0, 1.0, step)

    levels = tf.round(x / step)
    levels = tf.clip_by_value(levels, lower, upper)
    return levels * step


def apply_weight_quantization(model, num_bits):
    """Quantize the weights of Conv2D, DepthwiseConv2D and Dense layers.

    Every array returned by ``layer.get_weights()`` of a matching layer is
    passed through ``quantize_tensor`` on its own (so each array gets its own
    scale) and written back with ``layer.set_weights``.

    The model is modified in place; the returned object is the same model
    that was passed in.

    Args:
        model: Keras model whose layers are inspected.
        num_bits: Bit width forwarded to ``quantize_tensor``.

    Returns:
        The input ``model``.
    """
    quantizable_types = (layers.Conv2D, layers.Dense, layers.DepthwiseConv2D)

    for layer in model.layers:
        if not isinstance(layer, quantizable_types):
            continue

        current = layer.get_weights()
        if not current:
            # Nothing stored in this layer, nothing to quantize.
            continue

        layer.set_weights([
            quantize_tensor(tf.constant(w), num_bits).numpy()
            for w in current
        ])

    return model

In [34]:
# ============================================================
# Learning rate scheduler
# ============================================================

def lr_schedule(epoch, lr):
    """Step decay of the learning rate.

    At every epoch that is a positive multiple of ``params["step_size"]`` the
    incoming rate is multiplied by ``params["gamma"]``; at all other epochs
    (including epoch 0) it is returned unchanged.

    Args:
        epoch: Index of the epoch about to start.
        lr: Learning rate currently in use.

    Returns:
        The learning rate to use for this epoch.
    """
    is_decay_epoch = epoch > 0 and epoch % params["step_size"] == 0
    return lr * params["gamma"] if is_decay_epoch else lr

In [32]:
# ============================================================
# TFLite quantization
# ============================================================

def representative_data_gen(X):
    """
    Yield calibration samples for full integer quantization.

    Walks over at most the first 100 entries of ``X`` and yields each one as
    a single-element list holding a ``float32`` batch of size one.
    """
    n_samples = min(100, len(X))
    for start in range(n_samples):
        single_batch = X[start:start + 1]
        yield [single_batch.astype(np.float32)]

def evaluate_tflite_model(tflite_model, X, y_true):
    """Run inference with a TFLite model and score its predictions.

    The inputs are converted to whatever the interpreter reports for its
    input tensor: for an integer input they are quantized with the tensor's
    (scale, zero_point), rounded and clipped to the integer range; for a
    float input they are passed through as float32. Integer outputs are
    dequantized before the arg-max.

    NOTE(review): a function with the same name is defined again in the
    cross-validation cell further down and overrides this one once that
    cell has run.

    Args:
        tflite_model: Serialized TFLite flatbuffer (``bytes``).
        X: Input samples, one per entry along the first axis, given in the
            same numeric range as the data used for calibration.
        y_true: Integer class labels aligned with ``X``.

    Returns:
        Tuple ``(accuracy, weighted_f1)``.
    """
    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()

    input_info = interpreter.get_input_details()[0]
    output_info = interpreter.get_output_details()[0]

    in_dtype = input_info["dtype"]
    X_in = np.asarray(X, dtype=np.float64)
    if np.issubdtype(in_dtype, np.integer):
        in_scale, in_zero_point = input_info["quantization"]
        if in_scale != 0:  # a zero scale means "no quantization parameters"
            X_in = X_in / in_scale + in_zero_point
        # Round to nearest and saturate; a bare cast would truncate and wrap.
        limits = np.iinfo(in_dtype)
        X_in = np.clip(np.round(X_in), limits.min, limits.max)
    X_in = X_in.astype(in_dtype)

    out_scale, out_zero_point = output_info["quantization"]

    y_pred = []
    for i in range(len(X_in)):
        interpreter.set_tensor(input_info["index"], X_in[i:i+1])
        interpreter.invoke()
        output = interpreter.get_tensor(output_info["index"]).astype(np.float32)
        if out_scale != 0:
            output = (output - out_zero_point) * out_scale
        y_pred.append(output)

    y_pred = np.argmax(np.vstack(y_pred), axis=1)
    acc = np.mean(y_pred == y_true)
    f1 = f1_score(y_true, y_pred, average="weighted")

    return acc, f1

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, classification_report
import tensorflow as tf
import numpy as np

# 5-fold stratified cross-validation, shuffled with the configured seed.
num_folds = 5
skf = StratifiedKFold(
    n_splits=num_folds,
    shuffle=True,
    random_state=params["seed"],
)

# Per-fold metrics of the Keras model ...
acc_scores = []
f1_scores = []
# ... and of the converted TFLite model.
tflite_acc_scores = []
tflite_f1_scores = []

def evaluate_tflite_model(tflite_model, X, y_true):
    """Run inference with a (quantized) TFLite model and score it.

    This definition replaces the function of the same name defined in the
    earlier TFLite cell.

    Inputs are mapped to the interpreter's input tensor type. For an integer
    input tensor they are quantized with the tensor's (scale, zero_point),
    rounded to the nearest integer and clipped to the integer range - a bare
    ``astype`` would truncate toward zero and wrap out-of-range values.
    Outputs are dequantized before the arg-max.

    Args:
        tflite_model: Serialized TFLite flatbuffer (``bytes``).
        X: Input samples, one per entry along the first axis, given in the
            same numeric range as the calibration data.
        y_true: Integer class labels aligned with ``X``.

    Returns:
        Tuple ``(accuracy, weighted_f1)``.
    """
    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Quantize input to the interpreter's input type
    in_dtype = input_details[0]['dtype']
    scale, zero_point = input_details[0]['quantization']
    X_q = np.asarray(X, dtype=np.float64)
    if np.issubdtype(in_dtype, np.integer):
        if scale != 0:  # a zero scale means "no quantization parameters"
            X_q = X_q / scale + zero_point
        limits = np.iinfo(in_dtype)
        X_q = np.clip(np.round(X_q), limits.min, limits.max)
    X_q = X_q.astype(in_dtype)

    out_scale, out_zero_point = output_details[0]['quantization']

    y_pred_list = []
    for i in range(len(X_q)):
        interpreter.set_tensor(input_details[0]['index'], X_q[i:i+1])
        interpreter.invoke()
        out = interpreter.get_tensor(output_details[0]['index'])
        # Dequantize output; skipped for float outputs, whose scale is 0 and
        # would otherwise zero every score.
        out_float = out.astype(np.float32)
        if out_scale != 0:
            out_float = (out_float - out_zero_point) * out_scale
        y_pred_list.append(out_float)

    y_pred = np.vstack(y_pred_list)
    y_pred_classes = np.argmax(y_pred, axis=1)
    acc = np.mean(y_pred_classes == y_true)
    f1 = f1_score(y_true, y_pred_classes, average="weighted")
    return acc, f1

# Cross-validation: for every fold, train a float model, quantize its weights,
# fine-tune, evaluate in Keras, convert to an INT8 TFLite model and evaluate
# that as well.
#
# NOTE(review): the folds are drawn from shuffled, heavily overlapping windows
# (consecutive windows share params["overlap"] of params["window_size"]
# samples), so training and validation folds contain near-duplicate segments.
# Treat the scores as optimistic; a subject-wise split would need per-window
# subject ids, which the windowing step does not return.
for fold, (tr_idx, val_idx) in enumerate(skf.split(X_data, y_win), 1):
    print(f"\n{'='*40}\nFold {fold}\n{'='*40}")

    # Split data
    X_tr, X_val = X_data[tr_idx], X_data[val_idx]
    y_tr, y_val = y_win[tr_idx], y_win[val_idx]

    # One-hot encoding for Keras
    y_tr_cat = tf.keras.utils.to_categorical(y_tr, params["num_classes"])
    y_val_cat = tf.keras.utils.to_categorical(y_val, params["num_classes"])

    # --- Step 1: Create float32 model ---
    model = create_functional_resnet(X_tr.shape[1:], params["num_classes"])
    model.compile(
        optimizer=tf.keras.optimizers.AdamW(params["learning_rate"]),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    # summary() prints the table itself and returns None, so it must not be
    # wrapped in print() (that would add a stray "None" line).
    if fold == 1:
        model.summary()

    # Train float model
    model.fit(
        X_tr, y_tr_cat,
        epochs=params["epochs"],
        batch_size=params["batch_size"],
        validation_data=(X_val, y_val_cat),
        callbacks=[tf.keras.callbacks.LearningRateScheduler(lr_schedule)],
        verbose=1
    )

    # --- Step 2: Apply weight quantization ---
    # The weights are replaced in place: `model_quant` and `model` refer to
    # the same object.
    model_quant = apply_weight_quantization(model, params["num_bits"])

    # --- Step 3: Fine-tune quantized model ---
    # NOTE(review): fine-tuning applies ordinary float updates, so the weights
    # are no longer restricted to the quantized grid afterwards.
    model_quant.compile(
        optimizer=tf.keras.optimizers.Adam(params["fine_tune_lr"]),
        loss="categorical_crossentropy",
        metrics=["accuracy"]
    )

    model_quant.fit(
        X_tr, y_tr_cat,
        epochs=params["fine_tune_epochs"],
        batch_size=8,
        validation_data=(X_val, y_val_cat),
        verbose=1
    )

    # --- Step 4: Evaluate Keras model ---
    val_loss, val_acc = model_quant.evaluate(X_val, y_val_cat, verbose=0)
    y_pred_classes = np.argmax(model_quant.predict(X_val), axis=1)
    f1 = f1_score(y_val, y_pred_classes, average="weighted")

    acc_scores.append(val_acc)
    f1_scores.append(f1)

    print("Keras float/quantized model metrics:")
    print(classification_report(y_val, y_pred_classes))

    # --- Step 5: Convert to INT8 TFLite ---
    converter = tf.lite.TFLiteConverter.from_keras_model(model_quant)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Use representative dataset for full integer quantization.
    # The generator has its own name so it does not overwrite the module-level
    # `representative_data_gen(X)`, and it is capped at the fold size so that
    # small folds never yield empty batches.
    def fold_representative_data():
        for i in range(min(100, len(X_tr))):
            yield [X_tr[i:i+1].astype(np.float32)]
    converter.representative_dataset = fold_representative_data
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    tflite_model = converter.convert()

    # --- Step 6: Evaluate TFLite INT8 model ---
    tflite_acc, tflite_f1 = evaluate_tflite_model(tflite_model, X_val, y_val)
    tflite_acc_scores.append(tflite_acc)
    tflite_f1_scores.append(tflite_f1)

    print(f"TFLite INT8 model - Accuracy: {tflite_acc:.4f}, F1: {tflite_f1:.4f}")

In [None]:
# --- Summary: mean of the per-fold metrics for both model variants ---
print("\n===== Cross-validation results =====")
for heading, fold_accs, fold_f1s in (
    ("Keras Accuracy:", acc_scores, f1_scores),
    ("TFLite INT8 Accuracy:", tflite_acc_scores, tflite_f1_scores),
):
    print(heading, np.mean(fold_accs), "F1:", np.mean(fold_f1s))