## Evaluation Template (Experiment A)

Two tables:

- Seen Class Labels
- Unseen class Labels

- We need to establish baselines results framed as a multi-label classification problem

- Descriptive statistics to create plots for results obtained during testing for each of the classes. One line per projection dimension (64, 128, 256, 512, etc).

In [1]:
!pip install wfdb

Collecting wfdb
  Downloading wfdb-4.1.2-py3-none-any.whl (159 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.0/160.0 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: wfdb
Successfully installed wfdb-4.1.2


In [2]:
import random
import os
import itertools
from glob import glob
import sys

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import scipy
import wfdb
import cv2
import zipfile
import ast

from sklearn.metrics import roc_auc_score, average_precision_score, f1_score, accuracy_score, precision_score, recall_score, f1_score

from albumentations.pytorch import ToTensorV2
from transformers import AutoTokenizer, AutoModel
import albumentations as A

from sklearn.model_selection import train_test_split

In [3]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [4]:
# Dataset containing ECG signals (500 Hz and 100 Hz)
!cp -r  "/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/records500V2.zip" "/content"

In [5]:
# CSV of dataset labels and file paths to the corresponding ECG signals
!cp -r  "/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/testing_subset.csv" "/content"

In [6]:
with zipfile.ZipFile('records500V2.zip', 'r') as zip_ref:
    zip_ref.extractall('/content')

In [7]:
sys.path.append('/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A')

In [8]:
import codes

In [9]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

# Use a fixed seed value
set_seed(42)

In [10]:
class CONFIG:
    debug = False
    batch_size = 128
    num_workers = 2
    head_lr = 0.001
    image_encoder_lr = 0.001
    patience = 5
    factor = 0.8
    epochs = 20
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Image Model
    # model_name = 'resnet18'
    image_embedding_size = 512

    # Text Model
    text_encoder_model = 'emilyalsentzer/Bio_ClinicalBERT'
    text_tokenizer = 'emilyalsentzer/Bio_ClinicalBERT'
    text_embedding_size = 768
    max_length = 200

    pretrained = True # for both image encoder and text encoder
    trainable = True # for both image encoder and text encoder
    temperature = 10.0
    optimizer = torch.optim.Adam

    # image size
    size = 224

    # for projection head; used for both image and text encoder
    num_projection_layers = 1
    projection_dim = 1536
    dropout = 0.0
    ecg_sr = 128

In [11]:
def load_wsdb(file):
    file = os.path.splitext(file)[0]
    record = wfdb.io.rdrecord(file)
    ecg = record.p_signal.T.astype('float32')
    leads = tuple(record.sig_name)
    sr = record.fs
    ecg[np.isnan(ecg)] = 0.0
    return ecg, leads, sr

def load_raw_data(df, sampling_rate, path):
    data = []
    not_found_files = []
    if sampling_rate == 100:
        filenames = df.filename_lr
    else:
        filenames = df.filename_hr

    for f in filenames:
        try:
            record = wfdb.rdsamp(path+f)
            data.append(record)
        except FileNotFoundError:
            not_found_files.append(f)
            continue

    data = np.array([signal for signal, meta in data])
    return data, not_found_files

In [12]:
path_500_HR = '/content/records500'  # Path to ECG signal files (to load them)
path_PTBXL_dataset = "/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/testing_subset.csv"

In [13]:
PTBXL_dataset = pd.read_csv(path_PTBXL_dataset)
print("There are {0} records in the PTB-XL dataset.".format(len(PTBXL_dataset)))

There are 6315 records in the PTB-XL dataset.


Model Paths:

- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/64/model_64_temp_1.0.pth"`
- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/64/model_64_temp_10.0.pth"`
- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/128/model_128.pth"`
- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/256/model_256.pth"`
- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/512/model_512.pth"`
- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/768/model_768.pth"`
- `"/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/model_1536.pth"`

In [15]:
# Pretrained model
!cp -r "/content/drive/MyDrive/ECG Project (Shared Folder)/2024 - PTB-XL + Reports (Scientific Paper)/Experiment A/Test Results/1536/model_1536.pth" "/content"

## Models for CLIP

In [20]:
import torch
from torch import nn

_ACTIVATION_DICT = {'relu': nn.ReLU,
                    'tanh': nn.Tanh,
                    'none': nn.Identity,
                    'leaky_relu': lambda: nn.LeakyReLU(negative_slope=0.2)}

class Conv1dBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size,
                 act='relu', bn=True, dropout=None,
                 maxpool=None, padding=None, stride=1):

        super().__init__()

        if padding is None or padding == 'same':
            padding = kernel_size // 2

        self.conv = nn.Conv1d(in_channels, out_channels, kernel_size, padding=padding, bias=not bn, stride=stride)
        self.bn = nn.BatchNorm1d(out_channels) if bn else None
        self.act = _ACTIVATION_DICT[act]()
        self.dropout = None if dropout is None else nn.Dropout(dropout)
        self.maxpool = None if maxpool is None else nn.MaxPool1d(maxpool)

    def forward(self, x):
        x = self.conv(x)

        if self.bn is not None:
            x = self.bn(x)

        x = self.act(x)

        if self.dropout is not None:
            x = self.dropout(x)

        if self.maxpool is not None:
            x = self.maxpool(x)

        return x

class LinearBlock(nn.Module):
    def __init__(self, in_channels, out_channels, act='relu', bn=True, dropout=None):

        super().__init__()

        self.linear = nn.Linear(in_channels, out_channels, bias=not bn)
        self.bn = nn.BatchNorm1d(out_channels) if bn else None
        self.act = _ACTIVATION_DICT[act]()
        self.dropout = None if dropout is None else nn.Dropout(dropout)

    def forward(self, x):
        x = self.linear(x)

        if self.bn is not None:
            x = self.bn(x)

        x = self.act(x)

        if self.dropout is not None:
            x = self.dropout(x)
        return x

class ConvEncoder(nn.Module):
    def __init__(self, in_channels, channels, kernels, bn=True, dropout=None, maxpool=2, padding=0, stride=None):
        super().__init__()

        num_layers = len(channels)
        if stride is None:
            stride = [1] * num_layers

        self.in_layer = Conv1dBlock(in_channels, channels[0], kernels[0], bn=bn, dropout=dropout, maxpool=maxpool, padding=padding, stride=stride[0])

        conv_layers = list()
        for i in range(1, num_layers):
            conv_layers.append(Conv1dBlock(channels[i - 1], channels[i], kernels[i], bn=bn, dropout=dropout, maxpool=maxpool, padding=padding, stride=stride[i]))
        self.conv_layers = nn.ModuleList(conv_layers)

    def forward(self, x):
        x = self.in_layer(x)
        for layer in self.conv_layers:
            x = layer(x)
        return x

class ECGEncoder(nn.Module):
    def __init__(self,
                 window=1280,
                 in_channels=12,
                 channels=(32, 32, 64, 64, 128, 128, 256, 256),
                 kernels=(7, 7, 5, 5, 3, 3, 3, 3),
                 linear=512,
                 output=512):

        super().__init__()

        self.conv_encoder = ConvEncoder(in_channels, channels,  kernels, bn=True)

        with torch.no_grad():
            inpt = torch.zeros((1, in_channels, window), dtype=torch.float32)
            outpt = self.conv_encoder(inpt)
            output_window = outpt.shape[2]

        self.flatten = nn.Flatten()
        self.conv_to_linear = nn.Linear(output_window * channels[-1], linear)
        self.act = nn.ReLU()
        self.out_layer = nn.Linear(linear, output)

    def forward(self, x):
        # print(x.shape)
        x = self.conv_encoder(x)
        # print(x.shape)
        x = self.flatten(x)
        # print(x.shape)
        x = self.conv_to_linear(x)
        x = self.act(x)
        x = self.out_layer(x)
        return x

In [21]:
class ImageEncoder(nn.Module):

    def __init__(self, config):
        super().__init__()
        self.config = CONFIG
        self.encoder = ECGEncoder(output=CONFIG.image_embedding_size)

    def forward(self, x):
        x = self.encoder(x)
        return x


class TextEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()

        self.config = config
        if self.config.pretrained:
            self.model = AutoModel.from_pretrained(self.config.text_encoder_model)
        else:
            self.model = AutoModel.from_config(self.config.text_encoder_model)

        self.tokenizer = AutoTokenizer.from_pretrained(self.config.text_tokenizer)

        for p in self.model.parameters():
            p.requires_grad = False  # Set requires_grad to False for all parameters

        # we are using the CLS token hidden representation as the sentence's embedding
        self.target_token_idx = 0

    def forward(self, texts):
        input_ids, attention_mask = self.tokenize_texts(texts)
        embeddinbgs = self.inputs_to_embeddings(input_ids, attention_mask)
        return embeddinbgs

    def tokenize_texts(self, texts):
        inputs = self.tokenizer(texts, padding=True, truncation=True, max_length=self.config.max_length, return_tensors='pt')
        input_ids = inputs['input_ids'].detach().to(self.config.device)
        attention_mask = inputs['attention_mask'].detach().to(self.config.device)
        return input_ids, attention_mask

    def inputs_to_embeddings(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :].detach()


class ProjectionHead(nn.Module):
    def __init__(
        self,
        embedding_dim,
        projection_dim=CONFIG.projection_dim,
        dropout=CONFIG.dropout
    ):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)

    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x

class CLIPModel(nn.Module):
    def __init__(
        self,
        temperature=CONFIG.temperature,
        image_embedding=CONFIG.image_embedding_size,
        text_embedding=CONFIG.text_embedding_size,
    ):
        super().__init__()
        self.image_encoder = ImageEncoder(CONFIG)
        self.text_encoder = TextEncoder(CONFIG)
        self.image_projection = ProjectionHead(embedding_dim=image_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.temperature = temperature

    def forward(self, batch):
        image_embeddings = self.image_to_embeddings(batch['image'])
        text_embeddings = self.text_to_embeddings(batch['caption'])

        # Calculating the Loss
        logits = (text_embeddings @ image_embeddings.T) / self.temperature
        images_similarity = image_embeddings @ image_embeddings.T
        texts_similarity = text_embeddings @ text_embeddings.T
        targets = F.softmax((images_similarity + texts_similarity) / 2 * self.temperature, dim=-1)
        texts_loss = cross_entropy(logits, targets, reduction='none')
        images_loss = cross_entropy(logits.T, targets.T, reduction='none')
        loss =  (images_loss + texts_loss) / 2.0 # shape: (batch_size)
        return loss.mean(), image_embeddings, text_embeddings

    def text_to_embeddings(self, texts):
        text_features = self.text_encoder(texts)
        text_embeddings = self.text_projection(text_features)
        return text_embeddings

    def image_to_embeddings(self, images):
        image_features = self.image_encoder(images)
        image_embeddings = self.image_projection(image_features)
        return image_embeddings


def cross_entropy(preds, targets, reduction='none'):
    log_softmax = nn.LogSoftmax(dim=-1)
    loss = (-targets * log_softmax(preds)).sum(1)
    if reduction == "none":
        return loss
    elif reduction == "mean":
        return loss.mean()


def nxn_cos_sim(A, B, dim=1):
    a_norm = F.normalize(A, p=2, dim=dim)
    b_norm = F.normalize(B, p=2, dim=dim)
    return torch.mm(a_norm, b_norm.transpose(0, 1))

In [22]:
def calc_metrics(image_embeddings, captions, class_embeddings, class_names):
    similarity = nxn_cos_sim(image_embeddings, class_embeddings, dim=1)
    predictions_ids = similarity.argmax(dim=1)
    predictions = [class_names[idx] for idx in predictions_ids]
    tps = [prediction in caption for prediction, caption in zip(predictions, captions)]
    accuracy = np.mean(tps)

    results = dict()
    results['accuracy'] = accuracy

    similarity = similarity.detach().cpu().numpy()
    for i, name in enumerate(class_names):

        true = np.array([name in caption for caption in captions]).astype('int32')

        if true.std() > 0:
            results[f'{name}_rocauc'] = roc_auc_score(true, similarity[:, i])
            results[f'{name}_prauc'] = average_precision_score(true, similarity[:, i])
        else:
            results[f'{name}_rocauc'] = None
            results[f'{name}_prauc'] = None

    return results

def calc_accuracy(image_embeddings, captions, class_embeddings, class_names):
    similarity = nxn_cos_sim(image_embeddings, class_embeddings, dim=1)
    predictions_ids = similarity.argmax(dim=1)
    predictions = [class_names[idx] for idx in predictions_ids]
    tps = [prediction in caption for prediction, caption in zip(predictions, captions)]
    accuracy = np.mean(tps)
    return accuracy


class AvgMeter:
    def __init__(self, name="Metric"):
        self.name = name
        self.reset()

    def reset(self):
        self.avg, self.sum, self.count = [0] * 3

    def update(self, val, count=1):
        self.count += count
        self.sum += val * count
        self.avg = self.sum / self.count

    def __repr__(self):
        text = f"{self.name}: {self.avg:.4f}"
        return text

In [23]:
def valid_epoch(model, loader, classes):
    model.eval()

    with torch.no_grad():
        class_embeddings = model.text_to_embeddings(classes).detach().cpu()

    tqdm_object = tqdm(loader, total=len(loader))
    embeddings = list()
    captions = list()
    with torch.no_grad():
        for batch in tqdm_object:
            batch = {k: v.to(CONFIG.device) if isinstance(v, torch.Tensor) else v for k, v in batch.items()}
            loss, image_embeddings, text_embeddings = model(batch)
            embeddings.append(image_embeddings.cpu())
            captions += batch['caption']

    embeddings = torch.cat(embeddings)

#     plt.figure(figsize=(30, 5))
#     plt.hist(class_embeddings.numpy().flatten(), bins=100)
#     plt.grid()
#     plt.show()


#     plt.figure(figsize=(30, 5))
#     plt.hist(embeddings.numpy().flatten(), bins=100)
#     plt.grid()
#     plt.show()


    metric = calc_metrics(embeddings, captions, class_embeddings, classes)
    return metric

def resample(ecg, shape):
    resized = cv2.resize(ecg, (shape, ecg.shape[0]))
    resized = resized.astype(ecg.dtype)
    return resized

class CLIP_Dataset_Evaluation(torch.utils.data.Dataset):
    def __init__(self, df, config):
        self.df = df
        self.df['filename_hr'] = self.df['filename_hr'].astype(str)
        self.config = config

        self.ecg_files = self.df['filename_hr'].values
        self.captions = self.df['true_label_dx'].values

    def __len__(self, ):
        return len(self.df)

    def __getitem__(self, idx):
        ecg, leads, sr = load_wsdb(self.ecg_files[idx])
        caption = self.captions[idx]
        image = self.process_ecg(ecg, sr)
        return {'image': image, 'caption': caption}

    def process_ecg(self, ecg, sr):
        new_shape = int(self.config.ecg_sr * ecg.shape[1] / sr)
        ecg = resample(ecg, new_shape)
        return ecg

## Model Evaluation

In [24]:
# Instantiate your model
model = CLIPModel()

# Define the path to your pretrained weights
weights_path = "/content/model_1536.pth"

# Load the weights
state_dict = torch.load(weights_path)

# Update the model's weights
model.load_state_dict(state_dict)

# Send the model to the device
model = model.to(CONFIG.device)

# Your existing optimizer code
optimizer = CONFIG.optimizer

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/385 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/436M [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

In [25]:
test_subset = pd.read_csv("/content/testing_subset.csv")
test_subset.head(3)

Unnamed: 0,filename_hr,HR_ID,report_ENG,true_label_dx
0,records500/00000/00001_hr,HR00001.hea,sinus rhythm peripheral low voltage,"['low qrs voltages', 'sinus rhythm']"
1,records500/00000/00022_hr,HR00022.hea,Sinus rhythm left type non-specific abnormal t,"['t wave abnormal', 'left axis deviation', 'si..."
2,records500/00000/00026_hr,HR00026.hea,sinus rhythm suspected p-sinistrocardiale left...,"['left axis deviation', 'sinus rhythm', 'nonsp..."


In [26]:
test_ds = CLIP_Dataset_Evaluation(test_subset, CONFIG)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=CONFIG.batch_size, num_workers=CONFIG.num_workers, shuffle=False)

In [27]:
test_ds[0]

{'image': array([[-0.115     , -0.115     , -0.11384375, ...,  0.21      ,
          0.21      ,  0.21      ],
        [-0.05      , -0.05      , -0.045     , ...,  0.205     ,
          0.205     ,  0.205     ],
        [ 0.065     ,  0.065     ,  0.06957813, ..., -0.005     ,
         -0.005     , -0.005     ],
        ...,
        [-0.035     , -0.0335625 , -0.03      , ...,  0.185     ,
          0.185     ,  0.185     ],
        [-0.035     , -0.035     , -0.03      , ...,  0.17      ,
          0.17      ,  0.17      ],
        [-0.075     , -0.075     , -0.07      , ...,  0.18      ,
          0.18      ,  0.18      ]], dtype=float32),
 'caption': "['low qrs voltages', 'sinus rhythm']"}

In [28]:
# This is the list of classes which were excluded during training and validation
excluded_classes = ['left ventricular hypertrophy',
                    'st depression',
                    'low qrs voltages',
                    's t changes',
                    'sinus tachycardia',
                    't wave abnormal',
                    'right axis deviation']

In [29]:
# Test the model
metrics = valid_epoch(model, test_dl, excluded_classes)

# Prepare the results
test_results = {f'test_{key}': val for key, val in metrics.items() if val is not None}
test_results['test_mean_rocaucs'] = np.mean([val for key, val in metrics.items() if key.endswith('_rocauc') and val is not None])
test_results['test_mean_praucs'] = np.mean([val for key, val in metrics.items() if key.endswith('_prauc') and val is not None])
print('Test:', test_results['test_mean_rocaucs'], test_results['test_mean_praucs'])

# Save to CSV
df = pd.DataFrame([test_results])  # Wrap the dictionary in a list to make it a single-row DataFrame
df.to_csv('test_results.csv', index=False)

  self.pid = os.fork()
100%|██████████| 50/50 [00:31<00:00,  1.57it/s]


Test: 0.4974439401001312 0.17896145948697909


## Download trained model and csv file of results

In [30]:
from google.colab import files

In [31]:
files.download('test_results.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>