In [3]:
import pandas as pd

# Load the CSV files
records_csv = pd.read_csv("/home/ppremnat/PTB-XL/ptbxl_database.csv", usecols=['ecg_id', 'patient_id', 'scp_codes', 'filename_lr'])

# Define the diagnostic classes
diagnostics = {
    "NORM": ['NORM', 'CSD'],
    "STTC": ['NDT', 'NST_', 'DIG', 'LNGQT', 'ISC_', 'ISCAL', 'ISCIN', 'ISCIL', 'ISCAS', 'ISCLA', 'ANEUR', 'EL', 'ISCAN'],
    "MI": ['IMI', 'ASMI', 'ILMI', 'AMI', 'ALMI', 'INJAS', 'LMI', 'INJAL', 'IPLMI', 'IPMI', 'INJIN', 'INJLA', 'PMI', 'INJIL'],
    "HYP": ['LVH', 'LAO/LAE', 'RVH', 'RAO/RAE', 'SEHYP'],
    "CD": ['LAFB', 'IRBBB', '1AVB', 'IVCD', 'CRBBB', 'CLBBB', 'LPFB', 'WPW', 'ILBBB', '3AVB', '2AVB'],
    "OTHER": ['AFLT', 'AFIB', 'PSVT', 'STACH', 'PVC', 'PACE', 'PAC']
}

# Create a reverse mapping from SCP code to diagnostic class
scp_to_class = {code: cls for cls, codes in diagnostics.items() for code in codes}

# Function to classify the record
def classify_record(scp_codes):
    scp_dict = eval(scp_codes)
    for scp_code in scp_dict.keys():
        if scp_code in scp_to_class:
            return scp_to_class[scp_code]
    return 'OTHER'  # Default to 'OTHER' if no SCP code matches

# Apply the classification function to the dataframe
records_csv['diagnostic_class'] = records_csv['scp_codes'].apply(classify_record)

records_csv.head()


Unnamed: 0,ecg_id,patient_id,scp_codes,filename_lr,diagnostic_class
0,1,15709.0,"{'NORM': 100.0, 'LVOLT': 0.0, 'SR': 0.0}",records100/00000/00001_lr,NORM
1,2,13243.0,"{'NORM': 80.0, 'SBRAD': 0.0}",records100/00000/00002_lr,NORM
2,3,20372.0,"{'NORM': 100.0, 'SR': 0.0}",records100/00000/00003_lr,NORM
3,4,17014.0,"{'NORM': 100.0, 'SR': 0.0}",records100/00000/00004_lr,NORM
4,5,17448.0,"{'NORM': 100.0, 'SR': 0.0}",records100/00000/00005_lr,NORM


In [4]:
records_csv['diagnostic_class'].value_counts()

NORM     9514
MI       5422
STTC     2804
CD       2322
HYP      1307
OTHER     430
Name: diagnostic_class, dtype: int64

In [5]:
records_csv.drop(columns=['scp_codes'], inplace=True)

In [6]:
records =records_csv[records_csv['diagnostic_class'] != 'OTHER']

In [7]:
df=pd.DataFrame(records)

In [8]:
df.head()

Unnamed: 0,ecg_id,patient_id,filename_lr,diagnostic_class
0,1,15709.0,records100/00000/00001_lr,NORM
1,2,13243.0,records100/00000/00002_lr,NORM
2,3,20372.0,records100/00000/00003_lr,NORM
3,4,17014.0,records100/00000/00004_lr,NORM
4,5,17448.0,records100/00000/00005_lr,NORM


In [9]:
df['diagnostic_class'].value_counts()

NORM    9514
MI      5422
STTC    2804
CD      2322
HYP     1307
Name: diagnostic_class, dtype: int64

## ResNet

In [33]:
import torch
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.model_selection import train_test_split
import pandas as pd
import wfdb

# Custom Dataset for ECG data
class ECGDataset(Dataset):
    def __init__(self, df, base_path, transform=None):
        self.df = df
        self.base_path = base_path
        self.transform = transform
        self.class_map = {"NORM": 0, "MI": 1, "STTC": 2, "CD": 3, "HYP": 4}

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        row = self.df.iloc[idx]
        record_path = os.path.join(self.base_path, row['filename_lr'])
        signal, _ = wfdb.rdsamp(record_path)

        # Convert signal to torch tensor
        signal = torch.FloatTensor(signal.T)

        # Encode the diagnostic class
        label = torch.tensor(self.class_map[row['diagnostic_class']], dtype=torch.long)

        if self.transform:
            signal = self.transform(signal)

        return signal, label


# Split the DataFrame
train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['diagnostic_class'])

# Create datasets
train_dataset = ECGDataset(train_df, '/home/ppremnat/PTB-XL/')
test_dataset = ECGDataset(test_df, '/home/ppremnat/PTB-XL/')

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


In [34]:
import torch
import torch.nn as nn

class ResidualUnit(nn.Module):
    def __init__(self, n_filters_in, n_filters_out, kernel_size=17, dropout_keep_prob=0.5, activation_function='relu'):
        super(ResidualUnit, self).__init__()
        self.conv1 = nn.Conv1d(n_filters_in, n_filters_out, kernel_size=kernel_size, padding=(kernel_size // 2))
        self.bn1 = nn.BatchNorm1d(n_filters_out)
        self.activation = nn.ReLU() if activation_function == 'relu' else nn.ELU()
        self.dropout = nn.Dropout(dropout_keep_prob)
        self.conv2 = nn.Conv1d(n_filters_out, n_filters_out, kernel_size=kernel_size, padding=(kernel_size // 2))
        self.bn2 = nn.BatchNorm1d(n_filters_out)
        self.shortcut = nn.Conv1d(n_filters_in, n_filters_out, kernel_size=1) if n_filters_in != n_filters_out else None

    def forward(self, x):
        #print(f"Input x: {x.shape}")
        identity = x

        out = self.conv1(x)
        #print(f"After conv1: {out.shape}")
        out = self.bn1(out)
        out = self.activation(out)
        out = self.dropout(out)

        out = self.conv2(out)
        #print(f"After conv2: {out.shape}")
        out = self.bn2(out)

        if self.shortcut is not None:
            identity = self.shortcut(identity)
            #print(f"After shortcut: {identity.shape}")

        out += identity
        out = self.activation(out)
        #print(f"Output out: {out.shape}")

        return out

class ECG_ResNet(nn.Module):
    def __init__(self, num_classes=5):
        super(ECG_ResNet, self).__init__()
        self.conv1 = nn.Conv1d(12, 64, kernel_size=17, padding=8)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU()
        self.res1 = ResidualUnit(64, 128)
        self.res2 = ResidualUnit(128, 196)
        self.res3 = ResidualUnit(196, 256)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        #print(f"Input x: {x.shape}")
        x = self.conv1(x)
        #print(f"After conv1: {x.shape}")
        x = self.bn1(x)
        x = self.relu(x)

        x = self.res1(x)
        #print(f"After res1: {x.shape}")
        x = self.res2(x)
        #print(f"After res2: {x.shape}")
        x = self.res3(x)
        #print(f"After res3: {x.shape}")

        x = self.avgpool(x)
        #print(f"After avgpool: {x.shape}")
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        #print(f"Output x: {x.shape}")

        return x


In [35]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = ECG_ResNet(num_classes=5).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

# Initialize metrics storage
train_precisions, train_recalls, train_f1s = [], [], []
test_precisions, test_recalls, test_f1s = [], [], []

# Training loop
for epoch in range(10):
    model.train()
    all_labels = []
    all_preds = []
    running_loss = 0.0
    
    for i, (signals, labels) in enumerate(train_loader):
        signals, labels = signals.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(signals)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        # Collect predictions and labels for metrics
        _, preds = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())
        
        if i % 100 == 0:
            print(f"Epoch [{epoch+1}/10], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    # Calculate and print training metrics
    train_loss = running_loss / len(train_loader)
    train_precision = precision_score(all_labels, all_preds, average='weighted')
    train_recall = recall_score(all_labels, all_preds, average='weighted')
    train_f1 = f1_score(all_labels, all_preds, average='weighted')
    train_precisions.append(train_precision)
    train_recalls.append(train_recall)
    train_f1s.append(train_f1)

    print(f"Epoch [{epoch+1}/10] Training Loss: {train_loss:.4f}")
    print(f"Epoch [{epoch+1}/10] Training Precision: {train_precision:.4f}")
    print(f"Epoch [{epoch+1}/10] Training Recall: {train_recall:.4f}")
    print(f"Epoch [{epoch+1}/10] Training F1-Score: {train_f1:.4f}")

    # Testing loop
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        correct = 0
        total = 0
        running_loss = 0.0
        
        for signals, labels in test_loader:
            signals, labels = signals.to(device), labels.to(device)
            outputs = model(signals)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            # Collect predictions and labels for metrics
            _, preds = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            
            total += labels.size(0)
            correct += (preds == labels).sum().item()
        
        accuracy = 100 * correct / total
        test_loss = running_loss / len(test_loader)
        test_precision = precision_score(all_labels, all_preds, average='weighted')
        test_recall = recall_score(all_labels, all_preds, average='weighted')
        test_f1 = f1_score(all_labels, all_preds, average='weighted')
        test_precisions.append(test_precision)
        test_recalls.append(test_recall)
        test_f1s.append(test_f1)
        
        print()
        print()
        print(f"Epoch [{epoch+1}/10] Test Loss: {test_loss:.4f}")
        print(f"Epoch [{epoch+1}/10] Test Accuracy: {accuracy:.2f}%")
        print(f"Epoch [{epoch+1}/10] Test Precision: {test_precision:.4f}")
        print(f"Epoch [{epoch+1}/10] Test Recall: {test_recall:.4f}")
        print(f"Epoch [{epoch+1}/10] Test F1-Score: {test_f1:.4f}")
        

Epoch [1/10], Step [1/535], Loss: 1.6643
Epoch [1/10], Step [101/535], Loss: 1.0355
Epoch [1/10], Step [201/535], Loss: 0.9031
Epoch [1/10], Step [301/535], Loss: 0.9596
Epoch [1/10], Step [401/535], Loss: 0.5636
Epoch [1/10], Step [501/535], Loss: 0.7256
Epoch [1/10] Training Loss: 0.9390
Epoch [1/10] Training Precision: 0.6270
Epoch [1/10] Training Recall: 0.6491
Epoch [1/10] Training F1-Score: 0.6242


Epoch [1/10] Test Loss: 0.8758
Epoch [1/10] Test Accuracy: 67.59%
Epoch [1/10] Test Precision: 0.6778
Epoch [1/10] Test Recall: 0.6759
Epoch [1/10] Test F1-Score: 0.6528
Epoch [2/10], Step [1/535], Loss: 0.8676
Epoch [2/10], Step [101/535], Loss: 0.7104
Epoch [2/10], Step [201/535], Loss: 0.6765
Epoch [2/10], Step [301/535], Loss: 0.7162
Epoch [2/10], Step [401/535], Loss: 0.5679
Epoch [2/10], Step [501/535], Loss: 0.7796
Epoch [2/10] Training Loss: 0.7992
Epoch [2/10] Training Precision: 0.6915
Epoch [2/10] Training Recall: 0.7036
Epoch [2/10] Training F1-Score: 0.6923


Epoch [2/10]

## InceptionNet

In [14]:
import torch
import torch.nn as nn

class InceptionBlock(nn.Module):
    def __init__(self, in_channels):
        super(InceptionBlock, self).__init__()
        self.conv1x1 = nn.Sequential(
            nn.Conv1d(in_channels, 16, kernel_size=1),
            nn.BatchNorm1d(16),
            nn.ReLU(inplace=True)
        )
        
        self.conv1x1_3x3 = nn.Sequential(
            nn.Conv1d(in_channels, 16, kernel_size=1),
            nn.BatchNorm1d(16),
            nn.ReLU(inplace=True),
            nn.Conv1d(16, 24, kernel_size=3, padding=1),
            nn.BatchNorm1d(24),
            nn.ReLU(inplace=True)
        )
        
        self.conv1x1_5x5 = nn.Sequential(
            nn.Conv1d(in_channels, 16, kernel_size=1),
            nn.BatchNorm1d(16),
            nn.ReLU(inplace=True),
            nn.Conv1d(16, 24, kernel_size=5, padding=2),
            nn.BatchNorm1d(24),
            nn.ReLU(inplace=True)
        )
        
        self.maxpool1x1 = nn.Sequential(
            nn.MaxPool1d(kernel_size=3, stride=1, padding=1),
            nn.Conv1d(in_channels, 24, kernel_size=1),
            nn.BatchNorm1d(24),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        print(f"InceptionBlock input shape: {x.shape}")
        
        branch1 = self.conv1x1(x)
        print(f"Branch1 (1x1) shape: {branch1.shape}")
        
        branch2 = self.conv1x1_3x3(x)
        print(f"Branch2 (1x1 + 3x3) shape: {branch2.shape}")
        
        branch3 = self.conv1x1_5x5(x)
        print(f"Branch3 (1x1 + 5x5) shape: {branch3.shape}")
        
        branch4 = self.maxpool1x1(x)
        print(f"Branch4 (maxpool + 1x1) shape: {branch4.shape}")
        
        outputs = torch.cat([branch1, branch2, branch3, branch4], 1)
        print(f"InceptionBlock output shape: {outputs.shape}")
        
        return outputs

class InceptionNet(nn.Module):
    def __init__(self, num_classes=5):
        super(InceptionNet, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv1d(12, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True)
        )
        
        self.inception1 = InceptionBlock(64)
        self.inception2 = InceptionBlock(64 + 16 + 24 + 24)  # Output channels from inception1
        
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(64 + 16 + 24 + 24, num_classes)  # Output channels after concatenation
    
    def forward(self, x):
        print(f"InceptionNet input shape: {x.shape}")
        
        x = self.conv1(x)
        print(f"After conv1: {x.shape}")
        
        x = self.inception1(x)
        print(f"After inception1: {x.shape}")
        
        x = self.inception2(x)
        print(f"After inception2: {x.shape}")
        
        x = self.pool(x)
        print(f"After pool: {x.shape}")
        
        x = x.view(x.size(0), -1)
        print(f"After view (flatten): {x.shape}")
        
        x = self.fc(x)
        print(f"Output shape: {x.shape}")
        
        return x


In [15]:
import torch
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.model_selection import train_test_split
import pandas as pd
import wfdb
import os

# Custom Dataset for ECG data
class ECGDataset(Dataset):
    def __init__(self, df, base_path, transform=None):
        self.df = df
        self.base_path = base_path
        self.transform = transform
        self.class_map = {"NORM": 0, "MI": 1, "STTC": 2, "CD": 3, "HYP": 4}

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        row = self.df.iloc[idx]
        record_path = os.path.join(self.base_path, row['filename_lr'])
        signal, _ = wfdb.rdsamp(record_path)

        # Convert signal to torch tensor
        signal = torch.FloatTensor(signal.T)

        # Encode the diagnostic class
        label = torch.tensor(self.class_map[row['diagnostic_class']], dtype=torch.long)

        if self.transform:
            signal = self.transform(signal)

        return signal, label


# Split the DataFrame
train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['diagnostic_class'])

# Create datasets
train_dataset = ECGDataset(train_df, '/home/ppremnat/PTB-XL/')
test_dataset = ECGDataset(test_df, '/home/ppremnat/PTB-XL/')

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Model, Loss, Optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = InceptionNet(num_classes=5).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

# Initialize metrics storage
train_precisions, train_recalls, train_f1s = [], [], []
test_precisions, test_recalls, test_f1s = [], [], []

# Training loop
for epoch in range(10):
    model.train()
    all_labels = []
    all_preds = []
    running_loss = 0.0
    
    for i, (signals, labels) in enumerate(train_loader):
        signals, labels = signals.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(signals)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        # Collect predictions and labels for metrics
        _, preds = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())
        
        if i % 100 == 0:
            print(f"Epoch [{epoch+1}/10], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    # Calculate and print training metrics
    train_loss = running_loss / len(train_loader)
    train_precision = precision_score(all_labels, all_preds, average='weighted')
    train_recall = recall_score(all_labels, all_preds, average='weighted')
    train_f1 = f1_score(all_labels, all_preds, average='weighted')
    train_precisions.append(train_precision)
    train_recalls.append(train_recall)
    train_f1s.append(train_f1)

    print(f"Epoch [{epoch+1}/10] Training Loss: {train_loss:.4f}")
    print(f"Epoch [{epoch+1}/10] Training Precision: {train_precision:.4f}")
    print(f"Epoch [{epoch+1}/10] Training Recall: {train_recall:.4f}")
    print(f"Epoch [{epoch+1}/10] Training F1-Score: {train_f1:.4f}")

    # Testing loop
    model.eval()
    all_labels = []
    all_preds = []
    with torch.no_grad():
        correct = 0
        total = 0
        running_loss = 0.0
        
        for signals, labels in test_loader:
            signals, labels = signals.to(device), labels.to(device)
            outputs = model(signals)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            # Collect predictions and labels for metrics
            _, preds = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())
            
            total += labels.size(0)
            correct += (preds == labels).sum().item()
        
        accuracy = 100 * correct / total
        test_loss = running_loss / len(test_loader)
        test_precision = precision_score(all_labels, all_preds, average='weighted')
        test_recall = recall_score(all_labels, all_preds, average='weighted')
        test_f1 = f1_score(all_labels, all_preds, average='weighted')
        test_precisions.append(test_precision)
        test_recalls.append(test_recall)
        test_f1s.append(test_f1)
        
        print()
        print()
        print(f"Epoch [{epoch+1}/10] Test Loss: {test_loss:.4f}")
        print(f"Epoch [{epoch+1}/10] Test Accuracy: {accuracy:.2f}%")
        print(f"Epoch [{epoch+1}/10] Test Precision: {test_precision:.4f}")
        print(f"Epoch [{epoch+1}/10] Test Recall: {test_recall:.4f}")
        print(f"Epoch [{epoch+1}/10] Test F1-Score: {test_f1:.4f}")


InceptionNet input shape: torch.Size([32, 12, 1000])
After conv1: torch.Size([32, 64, 500])
InceptionBlock input shape: torch.Size([32, 64, 500])
Branch1 (1x1) shape: torch.Size([32, 16, 500])
Branch2 (1x1 + 3x3) shape: torch.Size([32, 24, 500])
Branch3 (1x1 + 5x5) shape: torch.Size([32, 24, 500])
Branch4 (maxpool + 1x1) shape: torch.Size([32, 24, 500])
InceptionBlock output shape: torch.Size([32, 88, 500])
After inception1: torch.Size([32, 88, 500])
InceptionBlock input shape: torch.Size([32, 88, 500])


RuntimeError: Given groups=1, weight of size [16, 128, 1], expected input[32, 88, 500] to have 128 channels, but got 88 channels instead