In [None]:
import os
import warnings
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models, regularizers
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.utils.class_weight import compute_class_weight

# Suppress warnings and TensorFlow logs.
# NOTE: filterwarnings('ignore') with no category or module filter hides every
# Python warning raised after this point, including deprecation and
# convergence warnings from the libraries imported above.
warnings.filterwarnings('ignore')
# Raise the TensorFlow Python logger threshold so only ERROR-level (and more
# severe) records are emitted.
tf.get_logger().setLevel('ERROR')

# Where the per-group CSV files live, and the group names to process.
# Each window length appears twice: plain, then with the '_overlap' suffix.
csv_dir = 'final_dataset/CSVs'
durations = [
    f"{window}{suffix}"
    for window in ('5s', '10s', '15s')
    for suffix in ('', '_overlap')
]

# Model builder with reduced complexity, increased regularization, dropout, batch normalization
def build_model(input_shape, num_classes):
    """Assemble the (uncompiled) dense classifier used for every group.

    Args:
        input_shape: Number of input features; passed to the first Dense
            layer as ``(input_shape,)``.
        num_classes: Width of the final softmax layer.

    Returns:
        A ``Sequential`` model: two regularized ReLU blocks
        (Dense -> Dropout -> BatchNormalization) followed by a softmax layer.
    """
    # (units, dropout rate) for each hidden block, in order.
    hidden_specs = ((128, 0.8), (64, 0.7))

    model = models.Sequential()
    for position, (units, drop_rate) in enumerate(hidden_specs):
        dense_kwargs = {
            'activation': 'relu',
            'kernel_regularizer': regularizers.l2(0.01),  # L2 on weights
            'bias_regularizer': regularizers.l1(0.01),  # L1 on biases
        }
        if position == 0:
            # Only the first layer declares the input dimensionality.
            dense_kwargs['input_shape'] = (input_shape,)
        model.add(layers.Dense(units, **dense_kwargs))
        model.add(layers.Dropout(drop_rate))
        model.add(layers.BatchNormalization())

    model.add(layers.Dense(num_classes, activation='softmax'))
    return model

# Data augmentation function
def augment_data(X, noise_factor=0.05, rng=None):
    """Return a copy of ``X`` perturbed by zero-mean Gaussian noise.

    Args:
        X: Numeric feature array (any array-like accepted by ``np.asarray``).
        noise_factor: Standard deviation of the noise added to each element.
        rng: Optional ``np.random.Generator`` for reproducible noise. When
            ``None`` the global NumPy random state is used.

    Returns:
        A new array with the same shape as ``X``. Floating-point inputs keep
        their dtype (e.g. float32 stays float32); ``X`` is not modified.
    """
    X = np.asarray(X)
    draw = np.random.normal if rng is None else rng.normal
    noise = noise_factor * draw(loc=0.0, scale=1.0, size=X.shape)
    if np.issubdtype(X.dtype, np.floating):
        # The samplers always return float64; cast back so float32 features
        # are not silently upcast by the addition.
        noise = noise.astype(X.dtype, copy=False)
    return X + noise

# Hyperparameters shared by every duration group
epochs = 20
batch_size = 32

# Per-group test metrics, keyed by group name (filled in by the training loop)
results = dict()

# Input columns read from each CSV
categorical_col = 'view'
numerical_cols = [
    'mean_flow',
    'std_flow',
    'edge_ratio',
    'keypoint_count',
    'fft_peak',
]

# Training + Evaluation + Visualization loop
for group in durations:
    # One independent train / validate / test run per duration group.
    csv_path = os.path.join(csv_dir, f"{group}.csv")
    if not os.path.isfile(csv_path):
        print(f"Skipping {group}: CSV not found at {csv_path}")
        continue

    print(f"\n=== Training for group: {group} ===")

    # Load CSV
    df = pd.read_csv(csv_path)

    # Check if the required columns exist
    missing_cols = [col for col in numerical_cols if col not in df.columns]
    if missing_cols:
        print(f"Skipping {group}: Missing columns {', '.join(missing_cols)}")
        continue

    if categorical_col not in df.columns or 'class' not in df.columns:
        print(f"Skipping {group}: Missing 'view' or 'class' column.")
        continue

    # Encode the 'view' feature. Values outside the known mapping are reported
    # and the group is skipped, like the other schema checks above, instead of
    # aborting the whole run with a KeyError.
    view_map = {'front': 0, 'angle': 1}
    view_raw = df[categorical_col].astype(str)
    unknown_views = sorted(set(view_raw) - set(view_map))
    if unknown_views:
        print(f"Skipping {group}: Unknown view values {', '.join(unknown_views)}")
        continue
    view_encoded = view_raw.map(view_map).to_numpy(dtype=np.float32).reshape(-1, 1)

    # Encode labels as integer indices into the sorted list of class names
    y = df['class'].astype(str).values
    label_names = sorted(set(y))
    label_to_index = {name: idx for idx, name in enumerate(label_names)}
    y_encoded = np.array([label_to_index[label] for label in y])

    # Numerical features
    X_num = df[numerical_cols].values.astype(np.float32)

    # Split data: First Split: 85% for training/validation and 15% for testing
    Xn_train_val, Xn_test, V_train_val, V_test, y_train_val, y_test = train_test_split(
        X_num, view_encoded, y_encoded, test_size=0.15, stratify=y_encoded, random_state=42
    )

    # Second Split: 0.1765 of the remaining 85% is ~15% of the full data, giving
    # roughly 70% train / 15% validation / 15% test overall
    Xn_train, Xn_val, V_train, V_val, y_train, y_val = train_test_split(
        Xn_train_val, V_train_val, y_train_val, test_size=0.1765, stratify=y_train_val, random_state=42
    )

    # Normalize numerical features; statistics are fitted on the training split only
    scaler = StandardScaler()
    Xn_train = scaler.fit_transform(Xn_train)
    Xn_val = scaler.transform(Xn_val)
    Xn_test = scaler.transform(Xn_test)

    # Augment the training data only. Validation and test features stay clean
    # so early stopping, LR scheduling and the reported test metrics are
    # measured on real data rather than on noise-corrupted samples.
    Xn_train = augment_data(Xn_train)

    # Combine numerical features with categorical 'view' feature
    X_train = np.concatenate([Xn_train, V_train], axis=1)
    X_val = np.concatenate([Xn_val, V_val], axis=1)
    X_test = np.concatenate([Xn_test, V_test], axis=1)

    # Datasets (only the training set is shuffled)
    train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(X_train)).batch(batch_size)
    val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(batch_size)
    test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(batch_size)

    # Compute class weights to handle class imbalance. They are derived from
    # the training labels only, so held-out label frequencies do not influence
    # training, and keyed by the class index each weight belongs to.
    train_classes = np.unique(y_train)
    class_weights = compute_class_weight('balanced', classes=train_classes, y=y_train)
    class_weight_dict = {int(cls): float(weight) for cls, weight in zip(train_classes, class_weights)}

    # Build model
    model = build_model(input_shape=X_train.shape[1], num_classes=len(label_names))
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),  # Lower initial learning rate
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    # Early stopping: stop after 5 epochs without val_loss improvement and
    # roll back to the best weights seen
    early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    # ReduceLROnPlateau: multiply the learning rate by 0.3 after 2 stagnant epochs
    lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=2, min_lr=1e-6)

    # Training
    history = model.fit(train_ds, validation_data=val_ds, epochs=epochs,
                        callbacks=[early_stop, lr_scheduler], class_weight=class_weight_dict)

    # Evaluate model on test set
    test_loss, test_acc = model.evaluate(test_ds)
    results[group] = {'accuracy': test_acc, 'loss': test_loss}
    print(f"Group {group} — Test Accuracy: {test_acc:.4f}")

    # Visualization: loss and accuracy curves side by side
    plt.figure(figsize=(12, 5))

    # Plot Loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Train Loss', marker='o')
    plt.plot(history.history['val_loss'], label='Val Loss', marker='o')
    plt.title(f'{group} - Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    # Plot Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Train Accuracy', marker='o')
    plt.plot(history.history['val_accuracy'], label='Val Accuracy', marker='o')
    plt.title(f'{group} - Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

# Final report: one line per group that was actually trained
print("\n=== Summary ===")
for group_name in results:
    metrics = results[group_name]
    print(f"{group_name}: Accuracy={metrics['accuracy']:.4f}, Loss={metrics['loss']:.4f}")




=== Training for group: 5s ===
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20