In [None]:
import os
import glob
import sqlite3
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, GlobalAveragePooling1D, Dense
from tensorflow.keras.callbacks import EarlyStopping

# --- CONFIGURATION ---
DATA_FOLDER = "MHOOP_dataset"     # folder searched for the *.db recordings
OUTPUT_FOLDER = "MHOOP_outputs"   # folder that receives models and generated files
WINDOW_SIZE = 150         # samples per window: 3 seconds @ 50Hz
STEP_SIZE = 75            # hop between window starts: 50% overlap
EPOCHS = 200
BATCH_SIZE = 32

# Make sure the output folder is there; report it only when it had to be created
try:
    os.makedirs(OUTPUT_FOLDER)
except FileExistsError:
    # Something already exists at that path: nothing to create, nothing to report
    pass
else:
    print(f"Created output directory: {OUTPUT_FOLDER}")

# ==========================================
# 1. DATA LOADING
# ==========================================
def load_and_merge_data(folder_path):
    """Load every SQLite ``*.db`` file in *folder_path* into one labelled DataFrame.

    For each database the ``x``, ``y`` and ``z`` columns of the
    ``accelerometer_data`` table are read in ``local_timestamp`` order and
    tagged with a ``label`` column equal to the file name without its
    extension. Loading is best effort: a file that cannot be read is reported
    on stdout and skipped.

    Args:
        folder_path: Directory searched (non-recursively) for ``*.db`` files.

    Returns:
        A DataFrame with columns ``x``, ``y``, ``z`` and ``label`` holding the
        rows of all readable files, or an empty DataFrame when no file was
        found or none could be loaded.
    """
    all_data = []
    # Sorted so the row order of the merged frame does not depend on the
    # order in which the filesystem happens to list the files.
    db_files = sorted(glob.glob(os.path.join(folder_path, "*.db")))

    if not db_files:
        print(f"No .db files found! Make sure your files are in the '{folder_path}' folder.")
        return pd.DataFrame()

    print(f"Found {len(db_files)} database files.")

    for db_file in db_files:
        filename = os.path.basename(db_file)
        label = os.path.splitext(filename)[0]

        print(f"Loading '{filename}' as class: '{label}'...")

        try:
            conn = sqlite3.connect(db_file)
            try:
                df = pd.read_sql_query(
                    "SELECT x, y, z FROM accelerometer_data ORDER BY local_timestamp",
                    conn,
                )
            finally:
                # Release the connection even when the query fails
                conn.close()

            df['label'] = label
            all_data.append(df)
            print(f"  -> Loaded {len(df)} samples.")
        except Exception as e:
            # Deliberately broad: one unreadable file must not abort the whole load
            print(f"  -> Error loading {db_file}: {e}")

    if not all_data:
        return pd.DataFrame()

    return pd.concat(all_data, ignore_index=True)

# Load the data: samples from every .db file in DATA_FOLDER, merged into one
# DataFrame (empty when nothing could be loaded)
print("--- STARTING DATA LOAD ---")
data = load_and_merge_data(DATA_FOLDER)

# ==========================================
# 2. PREPROCESSING & SPLITTING
# ==========================================
def create_windows_from_array(data_array, label, window_size, step_size):
    """Cut *data_array* into fixed-length windows that share a single label.

    Window starts advance by *step_size* from index 0. Only complete windows
    are produced, so trailing samples that cannot fill a window are dropped
    and an input shorter than *window_size* yields no windows at all.

    Returns:
        A pair ``(X, y)``: ``X`` is the list of ``window_size``-long slices of
        *data_array* and ``y`` is a list holding *label* once per window.
    """
    last_start = len(data_array) - window_size
    X = [
        data_array[start:start + window_size]
        for start in range(0, last_start + 1, step_size)
    ]
    y = [label] * len(X)
    return X, y

def robust_time_split(df, window_size, step_size, val_ratio=0.15, test_ratio=0.15):
    """Split each class's samples by position into train/val/test, then window them.

    For every value of the ``label`` column, the class's ``x``/``y``/``z`` rows
    are cut, in row order, into a leading train segment, a middle validation
    segment and a trailing test segment. Windows are created inside each
    segment separately, so no window spans two splits.

    Args:
        df: DataFrame with ``x``, ``y``, ``z`` and ``label`` columns.
        window_size: Number of samples per window.
        step_size: Hop between consecutive window starts.
        val_ratio: Fraction of each class's rows assigned to validation.
        test_ratio: Fraction of each class's rows assigned to test.

    Returns:
        ``(X_train, y_train, X_val, y_val, X_test, y_test)`` as NumPy arrays.
        Each ``X_*`` has shape ``(n_windows, window_size, 3)``, including when
        a split received no windows; each ``y_*`` holds one label per window.
    """
    n_channels = 3  # the x, y, z columns selected below
    split_names = ("train", "val", "test")
    windows_by_split = {name: [] for name in split_names}
    labels_by_split = {name: [] for name in split_names}

    # Process each class separately
    for label, group in df.groupby('label'):
        sensor_data = group[['x', 'y', 'z']].values
        total_len = len(sensor_data)

        # Split positions: train ends where val begins, val ends where test begins
        train_end_idx = int(total_len * (1 - val_ratio - test_ratio))
        val_end_idx = int(total_len * (1 - test_ratio))

        segments = {
            "train": sensor_data[:train_end_idx],
            "val": sensor_data[train_end_idx:val_end_idx],
            "test": sensor_data[val_end_idx:],
        }

        # Window each segment on its own so windows never cross a split boundary
        for name in split_names:
            windows, labels = create_windows_from_array(
                segments[name], label, window_size, step_size
            )
            windows_by_split[name].extend(windows)
            labels_by_split[name].extend(labels)

    def _stack(windows):
        # np.array([]) is 1-D with shape (0,), which cannot be broadcast
        # against per-axis statistics; keep the 3-D layout for an empty split.
        if not windows:
            return np.empty((0, window_size, n_channels))
        return np.array(windows)

    return (_stack(windows_by_split["train"]), np.array(labels_by_split["train"]),
            _stack(windows_by_split["val"]), np.array(labels_by_split["val"]),
            _stack(windows_by_split["test"]), np.array(labels_by_split["test"]))

if not data.empty:
    # Perform the Time-Based Split into 3 Buckets
    X_train_raw, y_train_raw, X_val_raw, y_val_raw, X_test_raw, y_test_raw = robust_time_split(
        data, WINDOW_SIZE, STEP_SIZE, val_ratio=0.15, test_ratio=0.15
    )

    print(f"\nTotal Windows: {len(X_train_raw) + len(X_val_raw) + len(X_test_raw)}")
    print(f"Train Windows: {len(X_train_raw)}")
    print(f"Val Windows:   {len(X_val_raw)}")
    print(f"Test Windows:  {len(X_test_raw)}")

    # Encode Labels
    label_encoder = LabelEncoder()
    # Fit on all labels to ensure we know all classes
    all_labels = np.concatenate([y_train_raw, y_val_raw, y_test_raw])
    label_encoder.fit(all_labels)

    class_names = label_encoder.classes_
    num_classes = len(class_names)
    print("Label Mapping:", dict(zip(range(num_classes), class_names)))

    # Convert to One-Hot Encoding.
    # num_classes is passed explicitly: left to itself, to_categorical sizes
    # each matrix from the largest index present in that split, so a split
    # lacking the last class would get fewer columns than the model's output.
    y_train = to_categorical(label_encoder.transform(y_train_raw), num_classes=num_classes)
    y_val = to_categorical(label_encoder.transform(y_val_raw), num_classes=num_classes)
    y_test = to_categorical(label_encoder.transform(y_test_raw), num_classes=num_classes)

else:
    print("No data loaded. Please check your .db files.")
    exit()

# Plot Raw Counts
def plot_class_distribution(y_train, y_val, y_test, classes):
    """Draw a grouped bar chart of how many entries each class has per split.

    Args:
        y_train, y_val, y_test: One-hot label matrices; the class of each row
            is taken as the argmax along axis 1.
        classes: Class names, indexed by class id; used as x-axis categories.
    """
    class_ids_by_split = {
        "Train": np.argmax(y_train, axis=1),
        "Validation": np.argmax(y_val, axis=1),
        "Test": np.argmax(y_test, axis=1),
    }

    # One row per (class, split) pair, including classes with a zero count
    rows = []
    for split_name, class_ids in class_ids_by_split.items():
        per_class = pd.Series(class_ids).value_counts().sort_index()
        rows.extend(
            {
                "Class": class_name,
                "Split": split_name,
                "Count": per_class.get(class_id, 0),
            }
            for class_id, class_name in enumerate(classes)
        )
    counts_table = pd.DataFrame(rows)

    plt.figure(figsize=(12, 6))
    sns.set_style("whitegrid")
    ax = sns.barplot(data=counts_table, x="Class", y="Count", hue="Split", palette="muted")
    # Write the count above every bar
    for bars in ax.containers:
        ax.bar_label(bars, fontsize=10)

    plt.title("Class Distribution (Raw Sample Counts)", fontsize=14)
    plt.ylabel("Number of Samples", fontsize=12)
    plt.xlabel("Activity Label", fontsize=12)
    plt.xticks(rotation=45)
    plt.legend(title="Dataset Split")
    plt.tight_layout()
    plt.show()

print("\n--- GENERATING DISTRIBUTION PLOT ---")
plot_class_distribution(y_train, y_val, y_test, class_names)

# Normalization
# Per-axis mean/std are computed from the training windows only and then
# applied unchanged to the validation and test windows.
X_train_flat = X_train_raw.reshape(-1, 3)
means = np.mean(X_train_flat, axis=0)
stds = np.std(X_train_flat, axis=0)
# An axis that is constant over the training data has std 0; dividing by it
# would turn that channel into NaN/inf. Scale such an axis by 1 instead.
stds = np.where(stds == 0, 1.0, stds)

print(f"\nNormalization Stats:")
print(f"Means: {means}")
print(f"Stds:  {stds}")

X_train = (X_train_raw - means) / stds
X_val = (X_val_raw - means) / stds
X_test = (X_test_raw - means) / stds


In [None]:
# ==========================================
# 4. PLOT AVERAGE WAVEFORMS PER CLASS
# ==========================================
def plot_average_waveforms(X, y_encoded, classes):
    """Plot, one subplot per class, the mean window of that class.

    Args:
        X: Array of windows indexed as ``[window, time step, channel]``;
            channels 0, 1 and 2 are drawn as the X, Y and Z axes.
        y_encoded: Integer class id of every window in *X*.
        classes: Class names, indexed by class id.
    """
    num_classes = len(classes)
    # squeeze=False always returns a 2-D grid of axes, so a single class
    # needs no special handling
    _, axes_grid = plt.subplots(
        num_classes, 1, figsize=(12, 4 * num_classes), sharex=True, squeeze=False
    )

    time_steps = np.arange(X.shape[1])
    channel_styles = (('X-axis', 'r'), ('Y-axis', 'g'), ('Z-axis', 'b'))

    for class_id, (ax, class_name) in enumerate(zip(axes_grid[:, 0], classes)):
        # Rows of X that belong to this class
        member_rows = np.nonzero(y_encoded == class_id)[0]

        # Average over the windows (axis 0); result is indexed [time step, channel]
        mean_waveform = np.mean(X[member_rows], axis=0)

        for channel, (axis_label, colour) in enumerate(channel_styles):
            ax.plot(time_steps, mean_waveform[:, channel],
                    label=axis_label, color=colour, alpha=0.8)

        ax.set_title(f"Average Waveform: {class_name} (n={len(member_rows)})")
        ax.set_ylabel("Amplitude")
        ax.legend(loc="upper right")
        ax.grid(True)

    plt.xlabel("Time Steps (Window Size)")
    plt.tight_layout()
    plt.show()

print("\n--- PLOTTING AVERAGE WAVEFORMS ---")
# Plots the un-normalized training windows; argmax turns the one-hot training
# labels back into integer class ids
plot_average_waveforms(X_train_raw, np.argmax(y_train, axis=1), class_names)

In [None]:
# ==========================================
# 5. MODEL TRAINING (1D CNN)
# ==========================================
# Layer stack: two convolution stages, global average pooling, softmax classifier
cnn_layers = [
    Conv1D(16, 3, activation='relu', input_shape=(WINDOW_SIZE, 3)),
    MaxPooling1D(2),
    Conv1D(32, 3, activation='relu'),
    GlobalAveragePooling1D(),
    Dense(num_classes, activation='softmax'),
]
model = Sequential(cnn_layers)
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Stop once validation loss has not improved by at least min_delta for
# `patience` consecutive epochs, then roll back to the best weights seen
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    min_delta=0.001,
    restore_best_weights=True,
)

print("\n--- TRAINING MODEL ---")
history = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    callbacks=[early_stopping],
    verbose=1,
)

# Keep the trained Keras model on disk in H5 format (its size is compared
# with the TFLite exports later on)
h5_path = os.path.join(OUTPUT_FOLDER, "model.h5")
model.save(h5_path)
print(f"Original model saved to: {h5_path}")

# ==========================================
# 6. PLOT TRAINING HISTORY
# ==========================================
def plot_training_history(history):
    """Show training vs. validation accuracy and loss per epoch, side by side.

    Args:
        history: Object whose ``history`` attribute maps ``'accuracy'``,
            ``'val_accuracy'``, ``'loss'`` and ``'val_loss'`` to per-epoch
            value sequences (here: the return value of ``model.fit``).
    """
    curves = history.history
    acc, val_acc = curves['accuracy'], curves['val_accuracy']
    loss, val_loss = curves['loss'], curves['val_loss']
    epochs_range = range(1, len(acc) + 1)  # epochs are numbered from 1

    # (title, y-label, training curve, its label, validation curve, its label, legend corner)
    panels = (
        ('Model Accuracy', 'Accuracy',
         acc, 'Training Accuracy', val_acc, 'Validation Accuracy', 'lower right'),
        ('Model Loss', 'Loss',
         loss, 'Training Loss', val_loss, 'Validation Loss', 'upper right'),
    )

    _, axes = plt.subplots(1, 2, figsize=(14, 5))
    for ax, panel in zip(axes, panels):
        title, y_label, train_curve, train_label, val_curve, val_label, corner = panel
        ax.plot(epochs_range, train_curve, label=train_label, marker='o')
        ax.plot(epochs_range, val_curve, label=val_label, marker='o')
        ax.set_title(title)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(y_label)
        ax.legend(loc=corner)
        ax.grid(True)

    plt.tight_layout()
    plt.show()

print("\n--- PLOTTING TRAINING CURVES ---")
plot_training_history(history)

# ==========================================
# 7. EVALUATION
# ==========================================
print("\n--- EVALUATION ON TEST SET ---")
y_pred = model.predict(X_test)
# Collapse softmax outputs / one-hot targets to integer class ids
y_pred_classes = np.argmax(y_pred, axis=1)
y_true_classes = np.argmax(y_test, axis=1)

# labels= pins the report to every known class id. Without it the report only
# covers ids that occur in the test labels or predictions, and passing a
# target_names list of a different length then raises.
print(classification_report(
    y_true_classes,
    y_pred_classes,
    labels=np.arange(num_classes),
    target_names=class_names,
))

# ==========================================
# 8. PLOT CONFUSION MATRIX
# ==========================================
def plot_confusion_matrix(y_true, y_pred, classes):
    """Draw the confusion matrix of *y_pred* against *y_true* as a heatmap.

    Args:
        y_true: Integer class id of every evaluated item.
        y_pred: Predicted integer class id of every evaluated item.
        classes: Class names, indexed by class id; used as tick labels on
            both axes.
    """
    # Pin rows/columns to every class id. Without labels= the matrix only
    # covers ids that occur in y_true or y_pred, so a missing class would
    # shrink it and shift the tick labels onto the wrong rows/columns.
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(classes)))

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=classes, yticklabels=classes)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix')
    plt.show()

print("\n--- PLOTTING CONFUSION MATRIX ---")
# Both arguments are integer class ids (argmax of one-hot targets / model outputs)
plot_confusion_matrix(y_true_classes, y_pred_classes, class_names)

In [None]:
# ==========================================
# 9. EXPORT & SIZE COMPARISON
# ==========================================
print("\n--- EXPORTING MODELS & COMPARING SIZES ---")

# Define paths
float_model_path = os.path.join(OUTPUT_FOLDER, "model_float32.tflite")
quant_model_path = os.path.join(OUTPUT_FOLDER, "model_quantized.tflite")

# Export Float32 Model (Non-Quantized)
converter_float = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_float_model = converter_float.convert()

with open(float_model_path, "wb") as f:
    f.write(tflite_float_model)

# Export Int8 Model (Quantized)
# Calibration windows are drawn at random from the whole training set.
# X_train is ordered class by class (the split appends one label group after
# another), so its first 100 windows would mostly come from a single class.
# The fixed seed keeps the calibration set the same from run to run.
calibration_indices = np.random.default_rng(0).choice(
    len(X_train), size=min(100, len(X_train)), replace=False
)


def representative_data_gen():
    """Yield calibration inputs one at a time, each as a batch of one float32 window."""
    for idx in calibration_indices:
        yield [tf.cast(X_train[idx:idx + 1], tf.float32)]


converter_quant = tf.lite.TFLiteConverter.from_keras_model(model)
converter_quant.optimizations = [tf.lite.Optimize.DEFAULT]
converter_quant.representative_dataset = representative_data_gen
# Restrict to int8 builtin ops and int8 input/output tensors
converter_quant.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter_quant.inference_input_type = tf.int8
converter_quant.inference_output_type = tf.int8

tflite_quant_model = converter_quant.convert()

with open(quant_model_path, "wb") as f:
    f.write(tflite_quant_model)

# Calculate and Print Sizes (file sizes in bytes; reductions in percent)
h5_size = os.path.getsize(h5_path)
float_size = os.path.getsize(float_model_path)
quant_size = os.path.getsize(quant_model_path)
reduction_h5_to_float = (1 - (float_size / h5_size)) * 100
reduction_h5_to_int = (1 - (quant_size / h5_size)) * 100
reduction_float_to_int = (1 - (quant_size / float_size)) * 100

print(f"\nModel Size Comparison:")
print(f"  H5 Model:   {h5_size / 1024:.2f} KB")
print(f"  Float32 Model:   {float_size / 1024:.2f} KB")
print(f"  Quantized Model: {quant_size / 1024:.2f} KB")
print(f"  Reduction (TFlite Convert): {reduction_h5_to_float:.2f}%")
print(f"  Reduction (TFlite Convert + quantisation): {reduction_h5_to_int:.2f}%")
print(f"  Reduction (quantisation): {reduction_float_to_int:.2f}%")

# ============================================
# 10. Generate C++ Header for Validation Data
# ============================================

print("\n--- GENERATING C++ HEADER (validation_data.h) ---")

# Pick distinct test windows (replace=False) so no window is emitted twice,
# and never ask for more windows than the test set holds.
num_samples = min(5, len(X_test))
indices = np.random.choice(len(X_test), num_samples, replace=False)
samples_raw = X_test_raw[indices]
true_labels = y_test[indices]

# Plain str names keep the class-map comment free of NumPy scalar reprs
class_map = {class_id: str(name) for class_id, name in enumerate(class_names)}

header_parts = [f"""
#ifndef VALIDATION_DATA_H
#define VALIDATION_DATA_H

#include <stdint.h>  // int16_t

// Auto-generated validation data
// Class Map: {class_map}

const int num_validation_samples = {num_samples};

// Raw Input Data (Before Normalization)
const int16_t validation_samples[{num_samples}][{WINDOW_SIZE}][3] = {{
"""]

# NOTE(review): int() truncates each reading and the array is declared
# int16_t - this assumes the raw values are integral and fit in 16 bits;
# confirm against the recorded sensor format.
for sample in samples_raw:
    header_parts.append("    {\n")
    header_parts.extend(
        f"        {{ {int(row[0])}, {int(row[1])}, {int(row[2])} }},\n"
        for row in sample
    )
    header_parts.append("    },\n")

header_parts.append("};\n\n")
header_parts.append(
    f"// Expected Classes (Indices): {np.argmax(true_labels, axis=1).tolist()}\n"
)
header_parts.append("#endif // VALIDATION_DATA_H\n")
cpp_content = "".join(header_parts)

header_path = os.path.join(OUTPUT_FOLDER, "validation_data.h")
with open(header_path, "w") as f:
    f.write(cpp_content)

print("Saved 'validation_data.h'")
print("Done!")