# ECG Baseline Model Training

This notebook trains a baseline deep learning model on preprocessed ECG data:
1. Loads preprocessed windows and labels from ECG_Preprocessing.ipynb
2. Performs patient-level train/validation/test splitting
3. Builds and trains a 1D CNN baseline model
4. Evaluates performance and saves results

### Step 1: Install Dependencies and Import Libraries
Install the required packages and import the libraries used throughout the notebook:
- **tensorflow/keras**: Deep learning framework
- **scikit-learn**: Data splitting and evaluation metrics
- **numpy/pandas**: Numerical computing and data manipulation
- **matplotlib/seaborn**: Visualization of training metrics and the confusion matrix

In [None]:
!pip install tensorflow scikit-learn matplotlib seaborn

import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    precision_recall_fscore_support, roc_auc_score
)

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print("TensorFlow version:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))

### Step 2: Configure Paths and Locate Preprocessing Artifacts
Detect whether the notebook runs in Google Colab, mount Drive if so, and define the directories that hold the outputs of ECG_Preprocessing.ipynb. The artifacts themselves are loaded in Step 3:
- **X**: Preprocessed ECG windows (N, T, C)
- **y**: Integer labels for each window
- **df_qc**: QC metrics and pass/fail information
- **label_mapping**: Mapping between diagnosis codes and integer indices

In [None]:
# ===== Detect if running in Google Colab and mount Drive =====
IN_COLAB = False
try:
    from google.colab import drive  # type: ignore
    IN_COLAB = True
except ImportError:
    # google.colab is only importable inside Colab
    drive = None

if IN_COLAB:
    drive.mount('/content/drive/')

# ===== Define paths =====
if IN_COLAB:
    # Colab: the dataset was placed manually in MyDrive
    DATA_PATH = "/content/drive/MyDrive/DeepLearningECG/data/"
    ARTIFACT_DIR = "/content/drive/MyDrive/DeepLearningECG/artifacts/"
else:
    # Local fallback when running outside Colab
    DATA_PATH = "../DeepLearningECG/data/"
    ARTIFACT_DIR = "../DeepLearningECG/artifacts/"


# Path where the WFDB ECG files (.hea/.dat) live.
ECG_DIR = os.path.join(DATA_PATH, "Child_ecg/")

# Output directory for training results
RESULTS_DIR = os.path.join(ARTIFACT_DIR, "training_results")
os.makedirs(RESULTS_DIR, exist_ok=True)

print("DATA_PATH:", DATA_PATH)
print("ARTIFACT_DIR:", ARTIFACT_DIR)
print("ECG_DIR:", ECG_DIR)
print("RESULTS_DIR:", RESULTS_DIR)
print("Available artifacts:", os.listdir(ARTIFACT_DIR))
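
Listing the artifact directory shows what is there, but it does not say whether everything the next step needs is present. The sketch below checks for the four files that Step 3 loads; the helper name `find_missing_artifacts` is hypothetical and not part of the original notebook.

```python
import os

# File names taken from the loading step of this notebook.
REQUIRED_ARTIFACTS = (
    "X_windows.npy",
    "y_labels.npy",
    "qc_summary.csv",
    "label_mapping.json",
)

def find_missing_artifacts(artifact_dir, required=REQUIRED_ARTIFACTS):
    """Return the required file names that are absent from artifact_dir."""
    return [
        name for name in required
        if not os.path.isfile(os.path.join(artifact_dir, name))
    ]
```

Calling `find_missing_artifacts(ARTIFACT_DIR)` before loading and raising `FileNotFoundError` on a non-empty result would point to the preprocessing notebook rather than to a failing `np.load` call.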

### Step 3: Load Preprocessed Data and Metadata
Load the ECG windows, labels, and QC information from the preprocessing step:
- **X_windows.npy**: Preprocessed signal windows (shape: N_windows × 5000 × 12)
- **y_labels.npy**: Integer labels for each window
- **qc_summary.csv**: QC metrics and metadata per ECG record
- **label_mapping.json**: Diagnosis code to integer mapping

In [None]:
# Load preprocessed data
X = np.load(os.path.join(ARTIFACT_DIR, "X_windows.npy"))
y = np.load(os.path.join(ARTIFACT_DIR, "y_labels.npy"))
df_qc = pd.read_csv(os.path.join(ARTIFACT_DIR, "qc_summary.csv"))

# Load label mapping
with open(os.path.join(ARTIFACT_DIR, "label_mapping.json"), "r") as f:
    label_mapping = json.load(f)

label_to_int = label_mapping["label_to_int"]
int_to_label = {int(k): v for k, v in label_mapping["int_to_label"].items()}

print("Data shapes:")
print(f"  X: {X.shape} (windows × time_samples × channels)")
print(f"  y: {y.shape}")
print(f"\nLabel distribution:")
unique, counts = np.unique(y, return_counts=True)
for label_idx, count in zip(unique, counts):
    diagnosis = int_to_label[int(label_idx)]
    print(f"  {diagnosis}: {count} windows ({100*count/len(y):.1f}%)")
print(f"\nNumber of unique diagnoses: {len(label_to_int)}")
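
Printing shapes catches gross errors, but a few explicit checks can stop a bad run before training starts. The following is a minimal sketch, assuming labels are integers in `[0, num_classes)`; `validate_windows` is a hypothetical helper, and the demo arrays are synthetic stand-ins for the real data.

```python
import numpy as np

def validate_windows(X, y, num_classes):
    """Raise ValueError if the window array and labels are inconsistent."""
    if X.ndim != 3:
        raise ValueError(f"expected X with 3 dims (N, T, C), got shape {X.shape}")
    if y.shape != (X.shape[0],):
        raise ValueError(f"expected y with shape ({X.shape[0]},), got {y.shape}")
    if not np.isfinite(X).all():
        raise ValueError("X contains NaN or infinite values")
    if y.min() < 0 or y.max() >= num_classes:
        raise ValueError("labels fall outside [0, num_classes)")

# Synthetic stand-in: 4 windows, 50 samples, 12 channels
X_demo = np.zeros((4, 50, 12), dtype=np.float32)
y_demo = np.array([0, 1, 1, 2])
validate_windows(X_demo, y_demo, num_classes=3)  # passes silently
```

In the notebook this would be called as `validate_windows(X, y, len(label_to_int))`.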

### Step 4: Perform Patient-Level Train/Validation/Test Split
Split data at the patient level to avoid data leakage:
- Extract unique Patient_IDs from QC summary
- Split patient IDs (not individual windows) into 60% train, 20% val, 20% test
- Group windows by the patient they came from
- This ensures no patient appears in multiple sets

In [None]:
# Extract patient IDs from filenames in df_qc
df_qc['Patient_ID'] = df_qc['Filename'].str.split('/').str[1]

print("Unique patients in QC summary:", df_qc['Patient_ID'].nunique())
print("Total records in QC summary:", len(df_qc))

# Get patient ID for each window
# Map each window to a patient based on which record it came from
window_to_patient = []
for idx, row in df_qc.iterrows():
    n_windows = row['n_windows'] if pd.notna(row['n_windows']) else 0
    window_to_patient.extend([row['Patient_ID']] * int(n_windows))

window_to_patient = np.array(window_to_patient)
print(f"Windows mapped to patients: {len(window_to_patient)} (should match X shape[0]: {X.shape[0]})")

# Fail fast on a length mismatch: indexing X with a misaligned mapping
# would silently assign windows to the wrong patients
if len(window_to_patient) != X.shape[0]:
    raise ValueError(
        f"Window/patient mapping mismatch: expected {X.shape[0]} windows, "
        f"got {len(window_to_patient)}"
    )
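
The mapping above relies on one assumption: windows in `X_windows.npy` are stored in the same order as the rows of `qc_summary.csv`, with `n_windows` consecutive windows per record. Under that assumption, the loop can be written as a single `np.repeat` call. This is a sketch with a hypothetical helper name, not the notebook's own code.

```python
import numpy as np

def map_windows_to_patients(patient_ids, n_windows, expected_total):
    """Repeat each record's patient ID once per window, in record order."""
    # Records with a missing window count contribute zero windows
    counts = np.nan_to_num(np.asarray(n_windows, dtype=float), nan=0.0).astype(int)
    mapping = np.repeat(np.asarray(patient_ids), counts)
    if len(mapping) != expected_total:
        raise ValueError(
            f"mapping covers {len(mapping)} windows, expected {expected_total}"
        )
    return mapping

demo = map_windows_to_patients(["P1", "P2", "P3"], [2, np.nan, 1], expected_total=3)
print(demo)  # ['P1' 'P1' 'P3']
```

With the notebook's variables the call would be `map_windows_to_patients(df_qc['Patient_ID'], df_qc['n_windows'], X.shape[0])`.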

### Step 5: Split Patient IDs and Create Train/Val/Test Sets
Split the unique patients with stratification on each patient's most common label, so that class proportions stay similar across sets:
- **Train**: 60% of patients (≈60% of windows)
- **Validation**: 20% of patients (≈20% of windows)
- **Test**: 20% of patients (≈20% of windows)

In [None]:
# Get unique patient IDs and their primary labels
unique_patients = df_qc['Patient_ID'].unique()
patient_labels = []

for patient_id in unique_patients:
    # Get all records for this patient and use the most common label
    patient_records = df_qc[df_qc['Patient_ID'] == patient_id]
    labels = patient_records['label_int'].dropna().values
    if len(labels) > 0:
        # Use most common label for this patient
        most_common_label = int(np.bincount(labels.astype(int)).argmax())
    else:
        most_common_label = -1
    patient_labels.append(most_common_label)

# First split: 80% train+val, 20% test
train_val_patients, test_patients, train_val_labels, test_labels = train_test_split(
    unique_patients, patient_labels,
    test_size=0.2,
    stratify=patient_labels,
    random_state=42
)

# Second split: 75% train (of train+val), 25% val (of train+val) = 60% train, 20% val overall
train_patients, val_patients, train_labels, val_labels = train_test_split(
    train_val_patients, train_val_labels,
    test_size=0.25,
    stratify=train_val_labels,
    random_state=42
)

print(f"Train patients: {len(train_patients)} ({100*len(train_patients)/len(unique_patients):.1f}%)")
print(f"Val patients: {len(val_patients)} ({100*len(val_patients)/len(unique_patients):.1f}%)")
print(f"Test patients: {len(test_patients)} ({100*len(test_patients)/len(unique_patients):.1f}%)")
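
Stratified splitting needs at least two patients per class; `train_test_split` raises a `ValueError` when a class passed to `stratify` has a single member. That includes the `-1` placeholder assigned to patients without a label. The sketch below lists such classes before splitting; the helper name is hypothetical, and how to handle the flagged patients (drop, merge, or split without stratification) is left open.

```python
from collections import Counter

def classes_too_small_to_stratify(patient_labels, min_patients=2):
    """Map each label with fewer than min_patients patients to its count."""
    counts = Counter(patient_labels)
    return {label: n for label, n in counts.items() if n < min_patients}

demo_labels = [0, 0, 0, 1, 1, 2, -1]
print(classes_too_small_to_stratify(demo_labels))  # {2: 1, -1: 1}
```

Because the notebook splits twice, a class that barely passes the first split can still be too small for the second one, so a larger `min_patients` may be the safer choice.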

### Step 6: Create Window Indices for Each Split
Map patient splits back to individual windows:
- For each split (train/val/test), collect all window indices belonging to patients in that split
- Create corresponding X and y arrays for each split

In [None]:
# Create sets for faster lookup
train_patients_set = set(train_patients)
val_patients_set = set(val_patients)
test_patients_set = set(test_patients)

# Get window indices for each split
train_idx = np.where(np.isin(window_to_patient, list(train_patients_set)))[0]
val_idx = np.where(np.isin(window_to_patient, list(val_patients_set)))[0]
test_idx = np.where(np.isin(window_to_patient, list(test_patients_set)))[0]

# Create split datasets
X_train, y_train = X[train_idx], y[train_idx]
X_val, y_val = X[val_idx], y[val_idx]
X_test, y_test = X[test_idx], y[test_idx]

print("Train set:")
print(f"  X_train: {X_train.shape}")
print(f"  y_train: {y_train.shape}")

print("\nValidation set:")
print(f"  X_val: {X_val.shape}")
print(f"  y_val: {y_val.shape}")

print("\nTest set:")
print(f"  X_test: {X_test.shape}")
print(f"  y_test: {y_test.shape}")

# Verify no overlap
assert len(set(train_idx) & set(val_idx)) == 0, "Train/Val overlap detected!"
assert len(set(train_idx) & set(test_idx)) == 0, "Train/Test overlap detected!"
assert len(set(val_idx) & set(test_idx)) == 0, "Val/Test overlap detected!"
print("\n✓ No overlap between train/val/test sets")
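
The assertions above compare window indices. Leakage is defined at the patient level, so it can also be checked there, together with a count of windows that ended up in no split. A minimal sketch with a hypothetical helper and made-up patient IDs:

```python
import numpy as np

def split_window_indices(window_to_patient, train_patients, val_patients, test_patients):
    """Return per-split window indices and the number of unassigned windows."""
    groups = {
        "train": set(train_patients),
        "val": set(val_patients),
        "test": set(test_patients),
    }
    # Patient-level leakage check: no ID may appear in two splits
    names = list(groups)
    for i, first in enumerate(names):
        for second in names[i + 1:]:
            shared = groups[first] & groups[second]
            if shared:
                raise ValueError(f"patients in both {first} and {second}: {sorted(shared)}")
    indices = {
        name: np.flatnonzero(np.isin(window_to_patient, sorted(members)))
        for name, members in groups.items()
    }
    unassigned = len(window_to_patient) - sum(len(ix) for ix in indices.values())
    return indices, unassigned

w2p_demo = np.array(["P1", "P1", "P2", "P3", "P3", "P4"])
idx_demo, unassigned_demo = split_window_indices(w2p_demo, ["P1"], ["P2"], ["P3"])
print(idx_demo["train"], idx_demo["val"], idx_demo["test"], unassigned_demo)
```

In the demo, the two windows of `P1` go to train, `P2` to validation, `P3` to test, and the single window of `P4` is reported as unassigned.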

### Step 7: Analyze Label Distribution Across Splits
Check that the label distribution is similar across the train/validation/test sets:
- Show the window-level class distribution for each split
- Stratification was applied to patients, not windows, so expect approximate rather than exact agreement

In [None]:
def analyze_label_distribution(y, split_name, int_to_label):
    """Print label distribution for a split."""
    unique, counts = np.unique(y, return_counts=True)
    print(f"\n{split_name} label distribution:")
    for label_idx, count in zip(unique, counts):
        diagnosis = int_to_label[int(label_idx)]
        print(f"  {diagnosis}: {count} ({100*count/len(y):.1f}%)")

analyze_label_distribution(y_train, "Train", int_to_label)
analyze_label_distribution(y_val, "Validation", int_to_label)
analyze_label_distribution(y_test, "Test", int_to_label)
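
Reading three printed tables side by side is error-prone when there are many classes. One way to condense the comparison is the largest absolute difference in class proportion between two splits. The helpers below are hypothetical and the demo labels are synthetic.

```python
import numpy as np

def label_proportions(y, num_classes):
    """Fraction of samples per class, including classes with zero samples."""
    counts = np.bincount(np.asarray(y, dtype=int), minlength=num_classes)
    return counts / counts.sum()

def max_proportion_gap(y_reference, y_other, num_classes):
    """Largest absolute per-class difference in proportion between two splits."""
    diff = label_proportions(y_reference, num_classes) - label_proportions(y_other, num_classes)
    return float(np.abs(diff).max())

y_train_demo = [0, 0, 1, 1, 2, 2]   # one third per class
y_val_demo = [0, 1, 1, 2]           # class 1 over-represented
print(f"max gap: {max_proportion_gap(y_train_demo, y_val_demo, num_classes=3):.3f}")
```

What counts as an acceptable gap depends on the dataset; no threshold is implied here.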

### Step 8: Define Baseline Model Architecture
Build a 1D CNN baseline model for ECG classification:
- **Input**: Time-series ECG windows (5000 time steps × 12 channels)
- **Architecture**:
  - Four blocks of two 1D convolution layers each (32, 64, 128, 256 filters)
  - Max pooling and dropout after each conv block
  - Global average pooling
  - Dense classification layers (256 and 128 units) with dropout
- **Output**: Softmax probabilities over diagnosis classes

In [None]:
def create_baseline_model(input_shape, num_classes):
    """
    Create a 1D CNN baseline model for ECG classification.
    
    Args:
        input_shape: Tuple (time_steps, channels)
        num_classes: Number of diagnosis classes
    
    Returns:
        Compiled Keras model
    """
    model = keras.Sequential([
        # Block 1
        layers.Input(shape=input_shape),
        layers.Conv1D(32, kernel_size=3, padding='same', activation='relu'),
        layers.Conv1D(32, kernel_size=3, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        
        # Block 2
        layers.Conv1D(64, kernel_size=3, padding='same', activation='relu'),
        layers.Conv1D(64, kernel_size=3, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        
        # Block 3
        layers.Conv1D(128, kernel_size=3, padding='same', activation='relu'),
        layers.Conv1D(128, kernel_size=3, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        
        # Block 4
        layers.Conv1D(256, kernel_size=3, padding='same', activation='relu'),
        layers.Conv1D(256, kernel_size=3, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        
        # Global average pooling and dense layers
        layers.GlobalAveragePooling1D(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
    
    # Compile model
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1e-3),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

# Create and summarize model
num_classes = len(label_to_int)
input_shape = (X_train.shape[1], X_train.shape[2])  # (time_steps, channels)

model = create_baseline_model(input_shape, num_classes)

print("Model architecture:")
model.summary()
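
The numbers reported by `model.summary()` can be cross-checked by hand. The sketch below reproduces the arithmetic for the layers as written above: `padding='same'` convolutions keep the sequence length, each `MaxPooling1D(pool_size=2)` halves it (rounding down), and every convolution or dense layer has one bias per output unit. The function names are hypothetical.

```python
def pooled_lengths(time_steps, num_blocks=4, pool_size=2):
    """Sequence length after each pooling layer (stride equals pool size)."""
    lengths = []
    for _ in range(num_blocks):
        time_steps = time_steps // pool_size
        lengths.append(time_steps)
    return lengths

def conv1d_params(in_channels, filters, kernel_size=3):
    """Weights plus biases of one Conv1D layer."""
    return kernel_size * in_channels * filters + filters

def dense_params(in_units, out_units):
    """Weights plus biases of one Dense layer."""
    return in_units * out_units + out_units

def baseline_param_count(channels, num_classes):
    """Trainable parameters of the four-block baseline defined above."""
    total = 0
    in_channels = channels
    for filters in (32, 64, 128, 256):
        # Two convolutions per block: the first changes the channel count
        total += conv1d_params(in_channels, filters) + conv1d_params(filters, filters)
        in_channels = filters
    total += dense_params(256, 256) + dense_params(256, 128)
    total += dense_params(128, num_classes)
    return total

print(pooled_lengths(5000))  # [2500, 1250, 625, 312]
```

`baseline_param_count(12, num_classes)` should agree with `model.count_params()` as long as the layer list is unchanged; pooling and dropout layers add no parameters.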

### Step 9: Train Baseline Model
Train the model with early stopping and learning rate scheduling:
- **Batch size**: 32 windows
- **Epochs**: Up to 100 (early stopping if no improvement)
- **Callbacks**:
  - Early stopping: Stop if validation loss doesn't improve for 10 epochs
  - Learning rate reduction: Reduce LR by 50% if plateau for 5 epochs
  - Model checkpoint: Save best model based on validation accuracy

In [None]:
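# Create callbacks
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Use a distinct file name so the final model.save() in Step 13 does not
# overwrite the best checkpoint
model_checkpoint_path = os.path.join(RESULTS_DIR, f"baseline_model_best_{timestamp}.h5")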

callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    ),
    keras.callbacks.ModelCheckpoint(
        model_checkpoint_path,
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    )
]

# Train model
print("Training baseline model...")
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=100,
    batch_size=32,
    callbacks=callbacks,
    verbose=1
)

print(f"\n✓ Best checkpoint saved to: {model_checkpoint_path}")
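
To see how the two patience settings interact, the following pure-Python sketch replays a list of validation losses through simplified versions of both rules. It is an approximation for illustration only: it ignores `min_delta` and `cooldown`, which the real Keras callbacks take into account, and the function name is hypothetical.

```python
def simulate_callbacks(val_losses, stop_patience=10, lr_patience=5,
                       factor=0.5, initial_lr=1e-3, min_lr=1e-6):
    """Return (0-based epoch at which training stops or None, LR per epoch)."""
    best = float("inf")
    stop_wait = 0   # epochs without improvement, for early stopping
    lr_wait = 0     # epochs without improvement since the last LR change
    lr = initial_lr
    lr_history = []
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            best = loss
            stop_wait = 0
            lr_wait = 0
        else:
            stop_wait += 1
            lr_wait += 1
        if lr_wait >= lr_patience:
            lr = max(lr * factor, min_lr)
            lr_wait = 0
        lr_history.append(lr)
        if stop_wait >= stop_patience:
            return epoch, lr_history
    return None, lr_history

# Loss improves for two epochs, then plateaus
demo_losses = [1.0, 0.9] + [0.95] * 12
stopped_at, lr_demo = simulate_callbacks(demo_losses)
print(stopped_at)  # 11
```

In this replay the learning rate is halved twice (after 5 and after 10 stagnant epochs) before training stops, so with these settings a plateau gets two reductions before early stopping ends the run.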

### Step 10: Plot Training History
Visualize model training progress:
- **Left**: Training and validation loss over epochs
- **Right**: Training and validation accuracy over epochs
- Shows whether model is learning, overfitting, or underfitting

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Loss plot
axes[0].plot(history.history['loss'], label='Train Loss', linewidth=2)
axes[0].plot(history.history['val_loss'], label='Val Loss', linewidth=2)
axes[0].set_xlabel('Epoch', fontsize=12)
axes[0].set_ylabel('Loss', fontsize=12)
axes[0].set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
axes[0].legend(fontsize=11)
axes[0].grid(True, alpha=0.3)

# Accuracy plot
axes[1].plot(history.history['accuracy'], label='Train Accuracy', linewidth=2)
axes[1].plot(history.history['val_accuracy'], label='Val Accuracy', linewidth=2)
axes[1].set_xlabel('Epoch', fontsize=12)
axes[1].set_ylabel('Accuracy', fontsize=12)
axes[1].set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
axes[1].legend(fontsize=11)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(os.path.join(RESULTS_DIR, f'training_history_{timestamp}.png'), dpi=150, bbox_inches='tight')
plt.show()

print("✓ Training history plot saved")
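
The curves can also be summarized numerically. The sketch below takes a dictionary shaped like `history.history` and reports the epoch with the lowest validation loss, the train/validation gap at that epoch, and how many epochs ran afterwards. The helper name and the demo values are made up for illustration.

```python
import numpy as np

def summarize_history(history_dict):
    """Locate the best validation epoch and the loss gap at that point."""
    val_loss = np.asarray(history_dict["val_loss"], dtype=float)
    train_loss = np.asarray(history_dict["loss"], dtype=float)
    best = int(val_loss.argmin())
    return {
        "best_epoch": best + 1,  # 1-based, as shown in the Keras progress log
        "best_val_loss": float(val_loss[best]),
        "gap_at_best": float(val_loss[best] - train_loss[best]),
        "epochs_since_best": int(len(val_loss) - 1 - best),
    }

demo_history = {"loss": [1.0, 0.7, 0.5, 0.3], "val_loss": [1.1, 0.8, 0.9, 1.0]}
summary_demo = summarize_history(demo_history)
print(summary_demo["best_epoch"], summary_demo["epochs_since_best"])  # 2 2
```

A training loss that keeps falling while validation loss rises after the best epoch, as in the demo, is the usual sign of overfitting.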

### Step 11: Evaluate on Test Set
Evaluate the model on the held-out test data:
- Overall accuracy
- Per-class precision, recall, F1-score
- Validation accuracy for reference

The confusion matrix is plotted in Step 12. `roc_auc_score` is imported in Step 1, but this cell does not compute ROC-AUC.

In [None]:
# Predictions on test set
y_test_pred_probs = model.predict(X_test, verbose=0)
y_test_pred = np.argmax(y_test_pred_probs, axis=1)

# Calculate metrics
test_accuracy = accuracy_score(y_test, y_test_pred)

print("="*70)
print("TEST SET EVALUATION")
print("="*70)
print(f"\nOverall Accuracy: {test_accuracy:.4f}\n")

# Classification report
print("Per-Class Metrics:")
print("-"*70)
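report = classification_report(
    y_test, y_test_pred,
    labels=list(range(num_classes)),  # keep every class, even if absent from the test set
    target_names=[int_to_label[i] for i in range(num_classes)],
    digits=4,
    zero_division=0  # report 0 instead of warning for classes with no predictions
)
print(report)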

# Validation accuracy for reference, computed with the weights currently in
# the model (these may be the restored best weights, not the last epoch's)
y_val_pred_probs = model.predict(X_val, verbose=0)
y_val_pred = np.argmax(y_val_pred_probs, axis=1)
val_accuracy = accuracy_score(y_val, y_val_pred)

print(f"\nValidation Accuracy (current weights): {val_accuracy:.4f}")
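
For a multi-class ROC-AUC, one common approach is one-vs-rest: treat each class in turn as positive and score it with that class's predicted probability. The sketch below computes this directly with NumPy as the fraction of (positive, negative) pairs ranked correctly, counting ties as half, and skips classes that have no positive or no negative samples. It is an illustrative stand-in, not the notebook's method; scikit-learn's `roc_auc_score` with `multi_class='ovr'` is the library route. The pairwise matrix needs memory proportional to positives × negatives.

```python
import numpy as np

def one_vs_rest_auc(y_true, y_probs):
    """Per-class one-vs-rest AUC; classes without both outcomes are skipped."""
    y_true = np.asarray(y_true)
    y_probs = np.asarray(y_probs, dtype=float)
    aucs = {}
    for c in range(y_probs.shape[1]):
        pos = y_probs[y_true == c, c]   # scores of true members of class c
        neg = y_probs[y_true != c, c]   # scores of all other samples
        if len(pos) == 0 or len(neg) == 0:
            continue
        diff = pos[:, None] - neg[None, :]
        aucs[c] = float(((diff > 0).sum() + 0.5 * (diff == 0).sum()) / diff.size)
    return aucs

y_demo = [0, 0, 1, 1]
p_demo = [[0.9, 0.1], [0.6, 0.4], [0.7, 0.3], [0.2, 0.8]]
print(one_vs_rest_auc(y_demo, p_demo))  # {0: 0.75, 1: 0.75}
```

With the notebook's variables the call would be `one_vs_rest_auc(y_test, y_test_pred_probs)`; averaging the returned values gives a macro score over the classes present.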

### Step 12: Visualize Confusion Matrix
Display confusion matrix to identify which diagnoses are confused:
- Rows: True labels
- Columns: Predicted labels
- Diagonal elements show correct predictions
- Off-diagonal elements show misclassifications

In [None]:
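# Compute confusion matrix over all classes, including any absent from the test set
cm = confusion_matrix(y_test, y_test_pred, labels=list(range(num_classes)))

# Normalize each row by its true-label count; rows with no samples stay at zero
row_sums = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm, row_sums,
    out=np.zeros(cm.shape, dtype=float),
    where=row_sums > 0
)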

# Plot
fig, ax = plt.subplots(figsize=(12, 10))

diagnosis_names = [int_to_label[i] for i in range(num_classes)]
sns.heatmap(
    cm_normalized,
    annot=True,
    fmt='.2%',
    cmap='Blues',
    xticklabels=diagnosis_names,
    yticklabels=diagnosis_names,
    cbar_kws={'label': 'Percentage'},
    ax=ax
)

ax.set_xlabel('Predicted Label', fontsize=12, fontweight='bold')
ax.set_ylabel('True Label', fontsize=12, fontweight='bold')
ax.set_title('Normalized Confusion Matrix - Test Set', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()

plt.savefig(os.path.join(RESULTS_DIR, f'confusion_matrix_{timestamp}.png'), dpi=150, bbox_inches='tight')
plt.show()

print("✓ Confusion matrix saved")
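
With many classes the heatmap is hard to read at a glance. Ranking the off-diagonal cells gives a short list of the most frequent confusions. The helper below is a hypothetical sketch; the matrix and the class names `A`, `B`, `C` are invented for the demo.

```python
import numpy as np

def top_confusions(cm, class_names, k=5):
    """Return up to k (true, predicted, count) tuples, largest counts first."""
    off_diag = np.asarray(cm).copy()
    np.fill_diagonal(off_diag, 0)  # ignore correct predictions
    order = np.argsort(off_diag, axis=None)[::-1][:k]
    pairs = []
    for flat_index in order:
        true_idx, pred_idx = np.unravel_index(flat_index, off_diag.shape)
        count = int(off_diag[true_idx, pred_idx])
        if count == 0:
            break  # remaining cells contain no misclassifications
        pairs.append((class_names[true_idx], class_names[pred_idx], count))
    return pairs

cm_demo = np.array([[50, 3, 0],
                    [7, 40, 1],
                    [0, 9, 30]])
print(top_confusions(cm_demo, ["A", "B", "C"], k=3))
# [('C', 'B', 9), ('B', 'A', 7), ('A', 'B', 3)]
```

In the notebook this would be called on the raw counts, `top_confusions(cm, diagnosis_names)`, since normalized rows hide how many windows are affected.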

### Step 13: Save Training Results and Summary
Export model, results, and metadata for later analysis:
- Save trained model (HDF5 and native Keras formats)
- Save training metrics and training history (JSON)
- Save test predictions and probabilities (NPZ)

In [None]:
# Save model in different formats
model_h5_path = os.path.join(RESULTS_DIR, f"baseline_model_{timestamp}.h5")
model_keras_path = os.path.join(RESULTS_DIR, f"baseline_model_{timestamp}.keras")

model.save(model_h5_path)
model.save(model_keras_path)
print(f"✓ Model saved to:")
print(f"  - {model_h5_path}")
print(f"  - {model_keras_path}")

# Save metrics and results
results = {
    'timestamp': timestamp,
    'model_type': 'Baseline 1D CNN',
    'input_shape': list(input_shape),
    'num_classes': num_classes,
    'num_parameters': int(model.count_params()),
    'training_samples': X_train.shape[0],
    'validation_samples': X_val.shape[0],
    'test_samples': X_test.shape[0],
    'epochs_trained': len(history.history['loss']),
    'final_train_loss': float(history.history['loss'][-1]),
    'final_val_loss': float(history.history['val_loss'][-1]),
    'final_train_accuracy': float(history.history['accuracy'][-1]),
    'final_val_accuracy': float(history.history['val_accuracy'][-1]),
    'test_accuracy': float(test_accuracy),
    'label_mapping': label_to_int,
}

results_json_path = os.path.join(RESULTS_DIR, f"training_results_{timestamp}.json")
with open(results_json_path, 'w') as f:
    json.dump(results, f, indent=2)
print(f"✓ Results saved to: {results_json_path}")

# Save predictions
predictions_path = os.path.join(RESULTS_DIR, f"test_predictions_{timestamp}.npz")
np.savez(
    predictions_path,
    y_true=y_test,
    y_pred=y_test_pred,
    y_probs=y_test_pred_probs
)
print(f"✓ Test predictions saved to: {predictions_path}")

# Save training history
history_path = os.path.join(RESULTS_DIR, f"training_history_{timestamp}.json")
history_dict = {
    'loss': [float(x) for x in history.history['loss']],
    'accuracy': [float(x) for x in history.history['accuracy']],
    'val_loss': [float(x) for x in history.history['val_loss']],
    'val_accuracy': [float(x) for x in history.history['val_accuracy']],
}
with open(history_path, 'w') as f:
    json.dump(history_dict, f, indent=2)
print(f"✓ Training history saved to: {history_path}")
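
A quick round trip confirms that the saved predictions can be read back and are internally consistent. The sketch below reloads an `.npz` file with the three keys written above (`y_true`, `y_pred`, `y_probs`), checks that `y_pred` matches the argmax of `y_probs`, and recomputes accuracy. The helper name is hypothetical, and the demo writes a small synthetic file to a temporary directory.

```python
import os
import tempfile
import numpy as np

def check_saved_predictions(path):
    """Reload saved predictions, verify consistency, and return accuracy."""
    with np.load(path) as data:
        y_true = data["y_true"]
        y_pred = data["y_pred"]
        y_probs = data["y_probs"]
    if not np.array_equal(y_pred, y_probs.argmax(axis=1)):
        raise ValueError("y_pred does not match the argmax of y_probs")
    return float((y_true == y_pred).mean())

# Synthetic example: 3 of 4 predictions are correct
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "test_predictions_demo.npz")
    probs = np.array([[0.8, 0.2], [0.3, 0.7], [0.6, 0.4], [0.1, 0.9]])
    np.savez(demo_path, y_true=np.array([0, 1, 1, 1]),
             y_pred=probs.argmax(axis=1), y_probs=probs)
    demo_accuracy = check_saved_predictions(demo_path)

print(demo_accuracy)  # 0.75
```

Running `check_saved_predictions(predictions_path)` in the notebook should return the same value as `test_accuracy`.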

### Step 14: Create Summary Report
Generate a plain-text summary report of the training run, print it, and save it alongside the other artifacts.

In [None]:
summary_report = f"""
╔═══════════════════════════════════════════════════════════════════════╗
║                    BASELINE MODEL TRAINING REPORT                     ║
╚═══════════════════════════════════════════════════════════════════════╝

TRAINING CONFIGURATION
{'-'*73}
Timestamp:              {timestamp}
Model Type:             Baseline 1D CNN
Input Shape:            {input_shape}
Number of Classes:      {num_classes}
Total Parameters:       {results['num_parameters']:,}

DATA SPLITS
{'-'*73}
Train Set:              {X_train.shape[0]:,} windows from {len(train_patients)} patients
Validation Set:         {X_val.shape[0]:,} windows from {len(val_patients)} patients
Test Set:               {X_test.shape[0]:,} windows from {len(test_patients)} patients
Total:                  {X_train.shape[0] + X_val.shape[0] + X_test.shape[0]:,} windows

TRAINING RESULTS
{'-'*73}
Total Epochs Trained:   {results['epochs_trained']}
Final Train Loss:       {results['final_train_loss']:.6f}
Final Train Accuracy:   {results['final_train_accuracy']:.4f}
Final Val Loss:         {results['final_val_loss']:.6f}
Final Val Accuracy:     {results['final_val_accuracy']:.4f}

TEST SET PERFORMANCE
{'-'*73}
Test Accuracy:          {test_accuracy:.4f}

ARTIFACTS SAVED
{'-'*73}
Model (HDF5):           {model_h5_path}
Model (Keras):          {model_keras_path}
Results JSON:           {results_json_path}
Predictions (NPZ):      {predictions_path}
Training History:       {history_path}
Confusion Matrix Plot:  {os.path.join(RESULTS_DIR, f'confusion_matrix_{timestamp}.png')}
Training History Plot:  {os.path.join(RESULTS_DIR, f'training_history_{timestamp}.png')}
Summary Report:         {os.path.join(RESULTS_DIR, f'summary_report_{timestamp}.txt')}

{'-'*73}
Report generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""

print(summary_report)

# Save report (UTF-8, since the banner uses box-drawing characters)
report_path = os.path.join(RESULTS_DIR, f"summary_report_{timestamp}.txt")
with open(report_path, 'w', encoding='utf-8') as f:
    f.write(summary_report)

print(f"\n✓ Summary report saved to: {report_path}")