In [1]:
# Data manipulation
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Machine learning utilities
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    roc_auc_score,
)

# Random Forest Classifier
from sklearn.ensemble import RandomForestClassifier

# Neural Network (Optional)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [2]:
def load_and_preprocess_data(file_path, sequence_length=5):
    """
    Load and preprocess the MIT-BIH Arrhythmia Dataset.
    Reshape the data into sequences of fixed length.
    """
    # Load the dataset
    df = pd.read_csv('C:/Users/abdulssekyanzi/EDA Dataset.csv/100.csv')

    # Drop unnecessary columns
    if 'Unnamed: 0' in df.columns:
        df = df.drop(columns=['Unnamed: 0'])

    # Extract features (ECG signals)
    X = df[['MLII', 'V5']].values  # Features: MLII and V5

    # Handle labels (Assume 'label' column exists; otherwise, generate synthetic labels)
    if 'label' in df.columns:
        y = df['label'].values  # Use existing labels if available
    else:
        print("Generating synthetic labels for testing purposes...")
        y = np.random.randint(0, 5, size=len(X))  # Generate random labels (0 to 4)

    # Encode categorical labels
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)

    # Create sequences of fixed length
    num_samples = len(X) - sequence_length + 1
    X_seq = np.array([X[i:i + sequence_length] for i in range(num_samples)])  # Shape: (num_samples, sequence_length, 2)
    y_seq = np.array([y[i + sequence_length - 1] for i in range(num_samples)])  # Labels correspond to the last timestamp

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, random_state=42)

    # Scale the features (standardize to zero mean and unit variance)
    scaler = StandardScaler()
    X_train = np.array([scaler.fit_transform(seq) for seq in X_train])
    X_test = np.array([scaler.transform(seq) for seq in X_test])

    return X_train, X_test, y_train, y_test, label_encoder.classes_

In [3]:
# Example: Extract statistical features from sequences
def extract_features(X_seq):
    """
    Extract statistical features from ECG signal sequences.
    """
    features = []
    for seq in X_seq:
        mlii_mean = np.mean(seq[:, 0])
        mlii_std = np.std(seq[:, 0])
        v5_mean = np.mean(seq[:, 1])
        v5_std = np.std(seq[:, 1])
        features.append([mlii_mean, mlii_std, v5_mean, v5_std])
    return np.array(features)

# Apply feature extraction
X_train_flat = extract_features(X_train)
X_test_flat = extract_features(X_test)

NameError: name 'X_train' is not defined

In [None]:
def train_random_forest(X_train, y_train, n_estimators=100, max_depth=None):
    """
    Train a Random Forest classifier on the dataset.
    """
    print("Training the Random Forest Classifier...")
    rf_model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    rf_model.fit(X_train, y_train)
    return rf_model

# Train the model
rf_model = train_random_forest(X_train_flat, y_train, n_estimators=200, max_depth=10)

In [None]:
def evaluate_model(rf_model, X_test, y_test, class_names):
    """
    Evaluate the Random Forest model using various metrics and visualizations.
    """
    print("Evaluating the Random Forest Model...")

    # Make predictions
    y_pred = rf_model.predict(X_test)

    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Test Accuracy: {accuracy:.4f}")

    # Classification report
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, target_names=class_names))

    # Confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    print("\nConfusion Matrix:")
    print(cm)

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.show()

# Evaluate the model
evaluate_model(rf_model, X_test_flat, y_test, class_names=label_encoder.classes_)

In [None]:
def plot_roc_curve(y_test, y_prob, class_names):
    """
    Plot the ROC curve for multi-class classification.
    """
    lb = LabelBinarizer().fit(y_test)
    y_test_bin = lb.transform(y_test)
    n_classes = len(class_names)

    fpr = dict()
    tpr = dict()
    roc_auc = dict()

    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Plot ROC curves
    plt.figure(figsize=(8, 6))
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], label=f'Class {class_names[i]} (AUC = {roc_auc[i]:.2f})')

    plt.plot([0, 1], [0, 1], 'k--')  # Diagonal line
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc='lower right')
    plt.show()

# Get predicted probabilities
y_prob = rf_model.predict_proba(X_test_flat)

# Plot ROC curve
plot_roc_curve(y_test, y_prob, class_names=label_encoder.classes_)

In [None]:
# Define a simple neural network
class SimpleNN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Convert data to PyTorch tensors
X_train_tensor = torch.tensor(X_train_flat, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_flat, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Initialize the model
input_size = X_train_flat.shape[1]
num_classes = len(np.unique(y_train))
model = SimpleNN(input_size, num_classes)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10
batch_size = 32
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss/len(train_loader):.4f}, '
          f'Train Accuracy: {correct / total * 100:.2f}%')