In [17]:
# !pip install wfdb
# !pip install biosppy
# !pip install PyWavelets

In [None]:
import wfdb
import biosppy
import numpy as np
import pandas as pd
import pywt
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

In [None]:
class ECGProcessing:
    """
    A class for processing and analyzing ECG (Electrocardiogram) signals using wavelet transforms.
    Supports loading ECG records, detecting R-peaks, segmenting heartbeats, and extracting wavelet features.
    """
    def __init__(self, wavelet="db6", level=6):
        """
        Initialize ECG processing with specified wavelet type and decomposition level.
        
        Args:
            wavelet (str): Type of wavelet to use (default: 'db6' - Daubechies 6)
            level (int): Level of wavelet decomposition (default: 6)
        """
        # Wavelet transform parameters
        self.wavelet = wavelet
        self.level = level
        
        # Raw data storage
        self.record = None  # Stores the raw ECG record
        self.annotation = None  # Stores beat annotations
        self.annotations = {}  # Dictionary to store annotation symbols
        self.signal = None  # Processed signal data
        self.ecg = None  # ECG data in pandas DataFrame
        self.ecg_numpy = None  # ECG data in numpy array
        
        # R-peak detection results
        self.r_peaks = []  # R-peak locations from Hamilton segmenter
        self.r_peaks_array = []  # R-peaks in numpy array format
        
        # Heartbeat segmentation results
        self.heartbeats = []  # List of segmented heartbeats
        self.time_durations = []  # Duration of each heartbeat segment
        self.rr_intervals_samples = None  # R-R intervals in samples
        self.rr_intervals_seconds = None  # R-R intervals in seconds
        self.heartbeats_array = []  # Heartbeats in numpy array format
        
        # Wavelet analysis results
        self.wavelet_coeffs = []  # Wavelet coefficients for each heartbeat
        self.wavelet_coeffs_array = []  # Coefficients in numpy array format
        
        # Feature extraction results
        self.scaled_features = []  # Scaled feature vectors
        self.features_list = []  # Raw feature vectors
        self.standard_scaler = StandardScaler()  # For feature normalization
        self.label_encoder = LabelEncoder()  # For encoding beat labels
        
        # Tracking dictionaries for analysis
        self.tracked_segments = {}  # Maps R-peaks to heartbeat segments
        self.tracked_coeffs = {}  # Maps R-peaks to wavelet coefficients
        self.tracked_features = {}  # Maps R-peaks to extracted features

    def load_record(self, signal_number, full=False):
        """
        Load an ECG record from the MIT-BIH Arrhythmia Database.
        
        Args:
            signal_number (int): Record number (will be added to 100)
            full (bool): If True, load entire record; if False, load subset
        """
        self.record_number = 100 + signal_number
        filename = f"F:/SDP - II/mit-bih-arrhythmia-database-1.0.0/{str(100 + signal_number)}"
        
        # Load either full record or subset (180-4000 samples)
        if full:
            self.record = wfdb.rdrecord(filename)
            self.annotation = wfdb.rdann(filename, 'atr', shift_samps=True)
        else:
            self.record = wfdb.rdrecord(filename, sampfrom=180, sampto=4000)
            self.annotation = wfdb.rdann(filename, 'atr', sampfrom=180, sampto=4000, shift_samps=True)
        
        # Process signal into different formats
        self.signal = self.record.p_signal
        self.ecg = pd.DataFrame(np.array(
            [
                list(range(len(self.record.adc()))),  # Generate timestamp column
                self.record.adc()[:,0]  # Extract first lead data
            ]
        ).T,
        columns=['TimeStamp', 'ecg'])
        self.ecg_numpy = self.ecg.iloc[:,1].to_numpy()
        
        # Create dictionary of annotations
        for i in range(1, len(self.annotation.sample)):
            self.annotations[self.annotation.sample[i]] = self.annotation.symbol[i]

    def plot_record(self, annotation=False):
        """
        Plot the ECG record with optional annotations.
        
        Args:
            annotation (bool): If True, include beat annotations in plot
        """
        if annotation:
            wfdb.plot_wfdb(record=self.record, annotation=self.annotation, time_units='seconds', figsize=(15, 8))
            wfdb.show_ann_classes()
            wfdb.show_ann_labels()
        else:
            wfdb.plot_wfdb(record=self.record, time_units='seconds', figsize=(15, 8))

    def find_R_peaks(self):
        """
        Detect R-peaks in the ECG signal using Hamilton segmenter algorithm.
        Calculates R-R intervals in both samples and seconds.
        """
        self.r_peaks = biosppy.signals.ecg.hamilton_segmenter(signal=self.ecg_numpy, sampling_rate=self.annotation.fs)
        self.r_peaks_array = np.array(self.r_peaks[0])
        rr_intervals_samples = np.diff(self.r_peaks_array)
        self.rr_intervals_seconds = rr_intervals_samples / self.annotation.fs

    def plot_R_peaks(self):
        """
        Plot the ECG signal with detected R-peaks marked.
        """
        plt.figure(figsize=(20,4), dpi=100)
        plt.xticks(np.arange(0, len(self.ecg_numpy)+1, 150))
        plt.plot(self.ecg_numpy, color='blue')
        plt.scatter(self.r_peaks_array, self.ecg_numpy[self.r_peaks_array], color='red', s=50, marker='*')
        plt.xlabel('Samples')
        plt.ylabel('MLIImV')
        plt.title(f"Record {self.record_number} : R Peak Locations - Hamilton Segmenter")

    def segment_heartbeats(self, before, after):
        """
        Segment individual heartbeats around R-peaks.
        
        Args:
            before (int): Number of samples to include before R-peak
            after (int): Number of samples to include after R-peak
        """
        time_per_sample = 1 / self.annotation.fs
        for r_peak in self.r_peaks_array:
            start, end = r_peak - before, r_peak + after
            if start >= 0 and end < len(self.ecg_numpy):
                segment = self.ecg_numpy[start:end]
                self.heartbeats.append(segment)
                self.tracked_segments[r_peak] = segment
                self.time_durations.append(len(segment) * time_per_sample)

        self.heartbeats_array = np.array(self.heartbeats)

        # Verify consistent segment durations
        if all(x == self.time_durations[0] for x in self.time_durations):
            print(f"Time Duration for each ECG Heartbeat Segment: {self.time_durations[0]:.4f} secs")

    def wavelet_transform(self):
        """
        Apply wavelet transform to each heartbeat segment.
        Uses specified wavelet type and decomposition level from initialization.
        """
        self.wavelet_coeffs = [pywt.wavedec(seg, self.wavelet, level=self.level) for seg in self.heartbeats]
        for r_peak, seg in self.tracked_segments.items():
            self.tracked_coeffs[r_peak] = pywt.wavedec(seg, self.wavelet, level=self.level)
        self.wavelet_coeffs_array = np.array([np.concatenate(coeff) for coeff in self.wavelet_coeffs])

    def _extract_wavelet_features(self, coeffs):
        """
        Extract statistical features from wavelet coefficients.
        
        Args:
            coeffs (list): List of wavelet coefficients
            
        Returns:
            list: Extracted features (mean, standard deviation, energy)
        """
        # Calculate features from approximation coefficients
        features = [
            np.mean(coeffs[0]),  # Mean of approximation coefficients
            np.std(coeffs[0]),   # Standard deviation of approximation coefficients
            np.sum(np.square(coeffs[0]))  # Energy of approximation coefficients
        ]
        return features

    def extract_features(self):
        """
        Extract features from wavelet coefficients for all heartbeat segments.
        Stores features both in list form and mapped to R-peaks.
        """
        for r_peak, coeffs in self.tracked_coeffs.items():
            features = self._extract_wavelet_features(coeffs)
            self.features_list.append(features)
            self.tracked_features[r_peak] = features

    def scale_features(self):
        """
        Normalize extracted features using StandardScaler.
        """
        self.scaled_features = self.standard_scaler.fit_transform(self.features_list)

    def run(self, signal_number, full=False, plot=False):
        """
        Execute complete ECG processing pipeline.
        
        Args:
            signal_number (int): Record number to process
            full (bool): Whether to process full record
            plot (bool): Whether to generate plots at each step
        """
        self.load_record(signal_number, full=full)
        if plot: self.plot_record(annotation=True)
        self.find_R_peaks()
        if plot: self.plot_R_peaks()
        self.segment_heartbeats(before=90, after=162)
        if plot: self.plot_segments(before=90, after=162)
        self.wavelet_transform()
        if plot: self.plot_wavelets()
        self.extract_features()
        self.scale_features()

In [21]:
ecgp = ECGProcessing()

In [None]:
# Initialize lists to collect data across multiple ECG signals
all_scaled_features = []  # Stores scaled features extracted from each ECG signal
all_annotations = []      # Stores annotations for each ECG signal (e.g., classifications like Normal, SVEB, etc.)
all_r_peaks = []          # Stores detected R-peak positions for each ECG signal
all_tracked_features = [] # Stores additional tracked features from each ECG signal

# Loop through the first 20 ECG signals
for i in range(20):
    try:
        # Run ECG processing on signal 'i'
        # Parameters:
        # signal_number=i: process the i-th signal
        # full=True: process the entire signal
        # plot=False: do not plot the signal (to save time and resources)
        ecgp.run(signal_number=i, full=True, plot=False)
    
    # Skip if the specific signal file is not found
    except FileNotFoundError:
        continue
    
    # Append processed data to the respective lists
    all_scaled_features.append(ecgp.scaled_features)    # Store scaled feature set
    all_annotations.append(ecgp.annotations)            # Store annotations (classifications)
    all_r_peaks.append(ecgp.r_peaks_array)              # Store array of R-peak locations
    all_tracked_features.append(ecgp.tracked_features)  # Store additional tracked features


Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




Time Duration for each ECG Heartbeat Segment: 0.7000 secs




In [None]:
from bisect import bisect_left

def find_nearest_annotation(d, sample_num):
    # Extract the sorted list of annotation keys (assumed to be sorted)
    keys = list(d.keys())
    
    # Use binary search to find the insertion point for sample_num in keys
    pos = bisect_left(keys, sample_num)

    # Edge case: if sample_num is smaller than the first key, return the first annotation
    if pos == 0:
        return d[keys[0]]
    
    # Edge case: if sample_num is larger than the last key, return the last annotation
    if pos == len(keys):
        return d[keys[-1]]

    # Identify the two closest keys: one before and one after sample_num
    before = keys[pos - 1]
    after = keys[pos]
    
    # Determine which key is closer to sample_num
    # Return the annotation associated with the nearest key
    nearest_key = after if (after - sample_num) < (sample_num - before) else before
    return d[nearest_key]


In [24]:
def map_annotation(annotation_symbol):
    annotation_map = {
        'N': 1, 'L': 1, 'R': 1, 'e': 1, 'j': 1,  # Normal
        'A': 2, 'a': 2, 'J': 2, 'S': 2,          # SVEB
        'V': 3, 'E': 3,                          # VEB
        'F': 4,                                  # Fusion
        'p': 5, '/': 5, 'f': 5, 'Q': 5           # Unknown
    }
    return annotation_map.get(annotation_symbol, 0)  # Default to 0 if not in map

In [None]:
# Initialize lists to store processed features and annotations
ultra_new_features = []     # Will hold scaled features for each ECG signal
ultra_new_annotations = []  # Will hold mapped annotations for each ECG signal

# Loop over each dictionary in all_tracked_features
# Each dictionary corresponds to an ECG signal with R-peaks as keys and feature segments as values
for i, dictionary in enumerate(all_tracked_features):
    temp1 = []  # Temporary list to store features for the current signal
    temp2 = []  # Temporary list to store mapped annotations for the current signal
    
    # Iterate through each R-peak and its corresponding feature segment
    for r_peak, segment in dictionary.items():
        # Find the nearest annotation for the current R-peak
        nearest_annotation = find_nearest_annotation(all_annotations[i], r_peak)
        
        # Append the segment (feature data) to temp1
        temp1.append(segment)
        
        # Map the nearest annotation to a numerical label or category and add it to temp2
        temp2.append(map_annotation(nearest_annotation))
    
    # Scale features for the current signal and store them in ultra_new_features
    # StandardScaler is used to standardize the features to have mean 0 and standard deviation 1
    ultra_new_features.append(StandardScaler().fit_transform(temp1))
    
    # Store the processed annotations in ultra_new_annotations
    ultra_new_annotations.append(temp2)


In [None]:
np_ultra_new_features = np.array(ultra_new_features) # Converting into numpy array

In [None]:
np_ultra_new_annotations = np.array(ultra_new_annotations) # Converting into numpy array

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

def prepare_data(X, y):
    """
    Prepare input data and labels for neural network training by encoding labels
    and converting them to one-hot vectors.
    
    Args:
        X: Input features array
        y: Target labels array (can be multi-dimensional)
    
    Returns:
        tuple: (
            X: Original input features,
            y_onehot: One-hot encoded labels,
            label_encoder: Fitted LabelEncoder for inverse transformation
        )
    """
    # Initialize the label encoder for converting categorical labels to integers
    label_encoder = LabelEncoder()

    # Reshape labels to 1D array for encoding
    # This is necessary because LabelEncoder only works with 1D arrays
    flat_y = y.reshape(-1)
    
    # Fit the encoder to learn all unique classes and transform labels to integers
    label_encoder.fit(flat_y)

    # Transform labels back to integers while preserving original array shape
    # This maintains the structure of sequential or multi-dimensional labels
    y_encoded = label_encoder.transform(flat_y).reshape(y.shape)

    # Get the total number of unique classes from the encoder
    num_classes = len(label_encoder.classes_)

    # Convert integer-encoded labels to one-hot vectors
    # Each label becomes a binary vector where the 1 is at the index of the class
    # This is required for categorical crossentropy loss in neural networks
    y_onehot = to_categorical(y_encoded, num_classes=num_classes)

    # Return processed data and the encoder for potential inverse transformation later
    return X, y_onehot, label_encoder

In [None]:
def create_lstm_model(input_shape, num_classes):
    """
    Create a deep LSTM model for sequence classification/prediction.
    
    Architecture:
    - Two LSTM layers for sequence processing
    - TimeDistributed Dense layers for predictions at each timestep
    - Dropout layers for regularization
    
    Args:
        input_shape (tuple): Shape of input data (timesteps, features)
        num_classes (int): Number of classes for classification
    
    Returns:
        models.Sequential: Compiled Keras model ready for training
    """
    model = models.Sequential([
        # First LSTM layer
        # - 128 units determines the dimensionality of the output space
        # - return_sequences=True outputs the hidden state for each timestep
        # - input_shape specifies the expected input dimensions
        layers.LSTM(128, return_sequences=True, input_shape=input_shape),
        
        # Dropout layer to prevent overfitting
        # - Randomly sets 30% of input units to 0 during training
        layers.Dropout(0.3),

        # Second LSTM layer for deeper feature extraction
        # - 64 units creates a smaller representation
        # - return_sequences=True needed for timestep-wise predictions
        layers.LSTM(64, return_sequences=True),
        
        # Another dropout layer for regularization
        layers.Dropout(0.3),

        # TimeDistributed wrapper allows the Dense layer to be applied
        # independently to each timestep
        # - 32 neurons for dimensionality reduction
        # - ReLU activation for non-linearity
        layers.TimeDistributed(layers.Dense(32, activation='relu')),
        
        # Final dropout layer before output
        layers.Dropout(0.2),

        # Output layer wrapped in TimeDistributed
        # - One prediction per timestep
        # - num_classes neurons for class probabilities
        # - Softmax activation for probability distribution
        layers.TimeDistributed(layers.Dense(num_classes, activation='softmax'))
    ])

    # Configure the training process
    model.compile(
        # Adam optimizer for adaptive learning rate
        optimizer='adam',
        # Categorical crossentropy for multi-class classification
        loss='categorical_crossentropy',
        # Track accuracy during training
        metrics=['accuracy']
    )

    return model

In [None]:
def train_model(X, y, validation_split=0.2, epochs=50, batch_size=32):
    """
    Train an LSTM model on sequential data with early stopping.
    
    Args:
        X (numpy.ndarray): Input features array of shape (sequences, timesteps, features)
        y (numpy.ndarray): Target labels array
        validation_split (float): Fraction of data to use for validation (default: 0.2)
        epochs (int): Maximum number of training epochs (default: 50)
        batch_size (int): Number of samples per gradient update (default: 32)
    
    Returns:
        tuple: (
            model: Trained Keras model,
            history: Training history,
            label_encoder: LabelEncoder for inverse transformation
        )
    """
    # Extract and print data dimensions for verification
    num_sequences, timesteps, features = X.shape
    print(f"Input shape: {X.shape}")
    print(f"Number of sequences: {num_sequences}")
    print(f"Timesteps per sequence: {timesteps}")
    print(f"Features per timestep: {features}")

    # Prepare the data for training
    # - Encode labels to integers
    # - Convert to one-hot vectors
    X_processed, y_processed, label_encoder = prepare_data(X, y)

    # Create the LSTM model
    # - Get number of classes from encoder
    # - Configure input shape based on data dimensions
    num_classes = len(label_encoder.classes_)
    model = create_lstm_model(input_shape=(timesteps, features), num_classes=num_classes)

    # Display model architecture and parameters
    model.summary()

    # Train the model
    history = model.fit(
        X_processed, y_processed,
        # Hold out portion of data for validation
        validation_split=validation_split,
        # Maximum number of training iterations
        epochs=epochs,
        # Number of samples per gradient update
        batch_size=batch_size,
        # Add callbacks for training control
        callbacks=[
            # Early stopping to prevent overfitting
            tf.keras.callbacks.EarlyStopping(
                # Monitor validation loss for improvement
                monitor='val_loss',
                # Number of epochs with no improvement before stopping
                patience=5,
                # Keep the model weights from the epoch with best validation loss
                restore_best_weights=True
            )
        ]
    )

    # Return the trained model, training history, and encoder for later use
    return model, history, label_encoder

In [31]:
model, history, label_encoder = train_model(np_ultra_new_features, np_ultra_new_annotations)

Input shape: (19, 38897, 3)
Number of sequences: 19
Timesteps per sequence: 38897
Features per timestep: 3
Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_2 (LSTM)               (None, 38897, 128)        67584     
                                                                 
 dropout_3 (Dropout)         (None, 38897, 128)        0         
                                                                 
 lstm_3 (LSTM)               (None, 38897, 64)         49408     
                                                                 
 dropout_4 (Dropout)         (None, 38897, 64)         0         
                                                                 
 time_distributed_2 (TimeDi  (None, 38897, 32)         2080      
 stributed)                                                      
                                                                 
 dropout_5 (D

In [32]:
import os
import pickle
def save_model(model, label_encoder, save_dir='saved_model_new'):
    """
    Save both the model and label encoder
    
    Parameters:
    model: Trained Keras model
    label_encoder: Fitted sklearn LabelEncoder
    save_dir: Directory to save the model and encoder
    """
    # Create directory if it doesn't exist
    os.makedirs(save_dir, exist_ok=True)
    
    # Save the model
    model.save(os.path.join(save_dir, 'lstm_model'))
    
    # Save the label encoder
    with open(os.path.join(save_dir, 'label_encoder.pkl'), 'wb') as f:
        pickle.dump(label_encoder, f)
    
    print(f"Model and label encoder saved to {save_dir}")

In [33]:
save_model(model, label_encoder)

INFO:tensorflow:Assets written to: saved_model_new\lstm_model\assets


INFO:tensorflow:Assets written to: saved_model_new\lstm_model\assets


Model and label encoder saved to saved_model_new


In [35]:
def detailed_evaluation(model, X_test, y_test_raw, label_encoder):
    """
    Generate detailed classification metrics and visualizations
    """
    # Get predictions
    y_pred_proba = model.predict(X_test)
    y_pred = y_pred_proba.argmax(axis=2)
    
    # Flatten the arrays for evaluation
    y_true_flat = y_test_raw.reshape(-1)
    y_pred_flat = label_encoder.inverse_transform(y_pred.reshape(-1))
    
    # 1. Classification Report
    print("\nDetailed Classification Report:")
    report = classification_report(y_true_flat, y_pred_flat)
    print(report)
    
    # Save report as DataFrame for better visualization
    report_dict = classification_report(y_true_flat, y_pred_flat, output_dict=True)
    report_df = pd.DataFrame(report_dict).transpose()
    print("\nClassification Report as DataFrame:")
    print(report_df)
    
    total_correct = np.sum(y_true_flat == y_pred_flat)
    total_samples = len(y_true_flat)
    
    print("\nOverall Metrics:")
    print(f"Total Samples: {total_samples}")
    print(f"Correct Predictions: {total_correct}")
    print(f"Overall Accuracy: {total_correct/total_samples:.4f}")
    
    # return report_df, cm

In [36]:
X_train, X_test, y_train, y_test = train_test_split(
        np_ultra_new_features, np_ultra_new_annotations, test_size=0.2, random_state=42
    )
detailed_evaluation(model, X_test, y_test, label_encoder)


Detailed Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00       604
           1       0.99      1.00      0.99    153636
           2       0.00      0.00      0.00       320
           3       0.00      0.00      0.00       760
           5       0.00      0.00      0.00       268

    accuracy                           0.99    155588
   macro avg       0.20      0.20      0.20    155588
weighted avg       0.98      0.99      0.98    155588



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))



Classification Report as DataFrame:
              precision    recall  f1-score        support
0              0.000000  0.000000  0.000000     604.000000
1              0.987454  1.000000  0.993687  153636.000000
2              0.000000  0.000000  0.000000     320.000000
3              0.000000  0.000000  0.000000     760.000000
5              0.000000  0.000000  0.000000     268.000000
accuracy       0.987454  0.987454  0.987454       0.987454
macro avg      0.197491  0.200000  0.198737  155588.000000
weighted avg   0.975065  0.987454  0.981221  155588.000000

Overall Metrics:
Total Samples: 155588
Correct Predictions: 153636
Overall Accuracy: 0.9875
