# 12-lead ECG disease prediction

## Loading data

In [None]:
# !python -V > full_requirements.txt && pip list --format=freeze >> full_requirements.txt

In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

import joblib
from pathlib import Path
%matplotlib inline

# Import custom modules
from src.data_processing import load_processed_data


Functions

In [2]:
def classify_cardiac_condition(diagnosis_codes):
    pathology_hierarchy = {
        'Structural': {
            'codes': [
                '22298006',   # Myocardial Infarction
                '164865005',  # ST Segment Elevation
                '164867002',  # ST Segment Depression
                '164873001',  # Left Ventricular Hypertrophy
                '164874007'   # Right Ventricular Hypertrophy
            ],
            'severity': 5
        },
        'Arrhythmia': {
            'codes': [
                '49436004',   # Atrial Fibrillation
                '164896001',  # Ventricular Fibrillation
                '426761007',  # Ventricular Tachycardia
                '427172004'   # Premature Ventricular Contractions
            ],
            'severity': 4
        },
        'Conduction': {
            'codes': [
                '27885002',   # Complete Heart Block
                '445211001',  # 2nd Degree Atrioventricular Block
                '28189009',   # Left Bundle Branch Block
                '59118001'    # Right Bundle Branch Block
            ],
            'severity': 3
        },
        'Rhythm Variant': {
            'codes': [
                '251146004',  # Sinus Tachycardia
                '427393009',  # Sinus Bradycardia
                '195126007'   # Atrial Tachycardia
            ],
            'severity': 2
        },
        'Normal': {
            'codes': ['426783006'],  # Normal Sinus Rhythm
            'severity': 1
        },
        'Other': {
            'codes': [],  # Catch-all for unclassified diagnoses
            'severity': 0
        }
    }
    
    # Find matching categories
    matching_categories = [
        category for category, details in pathology_hierarchy.items()
        if any(code in diagnosis_codes for code in details['codes'])
    ]
    
    # If multiple matches, select highest severity
    if matching_categories:
        return max(
            matching_categories, 
            key=lambda cat: pathology_hierarchy[cat]['severity']
        )
    
    return 'Other'

Only Execute first time (to define index split)

In [None]:
# # Try loading preprocessed data
# data_path = '/Users/marcpalomer/Documents/Personal/ECG_prediction/Results/processed_data'
# try:
#    ecg_data, patient_data = load_processed_data(data_path)
#    print(f"Loaded data for {len(ecg_data)} patients")
# except (FileNotFoundError, StopIteration):
#    raise FileNotFoundError("Preprocessed data not found. Please run data_processing.py first")


# # Define stratification based on diagnosis
# patient_data['diagnosis_class'] = patient_data['diagnosis_code'].apply(classify_cardiac_condition)

# # Create development and validation sets (70/30 split)
# dev_indices, val_indices = train_test_split(
#     patient_data.index,
#     test_size=0.7,
#     random_state=42,
#     stratify=patient_data['diagnosis_class']
# )

# del ecg_data
# del patient_data

# # Save indices for reproducibility
# np.save('./Results/dev_indices.npy', dev_indices)
# np.save('./Results/val_indices.npy', val_indices)

 ## 2. Initial Data Split

In [None]:
# Load validation indices
dev_indices = np.load('./Results/dev_indices.npy', allow_pickle=True)
val_indices = np.load('./Results/val_indices.npy', allow_pickle=True)


In [None]:
data_path = './Results/processed_data'
try:
   dev_ecg_dict, dev_data = load_processed_data(data_path, indices= dev_indices)
   print(f"Loaded data for {len(dev_data)} patients")
except (FileNotFoundError, StopIteration):
   raise FileNotFoundError("Preprocessed data not found. Please run data_processing.py first")

dev_ecg = {k: v['ecg_signals_filtered'] for k, v in dev_ecg_dict.items()}
del dev_ecg_dict

print("Development set size:", len(dev_indices))
print("Validation set size:", len(val_indices))



In [None]:
dev_ecg['JS39860'].head()

In [None]:
dev_data.head()

## Illness distribution and filtering

In [None]:
dev_data['diagnosis_label'] = dev_data['diagnosis_code'].apply(classify_cardiac_condition)
dev_labels = dev_data['diagnosis_label']
dev_labels.value_counts()

In [None]:
# Apply to dataframe
dev_labels.hist()
plt.show()

## Downsampling others class

In [None]:
# Assume 'labels' is a pandas Series containing the class labels
largest_class_size =dev_labels.value_counts()[2]
largest_class_size

In [None]:
# Determine the size of the largest non-'Other' class

# Downsample the 'Other' class
others_indices = dev_labels[dev_labels == 'Other'].index
downsampled_others = np.random.choice(others_indices, size=largest_class_size, replace=False)

# Create balanced dataset
balanced_indices = dev_labels[dev_labels != 'Other'].index.tolist() + list(downsampled_others)

# Update dev_ecg and dev_labels
dev_ecg_downsized = {pid: dev_ecg[pid] for pid in dev_ecg.keys() if pid in balanced_indices}
dev_labels_downsized = dev_labels.loc[balanced_indices]
dev_data_downsized = dev_data.loc[balanced_indices]

In [None]:
# Apply to dataframe
dev_labels_downsized.hist()
plt.show()

In [None]:
print(f"Original ECG records: {len(dev_ecg)}")
print(f"Downsampled ECG records: {len(dev_ecg_downsized)}")

### HRV calculation in lead II (gold standard) (it is a time saries for patient so like a new lead...)

In [None]:
import neurokit2 as nk
import numpy as np

def validate_rpeaks(rpeaks, fs):
    # Remove physiologically impossible R-peaks
    rr_intervals = np.diff(rpeaks) / fs
    valid_rr = (rr_intervals >= 0.2) & (rr_intervals <= 2.0)  
    valid_peaks = rpeaks[1:][valid_rr]
    return valid_peaks

def calculate_hr_metrics(rpeaks, fs):
    rr_intervals = np.diff(rpeaks) / fs
    hr = 60 / rr_intervals
    return np.median(hr), np.mean(hr), np.std(hr), np.min(hr), np.max(hr)

def calculate_heartrate(record, fs):
    # Find R-peaks using neurokit2
    rpeaks = list(nk.ecg_findpeaks(record, sampling_rate=fs).values())[0]
    rpeaks = validate_rpeaks(rpeaks, fs)
    return calculate_hr_metrics(rpeaks, fs)

def add_hr_metrics(patient_data, ecg_data):
    metrics = {'median_hr': [], 'mean_hr': [], 'std_hr': [], 'min_hr': [], 'max_hr': []}
    
    for id in patient_data.index:
        if id in ecg_data:
            lead_II = ecg_data[id].loc[:,'II']
            try:
                median_hr, mean_hr, std_hr, min_hr, max_hr = calculate_heartrate(lead_II, fs=500)
            except:
                median_hr, mean_hr, std_hr, min_hr, max_hr = [np.nan,np.nan,np.nan,np.nan,np.nan]
            metrics['median_hr'].append(median_hr)
            metrics['mean_hr'].append(mean_hr)
            metrics['std_hr'].append(std_hr)
            metrics['min_hr'].append(min_hr)
            metrics['max_hr'].append(max_hr)
        else:
            for key in metrics:
                metrics[key].append(None)
    
    for metric, values in metrics.items():
        patient_data[metric] = values
    
    return patient_data

dev_hrv = add_hr_metrics(dev_data_downsized, dev_ecg_downsized)
dev_hrv.head()

# ML Classifier

In [None]:
analyse_features= ['median_hr',	'mean_hr','std_hr',	'min_hr', 'max_hr', 'age']
ML_dataset = dev_hrv[analyse_features+['diagnosis_label']]
# arrythmia_ml_dataset = ML_dataset[ML_dataset['cardiac_condition'].isin(['Normal','Arrhythmia'])]
ML_dataset.value_counts('diagnosis_label')

In [None]:
ML_dataset.head()

In [None]:
from sklearn.model_selection import train_test_split
import src.RF_pipeline as RF_pipeline
import importlib
importlib.reload(RF_pipeline)


def prepare_data(df, target_col, features=None, test_size=0.2, random_state=42):
    """Main function to analyze the model."""
    if features is None:
        features = [col for col in df.columns if col != target_col]
    
    X = df[features]
    y = df[target_col]

    analyzer = RF_pipeline.RandomForestAnalyzer()
    visualizer = RF_pipeline.ModelVisualizer()
    # Preprocess data
    X_processed, y_processed, y_mapping = analyzer.preprocess(X,y)
    
    # Split the data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(X_processed, y_processed, test_size=test_size, random_state=random_state)
    


    return X_train, X_test, y_train, y_test, y_mapping, analyzer, visualizer, features
   
X_train, X_test, y_train, y_test, y_mapping, analyzer, visualizer, features = prepare_data(df=ML_dataset, target_col='diagnosis_label', features=None)

In [None]:
X_train.head()

In [None]:
y_train

## Cross validated performance

In [None]:
# Perform cross-validation
cv_results = analyzer.cross_validate(X_train, y_train)

# Visualize cross-validation results
visualizer.plot_cv_results(scores=cv_results, multiclass=True)

# Optional: Print detailed metrics

metrics = ['test_accuracy', 'test_precision_macro', 'test_recall_macro', 'test_f1_macro', 'test_balanced_accuracy']
stats = ['Mean', 'Std']

# Create empty lists to store values
means = [np.mean(cv_results[metric]) for metric in metrics]
stds = [np.std(cv_results[metric]) for metric in metrics]

# Create the dataframe
df = pd.DataFrame({
    'Metric': metrics,
    'Mean': means,
    'Std': stds
})
# Optional: Round the values to 4 decimal places
df[['Mean', 'Std']] = df[['Mean', 'Std']].round(4)

# Display the dataframe
df

In [None]:
y_mapping

In [None]:
# Train the model
rf_model = analyzer.get_fitted_model(X_train, y_train)

# Predict probabilities on test set
y_pred_proba = rf_model.predict_proba(X_test)

# Get class names from encoding dictionary
classes = list(y_mapping.keys())

# Plot multiclass ROC curves
roc_auc = visualizer.plot_multiclass_roc(y_test, y_pred_proba, classes)

# Print AUC for each class
# print("\nClass-wise AUC:")
# for cls, auc_val in zip(classes, roc_auc.values()):
#     print(f"{cls}: {auc_val:.4f}")

In [None]:
y_mapping

## Decision boundary display

In [None]:
from sklearn.decomposition import PCA
def plot_5class_decision_space(X, y, model, class_names):
   color_mapping = {
       0: 'red', 1: 'blue', 2: 'green', 
       3: 'orange', 4: 'purple', 5: 'brown'
   }
   
   reducer = PCA(n_components=2)
   X_reduced = reducer.fit_transform(X)
   
   x_min, x_max = X_reduced[:, 0].min() - 1, X_reduced[:, 0].max() + 1
   y_min, y_max = X_reduced[:, 1].min() - 1, X_reduced[:, 1].max() + 1
   xx, yy = np.meshgrid(
       np.linspace(x_min, x_max, 100),
       np.linspace(y_min, y_max, 100)
   )
   
   grid_reduced = np.c_[xx.ravel(), yy.ravel()]
   grid_original = reducer.inverse_transform(grid_reduced)
   
   Z_proba = model.predict_proba(grid_original)
   Z = np.argmax(Z_proba, axis=1)
   
   plt.figure(figsize=(12, 10))
   
   # Create a custom colormap with alpha
   colors = [color_mapping[cls] for cls in np.unique(Z)]
   alphas = [0.2] * len(colors)
   
   # Plot decision boundaries with unique colors
   plt.tricontourf(grid_reduced[:, 0], grid_reduced[:, 1], Z, 
                   levels=len(np.unique(Z)), 
                   colors=colors, 
                   alpha=0.2)
   
   inverted_classnames = {v: k for k, v in class_names.items()}
   for cls in np.unique(y):
       mask = (y == cls)
       plt.scatter(
           X_reduced[mask, 0], 
           X_reduced[mask, 1], 
           color=color_mapping[cls],
           edgecolor='black', 
           s=30,
           label=inverted_classnames[cls]
       )
   plt.legend(title='Classes', loc='best', markerscale=2)   
   plt.title('5-Class Decision Boundaries')
   plt.xlabel('PCA Component 1')
   plt.ylabel('PCA Component 2')
   plt.show()
   
   return X_reduced

# Usage example
X_pca = plot_5class_decision_space(X_test, y_test, model=rf_model, class_names=y_mapping)

In [None]:
y_pred = rf_model.predict(X_test)
visualizer.plot_multiclass_confusion_matrix(y_test, y_pred, list(y_mapping.keys()), normalise = 1)

# RF Performance

It is clear both from PCA space coloring and from confussion matrix that RF model is able to identify "Normal" patients vs other categories, but is not so good at differentiating between the different cardiopathological families we have defined. 

An interesting approach to implement here would be to generate an ensamble model, first lasssifying between Normal and abnormal and later trying to differentiate between the different abnormalities.

## Train final ML dataset

In [None]:
# Prepare full training data
X_full_train = pd.concat([X_train, X_test])
y_full_train = np.concatenate([y_train, y_test])

# Train final model
final_model = RF_pipeline.RandomForestClassifier(
    n_estimators=100, 
    random_state=42
)
final_model.fit(X_full_train, y_full_train)

# Optional: Save the model
joblib.dump(final_model, './Results/final_ml_model.joblib')

# Verify model performance
y_pred = final_model.predict(X_full_train)
visualizer.plot_multiclass_confusion_matrix(y_full_train, y_pred, list(y_mapping.keys()))

RF model is correltly overfitting in train, as it is expected in such method, meaning that possibly it is at lease catching the patterns inside data. Further testing on external validation data will assess if such learned pattern corresponds to signal in the data or to noise in the data.

## Save final model

In [None]:
# After training
analyzer = RF_pipeline.RandomForestAnalyzer()
analyzer.save_model(final_model, X_full_train)

# DL CNN prediction: 
- Will we beat HRV RF classification?

### Training Phase
1. **Data Preparation**
  - Raw ECG dictionary + labels → `prepare_data()` → `normalize_signals()`
  - Train/val/test split
  - Dataset & DataLoader creation for batching

2. **Training Cycle**
  - DataLoader feeds batches to ModelTrainer
  - Forward pass through ECGNet
  - Loss calculation, backpropagation
  - Validation performance check
  - Save best model
  - Track metrics history

### Evaluation Phase
1. **Model Assessment**
  - Load best model weights
  - Full forward pass on test set
  - Generate predictions/probabilities

2. **Results**
  - Performance metrics calculation
  - Visualization generation
  - Save all results

## DL Architecture

### Input Processing
- 12-lead ECG signals
- 5000 timepoints per lead
- Normalized per lead

### Feature Extraction
- Conv1d (k=50): QRS complex detection
- Conv1d (k=7): Wave morphology
- Conv1d (k=5): Fine details
- Increasing channels (12→32→64→128) for feature hierarchy

### Each Conv Block
- BatchNorm: Training stability
- ReLU: Non-linearity
- MaxPool: Dimension reduction

### Classification
- AdaptivePool: Fixed output size
- FC layers (6400→256→64→2)
- Dropout layers prevent overfitting
- Output: Binary classification probabilities

## Label encoding

In [None]:
y_mapping

In [None]:


dev_labels_downsized_array = dev_labels_downsized.map(y_mapping)

print("\nLabel encoding dictionary:")
for code, label in y_mapping.items():
    print(f"{label} -> {code}")

print("\nLabel distribution:")
print(dev_labels_downsized_array.value_counts())

In [None]:
# Assuming your data is in a dictionary like {patient_id: DataFrame(500x12)}
dev_ecg_downsized_array = []
for patient_id in dev_ecg_downsized:
    df = dev_ecg_downsized[patient_id]
    signal = df.values.T  # Transpose to get (12, 500)
    dev_ecg_downsized_array.append(signal)

dev_ecg_downsized_array = np.array(dev_ecg_downsized_array)  # Shape: (n_patients, 12, 500)
dev_labels_downsized_array = np.array(dev_labels_downsized_array)

In [None]:
dev_ecg_downsized_array.shape

In [None]:
dev_labels_downsized_array

## DL model training/loading

In [None]:
# Store variables to keep
keep = ['importlib', 'CNN_pipeline', 'tf','np','train_test_split','model', 'X_val', 'y_val', 'signals', 'labels', 'y_mapping']
import gc

def clean_memory(keep_vars=[]):
   """
   Free memory while keeping specified variables.
   Args:
       keep_vars (list): Names of variables to keep
   """
   # Store variables to keep
   saved = {var: globals()[var] for var in keep_vars if var in globals()}
   
   # Clear globals
   for var in list(globals()):
       if var not in ['gc', 'clean_memory'] + keep_vars:
           del globals()[var]
           
   # Restore saved variables
   globals().update(saved)
   
   # Force garbage collection
   gc.collect()

# Usage example:
# clean_memory(keep)

In [None]:
# import src.CNN_pipeline as CNN_pipeline
# importlib.reload(CNN_pipeline)
# # Initialize model
# model = CNN_pipeline.ECGClassifier(
#     input_shape=(dev_ecg_downsized_array.shape[1], dev_ecg_downsized_array.shape[2]), 
#     encode_dict=y_mapping
# )

# X_train_CNN, X_val_CNN, y_train_CNN, y_val_CNN = train_test_split(dev_ecg_downsized_array, dev_labels_downsized_array, test_size=0.25)
# # del signals
# # del labels


In [None]:
# len(X_train_CNN)

In [None]:
# len(X_val_CNN)

In [None]:
# np.unique(y_train_CNN, return_counts=True)


In [None]:
# np.unique(y_val_CNN, return_counts=True)


CNN model is nicely prepared to get trained on n different classes of data. It consists of the structure mentioned above plus some callbacks:

- EarlyStopping:

    - Stops training when validation loss stops improving
    - Prevents overfitting
    - Saves best model weights


- ModelCheckpoint:

    - Saves model weights at best validation performance
    - Allows recovery of best model after training


- ReduceLROnPlateau:

    - Dynamically reduces learning rate when metrics plateau
    - Helps model fine-tune and escape local minima

In [None]:
# # Train
# del X_train
# del y_train
# history = model.train(X_train_CNN, y_train_CNN, X_val_CNN, y_val_CNN)

In [None]:
# # Load weights
# model.load_model('./Results/final_CNN_model/best_model.keras')

In [None]:
# # Evaluate
# importlib.reload(CNN_pipeline)

# metrics = model.evaluate(X_train_CNN, y_train_CNN)

In [None]:
# metrics = model.evaluate(X_val_CNN, y_val_CNN)

It is clear from both the train and validation plots that the model is not able to fit the training data, even less generalise corrrectly. The model is underfitting our data. What can we do to solve this?

Farzad Nobar has a cool article on this: https://medium.com/towards-data-science/machine-learning-basics-i-look-for-in-data-scientist-interviews-a6ff25be38c9

It indicates that we can increase model complexity (not interesting), for complex models increase training size (yes interesting) or decrease regularizations or other techniques to prevent overfitting (as dropout etc) (yes interesting)

- Lets think about this for some days...

# Alternative CNN approach

In [None]:
print("Labels range:", dev_labels_downsized_array.min(), "-", dev_labels_downsized_array.max())
print("Unique labels:", np.unique(dev_labels_downsized_array))

In [None]:
print("y_mapping:", y_mapping)

In [None]:
config = {
    'batch_size': 32,
    'learning_rate': 0.001,
    'num_epochs': 50,
    'seed': 42
}
import Alternative_CNN
importlib.reload(Alternative_CNN)

results = Alternative_CNN.main(signals=dev_ecg_downsized_array, labels = dev_labels_downsized_array, label_mapping=y_mapping, config = config)

Copy the plots + explaination of what happened