In [13]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
import wfdb
import pandas as pd
import sys
import torch
import os
from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import DataLoader
import ast
import numpy as np
import statsmodels.api as sm
import HiguchiFractalDimension as hfd
import pywt
from statsmodels.tsa.ar_model import AutoReg


In [2]:
#defenition of data path and excel file path
path = '/home/abhishek/rashad_internship/Physionet/ptb-xl-1.0.3/'
excel = '/home/abhishek/rashad_internship/Physionet/ptb-xl-1.0.3/ptbxl_database.csv'

In [4]:
#custom class definition
import pandas as pd
import numpy as np
import ast
import wfdb
from torch.utils.data import Dataset
from torchvision import transforms
from scipy.signal import butter, filtfilt
from scipy.stats import kurtosis,skew
from statsmodels.tsa.ar_model import AutoReg



class Custom_class(Dataset):
    def __init__(self, excelfile, path, num_data, transform=None, data_split='train', fold=None):
        self.dat = pd.read_csv(excelfile)
        self.col = self.dat['filename_hr']  # only 500 hz files are used for training
        self.label = self.dat['scp_codes']  # used for labeling
        self.strat_fold = self.dat['strat_fold']  # Load strat_fold column
        self.path = path
        self.transform = transform  # Initialize the transform attribute
        self.num_data = num_data
        self.data_split = data_split
        self.fold = fold

        if self.data_split == 'train':
            self.indices = [idx for idx in range(self.num_data) if (self.strat_fold[idx] != fold)]
        elif self.data_split == 'test':
            self.indices = [idx for idx in range(self.num_data) if (self.strat_fold[idx] == fold)]
        elif self.data_split == 'val':
            self.indices = [idx for idx in range(self.num_data) if (self.strat_fold[idx] == fold)]

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        idx = self.indices[idx]  # Adjust index to match filtered data
        y, _ = wfdb.rdsamp(self.path + self.col[idx])  # Use channel 0
        y = y.astype(np.float32)
        y = np.transpose(y)

        # Apply filtering
        y = self.bandpass_filter(y, 1, 47, 500)  # applying BPF

        # Normalize using z-score
        y = self.z_score_normalize(y)
        y = y.astype(np.float32)
        scp_code_dict = ast.literal_eval(self.label[idx])  # Fetching label from the scp_codes column

        # Check if the first key is 'NORM' and assign the label accordingly
        first_key = max(scp_code_dict, key=scp_code_dict.get)  # one key in scp_code dictionary with highest value is considered as label
        label = 0 if first_key == 'NORM' else 1  # if label is NORM then encoded as 1 else 0

        if self.transform:
            y = self.transform(y)

        return y[0, :, :],label

    def bandpass_filter(self, data, lowcut, highcut, fs, order=3):
        nyquist = 0.5 * fs
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = butter(order, [low, high], btype='band')
        y = filtfilt(b, a, data, axis=1)
        return y

    def z_score_normalize(self, data):
        mean = np.mean(data, axis=1, keepdims=True)
        std = np.std(data, axis=1, keepdims=True)
        normalized_data = (data - mean) / std
        return normalized_data
    def calculate_ar_coefficients(self, data, order):
        ar_coeffs = []
        for i in range(data.shape[0]):  # iterate over each channel
            model = AutoReg(data[i], lags=order)
            model_fit = model.fit()
            ar_coeffs.append(model_fit.params)
        ar_coeffs = np.array(ar_coeffs)
        return ar_coeffs

# Example usage
transform = transforms.Compose([
    transforms.ToTensor(),
])

# For training data, TOTAL 7000 data is used for TRAINING AND TESTING
train_dataset = Custom_class(excel, path, num_data=7000, transform=transform, data_split='train',fold=10)

# For test data
test_dataset = Custom_class(excel, path, num_data=7000, transform=transform, data_split='test',fold=10)

# For validation data
val_dataset = Custom_class(excel, path, num_data=7000, transform=transform, data_split='val',fold=10)



In [5]:
print(f"Number of data in training set : {len(train_dataset)}")
print(f"Number of data in training set :{len(val_dataset)}")

Number of data in training set : 6171
Number of data in training set :829


In [7]:
#MODEL DEFINITION
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from scipy.stats import skew, kurtosis




# Define the Res_Block_1
class ResBlock1(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ResBlock1, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=2, stride=2, padding=1)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.conv3 = nn.Conv1d(out_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm1d(out_channels)
        self.adjust_channels = nn.Conv1d(in_channels, out_channels, kernel_size=2, stride=2, padding=1)
        self.adjust_bn = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        shortcut = self.adjust_channels(x)
        shortcut = self.adjust_bn(shortcut)

        x = F.leaky_relu(self.bn1(self.conv1(x)))
        x = F.leaky_relu(self.bn2(self.conv2(x)))
        x = self.bn3(self.conv3(x))
        x = x + shortcut
        x = F.leaky_relu(x)
        return x

# Define the Res_Block_2
class ResBlock2(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ResBlock2, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.conv3 = nn.Conv1d(out_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm1d(out_channels)
        self.adjust_channels = nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.adjust_bn = nn.BatchNorm1d(out_channels)

    def forward(self, x):
        shortcut = self.adjust_channels(x)
        shortcut = self.adjust_bn(shortcut)

        x = F.leaky_relu(self.bn1(self.conv1(x)))
        x = F.leaky_relu(self.bn2(self.conv2(x)))
        x = self.bn3(self.conv3(x))
        x = x + shortcut
        x = F.leaky_relu(x)
        return x

# Define the complete ResNet-50 model with Self-Attention
class ResNet50(nn.Module):
    def __init__(self, input_channels=12):
        super(ResNet50, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm1d(64)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(ResBlock1, 64, 128, 1)
        self.layer2 = self._make_layer(ResBlock2, 128, 128, 2)
        self.layer3 = self._make_layer(ResBlock1, 128, 256, 1)
        self.layer4 = self._make_layer(ResBlock2, 256, 256, 3)
        self.layer5 = self._make_layer(ResBlock1, 256, 512, 1)
        self.layer6 = self._make_layer(ResBlock2, 512, 512, 5)
        self.layer7 = self._make_layer(ResBlock1, 512, 1024, 1)
        self.layer8 = self._make_layer(ResBlock2, 1024, 1024, 2)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.dropout = nn.Dropout(p=0.5) #added to improve generalization
        

    def _make_layer(self, block, in_channels, out_channels, blocks):
        layers = []
        layers.append(block(in_channels, out_channels))
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.layer6(x)
        x = self.layer7(x)
        x = self.layer8(x)
        
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        return x




In [9]:


def calculate_ar_coefficients(data, order, device):
    batch_size, channels, sequence_length = data.size()
    
    ar_coeffs = []
    
    for i in range(batch_size):
        channel_coeffs = []
        for j in range(channels):
            channel_data = data[i, j].cpu().numpy()
            model = AutoReg(channel_data, lags=order)
            model_fit = model.fit()
            mean_ar_coeff = np.mean(model_fit.params)
            channel_coeffs.append([mean_ar_coeff])
        
        ar_coeffs.append(channel_coeffs)
    
    ar_tensor = torch.tensor(ar_coeffs, dtype=torch.float32).to(device)
    return ar_tensor

def compute_higuchi_fractal_dimension(data, device):
    batch_size, channels, sequence_length = data.size()

    hfd_values = []
    
    for i in range(batch_size):
        channel_hfds = []
        for j in range(channels):
            channel_data = data[i, j].cpu().numpy()
            hfd_value = hfd.hfd(channel_data)
            channel_hfds.append([hfd_value])
        
        hfd_values.append(channel_hfds)
    
    hfd_tensor = torch.tensor(hfd_values, dtype=torch.float32).to(device)
    return hfd_tensor

def compute_wavelet_variance(data, device):
    batch_size, channels, sequence_length = data.size()
    
    wavelet_var = []
    
    for i in range(batch_size):
        channel_vars = []
        for j in range(channels):
            channel_data = data[i, j].cpu().numpy()
            coeffs = pywt.wavedec(channel_data, 'db2')
            variances = [np.var(c) for c in coeffs]
            variances = np.mean(variances)
            channel_vars.append([variances])
        
        wavelet_var.append(channel_vars)
    
    wavelet_var_tensor = torch.tensor(wavelet_var, dtype=torch.float32).to(device)
    return wavelet_var_tensor

def compute_statistical_features(data, device):
    data = data.to(device).float()
    
    mean = torch.mean(data, dim=-1, keepdim=True).to(device)
    variance = torch.var(data, dim=-1, keepdim=True).to(device)
    
    kurtosis = torch.mean((data - mean)**4, dim=-1, keepdim=True) / (variance**2)
    
    ar_order = 70
    ar_coeffs = calculate_ar_coefficients(data, ar_order, device)
    hfd_values = compute_higuchi_fractal_dimension(data, device)
    wavelet_var = compute_wavelet_variance(data, device)
    
    statistical_features = torch.cat([mean, variance, kurtosis, hfd_values, ar_coeffs, wavelet_var], dim=-1)
    statistical_features = statistical_features.flatten(1)
    return statistical_features


In [10]:
class CombinedModel(nn.Module):
    def __init__(self, feature_extractor, num_stat_features):
        super(CombinedModel, self).__init__()
        self.feature_extractor = feature_extractor
        self.fc_combined = nn.Linear(1024 + num_stat_features, 512)  # Adjust 128 to match the feature size
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 1)
    def forward(self, x, stat_features):
        cnn_features = self.feature_extractor(x)
        combined_features = torch.cat([cnn_features, stat_features], dim=-1)
        x = self.fc_combined(combined_features)
        x = self.fc2(x)
        x = self.fc3(x)
        x = torch.sigmoid(x)
        x = torch.squeeze(x)
        return x

In [14]:
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True,num_workers=2)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True,num_workers=2)
validation_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=True,num_workers=2)

In [15]:
feature_extractor = ResNet50(input_channels=12)
model = CombinedModel(feature_extractor, 72)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from tqdm import tqdm
import copy
from scipy.stats import skew, kurtosis

# model.load_state_dict(torch.load("/home/abhishek/rashad_internship/ecg_classification_using_resnet/best_model.pth"))

# Define your optimizer and scheduler
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# Define your data loaders (train_dataloader, validation_dataloader)

num_epochs = 35
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0

# Define your validation function
def validate_model(model, dataloader, criterion):
    model.eval()  # Set model to evaluation mode
    running_loss = 0.0
    running_corrects = 0

    with torch.no_grad():
        for inputs,labels in tqdm(dataloader, desc="Validation", leave=False):
            inputs = inputs.to(device)
            labels = labels.to(device)
            labels = labels.float()
            stat_features = compute_statistical_features(inputs,device)


        # Forward pass
            outputs = model(inputs,stat_features)
            preds = torch.round(outputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)
    
    return epoch_loss, epoch_acc

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    running_corrects = 0

    print(f'Epoch {epoch+1}/{num_epochs}')
    
    for inputs, labels in tqdm(train_dataloader, desc="Training"):
        inputs = inputs.to(device)
        labels = labels.to(device)
        labels = labels.float()

        stat_features = compute_statistical_features(inputs,device)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs,stat_features)
        preds = torch.round(outputs)

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
    
    epoch_loss = running_loss / len(train_dataloader.dataset)
    epoch_acc = running_corrects.double() / len(train_dataloader.dataset)
    scheduler.step()

    print(f'Training - Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

    # Validate the model
    val_loss, val_acc = validate_model(model, validation_dataloader, criterion)
    print(f'Validation - Epoch {epoch+1}/{num_epochs}, Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}')

    # Deep copy the model if the current validation accuracy is the best so far
    if val_acc > best_acc:
        best_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        # Save the best model
        torch.save(model.state_dict(), "best_model.pth")

# Load best model weights
model.load_state_dict(best_model_wts)

print(f"Training complete. Best validation accuracy: {best_acc:.4f}")


Epoch 1/35


Training:  22%|██▏       | 43/193 [03:14<10:54,  4.36s/it]

In [None]:
import torch
import numpy as np
from scipy.stats import skew, kurtosis
from sklearn.metrics import accuracy_score, confusion_matrix

# Load your model class and initialize the model
# Assuming ResNet50 is your model class
model = ResNet50(input_channels=12, num_classes=6)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
model.load_state_dict(torch.load('/home/abhishek/rashad_internship/ecg_classification_using_resnet/best_model.pth'))

def test_model(model, test_loader):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.eval()  # Set the model to evaluation mode
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for data,labels in test_loader:
            data, labels = data.to(device),labels.to(device)
           
            stats = compute_statistical_features(inputs,device)
            # Forward pass
            outputs = model(data, stats)
            preds = torch.round(outputs)
            
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Calculate accuracy
    accuracy = accuracy_score(all_labels, all_preds)

    # Calculate confusion matrix
    cm = confusion_matrix(all_labels, all_preds)

    return accuracy, cm

# Example usage:
# Assuming `model` is your PyTorch model and `test_loader` is your test data loader
accuracy, confusion_matrix = test_model(model, validation_dataloader)
print("Accuracy:", accuracy)
print("Confusion Matrix:")
print(confusion_matrix)


In [None]:
import seaborn as sns

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()