# ClinicalBERT

In [None]:
# General imports
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import sys
from tqdm import tqdm
import ast
from scipy.signal import resample

# Matlab/WFDB files
import scipy.io as sio

from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn as nn

In [None]:
# Add the project's local source folder to the import search path.
# NOTE(review): presumably this is where helper_functions, dataset and
# tripletloss live — confirm. The path is absolute and machine-specific, so it
# must be adjusted when the notebook runs on another machine.
sys.path.append('C:/Users/navme/Desktop/ECG_Project/PyFiles')

In [15]:
from helper_functions import *
from dataset import PhysioNetDataset
from tripletloss import TripletLoss

In [None]:
# Root folder of the PhysioNet 2021 Challenge training data on the local disk.
# Written as adjacent string literals (joined at compile time) to keep lines short.
PhysioNet_PATH = (
    'C:/Users/navme/Desktop/ECG_Thesis_Local/'
    'PhysioNet-2021-Challenge/physionet.org/files/'
    'challenge-2021/1.0.3/training'
)
# Echo the path as the cell output.
PhysioNet_PATH

In [None]:
# Build the training split first, then the validation split, from the same root folder.
train_set, val_set = (
    PhysioNetDataset(PhysioNet_PATH, train=is_train_split)
    for is_train_split in (True, False)
)

In [None]:
# Load the pre-processed record tables (train first, then validation) from the
# current working directory.
processed_train_csv, processed_val_csv = map(
    pd.read_csv,
    ('processed_train_set_records.csv', 'processed_val_set_records.csv'),
)

In [None]:
# Preview the first two rows of the processed training records.
processed_train_csv.head(2)

In [None]:
class ECGEncoder(nn.Module):
    """Embed a single ECG lead with Bio_ClinicalBERT.

    The lead is rendered as a space-separated string of its sample values,
    tokenized, and passed through BERT; the hidden state of the first
    ([CLS]) token is returned as the embedding.

    Args:
        max_length: Maximum number of tokens fed to BERT. Longer inputs are
            truncated. The default of 512 is the usual BERT-base position
            limit (NOTE(review): confirm against ``self.bert.config``).
    """

    def __init__(self, max_length=512):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")
        self.bert = AutoModel.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")
        self.max_length = max_length

    def forward(self, ecg_signal):
        """Return the [CLS] embedding for ``ecg_signal``.

        Args:
            ecg_signal: Iterable of sample values for one lead; each element
                is converted with ``str``.

        Returns:
            Tensor holding the first-token hidden state for each sequence in
            the (single-item) batch, i.e. ``last_hidden_state[:, 0, :]``.
        """
        # Convert ECG signal to string as BERT takes text as input
        ecg_text = ' '.join(map(str, ecg_signal))
        # Truncate explicitly: an ECG lead easily yields more tokens than BERT
        # has position embeddings for, which would otherwise fail in the model.
        # max_length is passed explicitly rather than relying on the
        # tokenizer's configured default.
        inputs = self.tokenizer(
            ecg_text,
            return_tensors="pt",
            truncation=True,
            max_length=self.max_length,
        )
        outputs = self.bert(**inputs)
        # Use the BERT embeddings for the [CLS] token (first token)
        return outputs.last_hidden_state[:, 0, :]

In [None]:
# Build the encoder (loads the ClinicalBERT tokenizer and weights).
ecg_encoder = ECGEncoder()

# Take the first training sample; only its second element (the signal) is used.
_, ecg_signal = train_set[0]

# The signal is keyed by lead name; this demo only embeds the first lead.
lead_names = list(ecg_signal.keys())
lead_name = lead_names[0]
lead_ecg_signal = ecg_signal[lead_name]

# Run the lead through the encoder to obtain its ClinicalBERT embedding.
ecg_embedding = ecg_encoder(lead_ecg_signal)

# Show the resulting embedding tensor.
print(ecg_embedding)

In [16]:
class CLIPModel(nn.Module):
    """Triplet-loss wrapper around a single shared ``ECGEncoder``.

    Args:
        margin: Margin forwarded to ``TripletLoss``.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.ecg_encoder = ECGEncoder()
        self.triplet_loss = TripletLoss(margin)

    def forward(self, anchor, positive, negative):
        """Embed the three signals with the shared encoder and return their triplet loss."""
        # Encode in anchor -> positive -> negative order with the same encoder.
        embeddings = [
            self.ecg_encoder(signal) for signal in (anchor, positive, negative)
        ]
        return self.triplet_loss(*embeddings)

In [17]:
# Build a fresh (untrained) triplet model.
clip_model = CLIPModel()

# Use the first three training samples as anchor, positive and negative.
# Only the second element of each sample (the signal) is kept.
triplet_records = []
for sample_index in (0, 1, 2):
    _, record = train_set[sample_index]
    triplet_records.append(record)
anchor_ecg_signal, positive_ecg_signal, negative_ecg_signal = triplet_records

# Restrict the example to one lead: the first key of the anchor signal,
# looked up under the same name in all three signals.
anchor_lead_names = list(anchor_ecg_signal.keys())
lead_name = anchor_lead_names[0]
anchor_lead_ecg_signal = anchor_ecg_signal[lead_name]
positive_lead_ecg_signal = positive_ecg_signal[lead_name]
negative_lead_ecg_signal = negative_ecg_signal[lead_name]

In [18]:
# Feed the (anchor, positive, negative) leads through the model to get the triplet loss.
triplet_inputs = (
    anchor_lead_ecg_signal,
    positive_lead_ecg_signal,
    negative_lead_ecg_signal,
)
triplet_loss = clip_model(*triplet_inputs)

# Show the loss tensor returned by the model.
print(triplet_loss)

tensor(1.0096, grad_fn=<MeanBackward0>)


In [20]:
# Instantiate the CLIPModel
clip_model = CLIPModel()

num_epochs = 2

# Define an optimizer
optimizer = torch.optim.Adam(clip_model.parameters())

# The dataset length does not change during training, so read it once.
num_samples = len(train_set)

# Hugging Face `from_pretrained` returns the model in evaluation mode
# (dropout disabled); switch the whole model to training mode before updating it.
clip_model.train()

# Loop over your training data
for epoch in range(num_epochs):
    # Sum of per-triplet losses, so the epoch summary reflects the whole epoch
    # rather than only the last triplet.
    running_loss = 0.0

    for i in range(num_samples):
        # Get the anchor, positive, and negative ECG signals
        _, anchor_ecg_signal = train_set[i]
        _, positive_ecg_signal = train_set[(i + 1) % num_samples]  # Use the next sample as the positive example
        _, negative_ecg_signal = train_set[(i + 2) % num_samples]  # Use the sample after that as the negative example

        # We'll just use one of the leads for this example
        lead_name = list(anchor_ecg_signal.keys())[0]  # Get the name of the first lead
        anchor_lead_ecg_signal = anchor_ecg_signal[lead_name]  # Get the ECG signal for that lead
        positive_lead_ecg_signal = positive_ecg_signal[lead_name]  # Get the ECG signal for that lead
        negative_lead_ecg_signal = negative_ecg_signal[lead_name]  # Get the ECG signal for that lead

        # Compute the triplet loss for these ECG signals
        triplet_loss = clip_model(anchor_lead_ecg_signal, positive_lead_ecg_signal, negative_lead_ecg_signal)

        # Backpropagate the loss and update the model parameters
        optimizer.zero_grad()
        triplet_loss.backward()
        optimizer.step()

        running_loss += triplet_loss.item()

    # Report the mean loss over the epoch; NaN if the dataset is empty.
    mean_loss = running_loss / num_samples if num_samples else float("nan")
    print(f"Epoch {epoch + 1}/{num_epochs} Loss: {mean_loss}")

: 

1. The `ECGEncoder` is used to convert the raw ECG signals into embeddings using the ClinicalBERT model.

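2. The `CLIPModel` uses the `ECGEncoder` to get embeddings for the anchor, positive, and negative examples, and then computes the triplet loss on these embeddings. By training the `CLIPModel` on your dataset, you're effectively learning a new representation for your ECG signals where similar signals are close together and different signals are far apart. Note that this only holds when each positive example is genuinely similar to its anchor (for example, it shares the anchor's diagnosis) and each negative example is not; the training loop above picks them purely by position in the dataset (the next two samples), so for a meaningful representation the triplets should be selected by label.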

3. Once you've trained the `CLIPModel`, you can use it to transform your ECG signals into this new representation. You can then train a classifier on these transformed signals. This classifier could be any type of model you choose, such as a linear classifier, a support vector machine, a decision tree, a neural network, etc. The hope is that the new representation learned by the `CLIPModel` will be more useful for classification than the raw ECG signals.

In [None]:
import torch.nn as nn
import torch.optim as optim

# Load the data
# NOTE(review): earlier cells construct PhysioNetDataset(path, train=...);
# confirm the constructor also accepts a DataFrame of processed records.
train_dataset = PhysioNetDataset(processed_train_csv)
val_dataset = PhysioNetDataset(processed_val_csv)

# Reuse the CLIPModel trained in the previous cell. Creating a new CLIPModel()
# here would discard the learned representation. Evaluation mode disables
# dropout so the extracted features are deterministic.
clip_model.eval()
trained_encoder = clip_model.ecg_encoder

# Map every distinct 'dx_modality' value seen in training to a class index.
# NOTE(review): assumes row i of each CSV describes sample i of the matching
# dataset — confirm.
train_labels = processed_train_csv['dx_modality']
val_labels = processed_val_csv['dx_modality']
class_to_index = {name: index for index, name in enumerate(train_labels.unique())}
num_classes = len(class_to_index)  # number of unique values in 'dx_modality'


def embed_first_lead(encoder, record):
    """Embed the first lead of ``record`` and shape it for ``nn.Conv1d``.

    Args:
        encoder: Module mapping one lead signal to a 2-D embedding tensor.
        record: Mapping from lead name to that lead's signal.

    Returns:
        The embedding with a channel dimension inserted at position 1.
    """
    lead_name = list(record.keys())[0]
    # The encoder is a fixed feature extractor here, so no gradients are needed.
    with torch.no_grad():
        embedding = encoder(record[lead_name])
    return embedding.unsqueeze(1)


# Define the 1D CNN model
class CNN(nn.Module):
    """Small 1-D CNN classifier over single-channel embedding sequences.

    Args:
        n_classes: Number of output classes. When omitted, the module-level
            ``num_classes`` is used.
    """

    def __init__(self, n_classes=None):
        super().__init__()
        if n_classes is None:
            n_classes = num_classes
        self.conv1 = nn.Conv1d(1, 64, kernel_size=5, stride=2)
        self.relu = nn.ReLU()
        # Average over the length axis so the linear layer always receives 64
        # features, whatever the input length.
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(64, n_classes)

    def forward(self, x):
        """Return class logits for ``x`` (batch, 1 channel, length)."""
        x = self.conv1(x)
        x = self.relu(x)
        x = self.pool(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        return x


cnn = CNN(num_classes)

# Define a loss function and an optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(cnn.parameters())

# Train the model
num_train_samples = len(train_dataset)
for epoch in range(num_epochs):
    cnn.train()
    running_loss = 0.0

    for i in range(num_train_samples):
        # Get the ECG signal and the label
        _, ecg_signal = train_dataset[i]
        # CrossEntropyLoss expects a tensor of class indices, one per batch item.
        label = torch.tensor([class_to_index[train_labels.iloc[i]]])

        # Transform the ECG signal with the trained encoder. CLIPModel.forward
        # itself takes a triplet and returns a loss, not an embedding.
        features = embed_first_lead(trained_encoder, ecg_signal)

        # Train the CNN on the transformed ECG signal and the label
        optimizer.zero_grad()
        outputs = cnn(features)
        loss = criterion(outputs, label)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean loss over the epoch; NaN if the dataset is empty.
    mean_loss = running_loss / num_train_samples if num_train_samples else float("nan")
    print(f"Epoch {epoch + 1}/{num_epochs} Loss: {mean_loss}")

# Evaluate the model on the validation set
cnn.eval()
correct = 0
total = 0
with torch.no_grad():
    for i in range(len(val_dataset)):
        # Get the ECG signal and the label
        _, ecg_signal = val_dataset[i]
        target = class_to_index.get(val_labels.iloc[i])

        total += 1
        if target is None:
            # Class never seen in training: the model cannot predict it, so it
            # counts as a miss.
            continue

        # Transform the ECG signal with the trained encoder
        features = embed_first_lead(trained_encoder, ecg_signal)

        # Compute the model's prediction
        outputs = cnn(features)
        predicted = outputs.argmax(dim=1).item()

        correct += int(predicted == target)

accuracy = 100 * correct / total if total else float("nan")
print(f"Accuracy on the validation set: {accuracy}%")

To predict the class of a new ECG signal using the trained 1D CNN model, you can follow these steps:

1. Load the new ECG signal.
2. Transform the ECG signal into the learned representation using the `CLIPModel`.
3. Pass the transformed ECG signal through the 1D CNN to get the predicted class.

In [None]:
from scipy.io import loadmat

# Load the new ECG signal
new_ecg_signal = loadmat('new_ecg_signal.mat')

# Assuming the ECG signal is stored in a variable named 'ecg' in the .mat file
new_ecg_signal = new_ecg_signal['ecg']

# loadmat returns arrays that are at least 2-D, while the encoder expects a
# single lead, so keep only the first row.
# NOTE(review): assumes rows are leads and columns are samples — confirm.
new_lead_signal = np.atleast_2d(new_ecg_signal)[0]

# Inference only: disable dropout and gradient tracking.
clip_model.eval()
cnn.eval()
with torch.no_grad():
    # Transform the ECG signal with the trained encoder. CLIPModel.forward
    # takes a triplet and returns a loss, so the encoder is called directly.
    # unsqueeze(1) adds the channel dimension that Conv1d expects.
    new_ecg_signal = clip_model.ecg_encoder(new_lead_signal).unsqueeze(1)

    # Pass the transformed ECG signal through the 1D CNN to get the predicted class
    outputs = cnn(new_ecg_signal)

_, predicted_class = torch.max(outputs.data, 1)

# predicted_class is the index of the highest-scoring CNN output.
print(f"The predicted class for the new ECG signal is: {predicted_class.item()}")

This code loads a new ECG signal, transforms it using the `CLIPModel`, and then passes the transformed signal through the 1D CNN to get the predicted class. The predicted class is then printed to the console. Note that you'll need to replace `'new_ecg_signal.mat'` with the path to your new ECG signal file.