In [11]:
import pandas as pd

# Path to the ConditionNames_SNOMED-CT.csv file
mapping_csv_path = "/workspaces/BME3053C_ECG_Project/ECGTeam_Data/ecg-arrhythmia/ConditionNames_SNOMED-CT.csv"

# Load the CSV into a DataFrame
mapping_df = pd.read_csv(mapping_csv_path)

# Create a dictionary for mapping original letter labels to SNOMED CT codes and condition names
label_to_snomed = dict(zip(mapping_df['Acronym Name'], mapping_df['Snomed_CT']))
label_to_condition = dict(zip(mapping_df['Acronym Name'], mapping_df['Full Name']))

# Preview the mapping
print(mapping_df.head())


  Acronym Name                                  Full Name  Snomed_CT
0         1AVB            1 degree atrioventricular block  270492004
1         2AVB            2 degree atrioventricular block  195042002
2        2AVB1  2 degree atrioventricular block(Type one)   54016002
3        2AVB2  2 degree atrioventricular block(Type two)   28189009
4         3AVB            3 degree atrioventricular block   27885002


In [13]:
# Correct path to the mapping CSV file
mapping_csv_path = "ECGTeam_Data/ecg-arrhythmia/ConditionNames_SNOMED-CT.csv"
mapping_df = pd.read_csv(mapping_csv_path)

# Create dictionary for label to condition mapping
label_to_condition = dict(zip(mapping_df['Acronym Name'], mapping_df['Full Name']))

In [14]:
import os
import wfdb
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn import CrossEntropyLoss
from torch.optim import AdamW
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

# Seed for reproducibility
np.random.seed(42)
torch.manual_seed(42)

# Bandpass filter
def bandpass_filter(data, lowcut, highcut, fs, order=4):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype="band")
    return lfilter(b, a, data)

# Filter parameters
fs = 360
lowcut = 0.5
highcut = 50

# Initialize DistilBERT
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)

# Load and preprocess ECG signals
folder_path = "WFDBRecords"
signals = []
labels = []
file_limit = 100
file_count = 0

for root, _, files in os.walk(folder_path):
    for file in files:
        if file.endswith(".hea") and file_count < file_limit:
            file_count += 1
            record_name = os.path.splitext(file)[0]
            record_path = os.path.join(root, record_name)
            try:
                record = wfdb.rdrecord(record_path)
                signal = record.p_signal[:, 0]
                filtered_signal = bandpass_filter(signal, lowcut, highcut, fs)
                
                # Convert to string and tokenize
                encoded = tokenizer(
                    " ".join(map(str, filtered_signal)),
                    truncation=True,
                    padding="max_length",
                    max_length=512,
                    return_tensors="pt"
                )
                signals.append(encoded["input_ids"].squeeze(0).tolist())
                labels.append(0)  # Replace with real label logic

            except Exception as e:
                print(f"Error processing {record_name}: {e}")

# Convert lists to arrays
signals = np.array(signals, dtype=object)
labels = np.array(labels)

# Train/val/test split
X_train_val, X_test, y_train_val, y_test = train_test_split(signals, labels, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=42)

# PyTorch Dataset
class ECGDataset(Dataset):
    def __init__(self, signals, labels):
        self.signals = signals
        self.labels = labels

    def __len__(self):
        return len(self.signals)

    def __getitem__(self, idx):
        input_array = np.array(self.signals[idx], dtype=np.int64)
        input_ids = torch.tensor(input_array, dtype=torch.long)
        attention_mask = (input_ids != tokenizer.pad_token_id).long()
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "label": label
        }

# Create dataset and dataloader
train_dataset = ECGDataset(X_train, y_train)
val_dataset = ECGDataset(X_val, y_val)
test_dataset = ECGDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=2)
test_loader = DataLoader(test_dataset, batch_size=2)

# Training setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=5e-5)
loss_fn = CrossEntropyLoss()

# Training loop
for epoch in range(1):
    model.train()
    total_train_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    model.eval()
    total_val_loss = 0
    all_preds, all_labels = [], []
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            preds = torch.argmax(outputs.logits, dim=1)
            total_val_loss += loss.item()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    print(f"Epoch {epoch + 1}")
    print(f"Train Loss: {total_train_loss / len(train_loader):.4f}")
    print(f"Val Loss: {total_val_loss / len(val_loader):.4f}")
    print("Val Metrics:")
    print(classification_report(all_labels, all_preds))

# Final Evaluation
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        preds = torch.argmax(outputs.logits, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

print("Test Set Evaluation:")
print(classification_report(all_labels, all_preds))



Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch 1
Train Loss: 0.0721
Val Loss: 0.0024
Val Metrics:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        20

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20

Test Set Evaluation:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        20

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20

