In [None]:
# Imports
import os
import cv2
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from collections import defaultdict
import numpy as np

import tensorflow as tf
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import confusion_matrix, classification_report, f1_score
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Dropout, LSTM, Dense, BatchNormalization

In [None]:
from google.colab import drive
drive.mount('/content/drive')

#ModelTrainer

In [None]:
def load_data(data_dir):
    print("Loading preprocessed data...")
    X = np.load(os.path.join(data_dir, 'X.npy'))
    y = np.load(os.path.join(data_dir, 'y.npy'))
    confidences = np.load(os.path.join(data_dir, 'confidences.npy'))
    class_mapping = np.load(os.path.join(data_dir, 'class_mapping.npy'), allow_pickle=True).item()

    print(f"Loaded {len(X)} samples")
    print(f"Input shape: {X.shape}")
    print("\nClass distribution:")
    for class_name, class_idx in class_mapping.items():
        total = sum(1 for label in y if label == class_idx)
        print(f"{class_name}: {total} samples")

    return X, y, confidences, class_mapping

def build_model(input_shape, num_classes):
    model = tf.keras.Sequential([
        Conv1D(64, kernel_size=3, padding='same', activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        BatchNormalization(),
        MaxPooling1D(pool_size=2),
        Dropout(0.35),
        LSTM(128, return_sequences=True, kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        BatchNormalization(),
        Dropout(0.35),
        LSTM(64, kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        BatchNormalization(),
        Dropout(0.35),
        Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        BatchNormalization(),
        Dropout(0.3),
        Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(0.0003),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    model.build(input_shape=(None,) + input_shape)
    model.summary()
    return model

def get_callbacks():
    class OverfittingDetectionCallback(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            if logs.get('accuracy') - logs.get('val_accuracy') > 0.2:
                print("\nWarning: Possible overfitting detected!")
    return [
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=12, restore_best_weights=True),
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6),
        OverfittingDetectionCallback()
    ]

def calculate_class_weights(y_train):
    class_weights = {}
    unique, counts = np.unique(y_train, return_counts=True)
    max_count = np.max(counts)
    for c, count in zip(unique, counts):
        class_weights[c] = max_count / count
    return class_weights

def smooth_curve(points, factor=0.8):
    smoothed_points = []
    for point in points:
        if smoothed_points:
            previous = smoothed_points[-1]
            smoothed_points.append(previous * factor + point * (1 - factor))
        else:
            smoothed_points.append(point)
    return smoothed_points

def train_with_kfold(X, y, class_mapping, epochs=100, batch_size=16, n_splits=5, save_path="/content/drive/MyDrive/k_fold_CNN_LSTM_landmark"):
    os.makedirs(save_path, exist_ok=True)
    print(f"\nStarting {n_splits}-fold stratified cross-validation...")
    kfold = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    fold_results = []
    fold_histories = []

    for fold, (train_idx, val_idx) in enumerate(kfold.split(X, y)):
        print(f"\nTraining Fold {fold+1}/{n_splits}")
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        model = build_model((X_train.shape[1], X_train.shape[2]), len(class_mapping))

        history = model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=get_callbacks(),
            class_weight=calculate_class_weights(y_train),
            verbose=1
        )

        y_pred = model.predict(X_val)
        y_pred_classes = np.argmax(y_pred, axis=1)
        f1 = f1_score(y_val, y_pred_classes, average='weighted', zero_division=0)

        cm = confusion_matrix(y_val, y_pred_classes, normalize='true')

        fold_results.append({
            'val_accuracy': history.history['val_accuracy'][-1],
            'val_loss': history.history['val_loss'][-1],
            'f1_score': f1,
            'confusion_matrix': cm,
            'classification_report': classification_report(
                y_val, y_pred_classes,
                target_names=list(class_mapping.keys()),
                output_dict=True, zero_division=0),
            'y_true': y_val,
            'y_pred': y_pred_classes
        })
        fold_histories.append(history.history)
        model.save(os.path.join(save_path, f'model_fold_{fold+1}.keras'))

    save_training_plots(fold_histories, save_path)
    save_fold_results(fold_results, fold_histories, save_path)
    return fold_results, fold_histories

def save_training_plots(fold_histories, save_path):
    os.makedirs(os.path.join(save_path, 'training_plots'), exist_ok=True)
    plt.figure(figsize=(15, 5))
    plt.subplot(1, 2, 1)
    for h in fold_histories:
        plt.plot(smooth_curve(h['accuracy']), label='Train', alpha=0.5)
        plt.plot(smooth_curve(h['val_accuracy']), label='Val', alpha=0.5)
    plt.title('Smoothed Accuracy Across Folds')
    plt.legend()
    plt.subplot(1, 2, 2)
    for h in fold_histories:
        plt.plot(smooth_curve(h['loss']), label='Train', alpha=0.5)
        plt.plot(smooth_curve(h['val_loss']), label='Val', alpha=0.5)
    plt.title('Smoothed Loss Across Folds')
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(save_path, 'training_plots', 'combined_training_curves.png'), dpi=300)
    plt.close()

def save_fold_results(fold_results, fold_histories, save_path):
    metrics = []
    for i, (result, history) in enumerate(zip(fold_results, fold_histories)):
        fold_dir = os.path.join(save_path, 'fold_' + str(i+1))
        os.makedirs(fold_dir, exist_ok=True)

        # Confusion matrix
        plt.figure(figsize=(12, 10))
        sns.heatmap(result['confusion_matrix'], annot=True, fmt=".2f", cmap='Blues',
                    xticklabels=list(class_mapping.keys()),
                    yticklabels=list(class_mapping.keys()))
        plt.title(f'Normalized Confusion Matrix - Fold {i+1}')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(os.path.join(fold_dir, 'confusion_matrix.png'), dpi=300)
        plt.close()

        pd.DataFrame(result['classification_report']).transpose().to_csv(os.path.join(fold_dir, 'classification_report.csv'))
        metrics.append({
            'fold': i+1,
            'val_accuracy': result['val_accuracy'],
            'val_loss': result['val_loss'],
            'f1_score': result['f1_score'],
            'epochs': len(history['loss'])
        })

    df = pd.DataFrame(metrics)
    df.to_csv(os.path.join(save_path, 'all_fold_metrics.csv'), index=False)

    avg = df.mean()
    pd.DataFrame([avg]).to_csv(os.path.join(save_path, 'average_metrics.csv'), index=False)

    print("\nK-Fold Summary:")
    print(df.describe())

# Entry point for Colab
if __name__ == "__main__":
    data_dir = '/content/drive/MyDrive/CVP/processed_landmark_data'
    save_path = "/content/drive/MyDrive/CVP/k_fold_CNN_LSTM_landmark"
    X, y, confidences, class_mapping = load_data(data_dir)
    train_with_kfold(X, y, class_mapping, epochs=100, batch_size=16, n_splits=5, save_path=save_path)
