In [10]:
# TODO: Cite the source of the model architecture used in this notebook.

In [11]:
import os
import wfdb
import numpy as np
import pandas as pd
import glob
from sklearn.preprocessing import MultiLabelBinarizer

In [12]:
# Paths (all relative to the notebook's working directory)
base_dir = "data"
csv_path = os.path.join(base_dir, "ptbxl_database.csv")
record_dir = os.path.join(base_dir, "records100")

# Load metadata; the first CSV column becomes the DataFrame index.
variables = pd.read_csv(csv_path, index_col=0)

# Get all .dat files under record_dir (searched recursively).
# glob returns paths in a filesystem-dependent order, so sort them to make
# the row order of `variables` reproducible across runs and machines.
files = sorted(glob.glob(os.path.join(record_dir, "**", "*.dat"), recursive=True))
# The record id is parsed as the integer before the first "_" in the file name.
ecg_ids = [int(os.path.basename(f).split("_")[0]) for f in files]
# Keep only metadata rows for which a signal file was found.
variables = variables.loc[variables.index.isin(ecg_ids)]

# Reorder to match ECG files (ids without a metadata row are skipped).
ordered_indices = [i for i in ecg_ids if i in variables.index]
variables = variables.loc[ordered_indices]

In [13]:
from ast import literal_eval

# Decode scp_codes from their string form into Python objects.
# Only string values are parsed: this keeps the cell re-runnable, because
# passing an already-decoded (non-string) value to literal_eval raises
# ValueError.
variables["scp_codes"] = variables["scp_codes"].apply(
    lambda raw: literal_eval(raw) if isinstance(raw, str) else raw
)

# Load diagnostic map; the first CSV column becomes the index.
scp_path = os.path.join(base_dir, "scp_statements.csv")
scp_df = pd.read_csv(scp_path, index_col=0)
# Keep only statements with a non-null diagnostic_class and build a
# {statement index -> diagnostic_class} dictionary.
scp_map = scp_df[scp_df["diagnostic_class"].notnull()]["diagnostic_class"].to_dict()

# Map to superclasses
def map_to_superclass(scp_codes, class_map=None):
    """Map a record's SCP codes to its unique diagnostic superclasses.

    Args:
        scp_codes: Iterable of SCP statement codes (iterating a dict yields
            its keys). Codes missing from the mapping are ignored.
        class_map: Optional ``{code: superclass}`` mapping. When omitted,
            the notebook-level ``scp_map`` is used.

    Returns:
        A sorted list of distinct superclass labels; empty when no code is
        present in the mapping.
    """
    if class_map is None:
        class_map = scp_map
    # Sort so the result does not depend on set iteration order, which
    # varies between interpreter runs for strings.
    return sorted({class_map[code] for code in scp_codes if code in class_map})

# Attach, per record, the list of superclasses derived from its scp_codes.
variables = variables.assign(
    diagnostic_superclass=variables["scp_codes"].apply(map_to_superclass)
)

In [14]:
# Keep every record that carries at least one of the target superclasses.
# Records may still carry additional, non-target labels.
target_labels = {"NORM", "MI", "STTC"}
variables = variables[
    variables["diagnostic_superclass"].apply(
        lambda labels: not target_labels.isdisjoint(labels)
    )
]

In [15]:
# Hold out one stratification fold for testing; all other folds are training.
test_fold = 10
train_df = variables.loc[variables["strat_fold"].ne(test_fold)]
test_df = variables.loc[variables["strat_fold"].eq(test_fold)]

In [16]:
def load_raw_data(df, base_path):
    """Read the signal of every record listed in ``df["filename_lr"]``.

    Args:
        df: DataFrame with a ``filename_lr`` column of record paths
            relative to ``base_path``.
        base_path: Directory the record paths are joined onto.

    Returns:
        A NumPy array built from the per-record signals, in row order.
        Only the first element returned by ``wfdb.rdsamp`` (the signal) is
        kept; the second element is discarded.
    """
    return np.array(
        [wfdb.rdsamp(os.path.join(base_path, rel_path))[0] for rel_path in df["filename_lr"]]
    )

# Load the raw signals for the training split, then the test split.
X_train, X_test = (
    load_raw_data(split_df, base_dir) for split_df in (train_df, test_df)
)

In [17]:
def normalize(ecg):
    """Standardize an array along axis 0.

    Each column is shifted by its mean and divided by its standard
    deviation, both taken over axis 0. A small constant (1e-8) is added to
    the standard deviation so constant columns do not divide by zero.

    Args:
        ecg: Array-like input. In this notebook it is one recording; the
            caller presumably passes (samples, leads), so that each lead
            is standardized over time -- verify against the loader.

    Returns:
        The standardized array.
    """
    column_mean = np.mean(ecg, axis=0)
    column_scale = np.std(ecg, axis=0) + 1e-8
    return (ecg - column_mean) / column_scale

# Standardize every recording independently, using its own statistics,
# so no information is shared between the training and test splits.
X_train = np.array(list(map(normalize, X_train)))
X_test = np.array(list(map(normalize, X_test)))

In [18]:
# Multi-hot encode the superclass lists with a fixed column order:
# NORM, MI, STTC.
mlb = MultiLabelBinarizer(classes=["NORM", "MI", "STTC"])
mlb.fit(variables["diagnostic_superclass"])  # fit on full data

y_train, y_test = (
    mlb.transform(split_df["diagnostic_superclass"])
    for split_df in (train_df, test_df)
)



In [19]:
# Sanity check: features and labels must agree on the number of records.
print(X_train.shape, y_train.shape, sep="\n")

(16954, 1000, 12)
(16954, 3)


In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Helper: 1D convolution with same padding
def conv(in_planes, out_planes, kernel_size=3, stride=1):
    """Build a bias-free ``nn.Conv1d`` padded by ``(kernel_size - 1) // 2``.

    With an odd ``kernel_size`` and ``stride=1`` this padding keeps the
    sequence length unchanged.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        kernel_size: Width of the convolution kernel.
        stride: Convolution stride.

    Returns:
        The configured ``nn.Conv1d`` layer.
    """
    half_width = (kernel_size - 1) // 2
    return nn.Conv1d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=kernel_size,
        stride=stride,
        padding=half_width,
        bias=False,
    )

# Basic ResNet block for 1D
class BasicBlock1D(nn.Module):
    """Residual block made of two conv -> batch-norm stages for 1D inputs.

    The block output is ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut)``
    where ``shortcut`` is ``x`` itself, or ``downsample(x)`` when a
    ``downsample`` module is supplied.

    Args:
        in_planes: Number of input channels.
        planes: Number of output channels of both convolutions.
        stride: Stride of the first convolution (the second uses stride 1).
        downsample: Optional module applied to the input to produce the
            shortcut; it must yield a tensor that can be added to the
            main-branch output.
        kernel_size: Kernel width used by both convolutions.
    """

    # Output channels are ``planes * expansion``.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, downsample=None, kernel_size=3):
        super().__init__()
        # Main branch: only the first convolution carries the stride.
        self.conv1 = conv(in_planes, planes, kernel_size=kernel_size, stride=stride)
        self.bn1 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv(planes, planes, kernel_size=kernel_size)
        self.bn2 = nn.BatchNorm1d(planes)
        # Shortcut branch (None means identity).
        self.downsample = downsample

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        if self.downsample is None:
            identity = x
        else:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity
        return self.relu(out)

In [21]:
class ResNet1D(nn.Module):
    """ResNet-style network for 1D multi-channel sequences.

    Structure: a stem (stride-2 convolution of width 7, batch norm, ReLU,
    stride-2 max pooling), four residual stages with ``base_filters`` times
    1, 2, 4 and 8 channels (stages 2-4 use stride 2), global average
    pooling and a final linear layer.

    Args:
        block: Residual block class. It must expose an ``expansion``
            attribute and accept ``(in_planes, planes, stride, downsample)``.
        layers: Sequence whose first four entries give the number of blocks
            in each of the four stages.
        in_channels: Number of channels of the input sequence.
        num_classes: Size of the output of the final linear layer.
        base_filters: Channel width of the stem and of the first stage.
    """

    def __init__(self, block, layers, in_channels=12, num_classes=3, base_filters=64):
        super().__init__()
        # Running channel count; read and updated by _make_layer.
        self.in_planes = base_filters

        # Stem
        self.conv1 = nn.Conv1d(
            in_channels, base_filters, kernel_size=7, stride=2, padding=3, bias=False
        )
        self.bn1 = nn.BatchNorm1d(base_filters)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        # Residual stages: the width doubles at each stage.
        self.layer1 = self._make_layer(block, planes=base_filters, num_blocks=layers[0])
        self.layer2 = self._make_layer(block, planes=base_filters * 2, num_blocks=layers[1], stride=2)
        self.layer3 = self._make_layer(block, planes=base_filters * 4, num_blocks=layers[2], stride=2)
        self.layer4 = self._make_layer(block, planes=base_filters * 8, num_blocks=layers[3], stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        head_width = base_filters * 8 * block.expansion
        self.fc = nn.Linear(head_width, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride=1):
        """Build one stage of ``num_blocks`` blocks and update ``self.in_planes``.

        Only the first block receives ``stride``. It also receives a
        1x1-convolution + batch-norm shortcut whenever the stride is not 1
        or the channel count changes.
        """
        out_planes = planes * block.expansion
        if stride != 1 or self.in_planes != out_planes:
            downsample = nn.Sequential(
                nn.Conv1d(self.in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(out_planes),
            )
        else:
            downsample = None

        first_block = block(self.in_planes, planes, stride, downsample)
        self.in_planes = out_planes
        remaining = [block(self.in_planes, planes) for _ in range(1, num_blocks)]
        return nn.Sequential(first_block, *remaining)

    def forward(self, x):
        """Run the network on ``x`` of shape (batch, in_channels, length).

        Returns the output of the final linear layer; no activation is
        applied to it.
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        # Collapse the length axis to 1, then drop it.
        pooled = self.avgpool(x).squeeze(-1)
        return self.fc(pooled)

In [22]:
# Two basic blocks in each of the four stages.
model = ResNet1D(
    block=BasicBlock1D,
    layers=[2, 2, 2, 2],
    in_channels=12,
    num_classes=3,
)
print(model)

ResNet1D(
  (conv1): Conv1d(12, 64, kernel_size=(7,), stride=(2,), padding=(3,), bias=False)
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool1d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock1D(
      (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
      (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock1D(
      (conv1): Conv1d(64, 64, kernel_size=(3,), stride=(1,), padding=(1,), bias=False)
      (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv