In [29]:
# Part 3 & 4
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.nn.utils import prune 
import time
import torch.nn.functional as F

#from fvcore.nn import FlopCountAnalysis, parameter_count_table

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads):
        super(TransformerBlock, self).__init__()
        self.multi_att = nn.MultiheadAttention(embed_dim=embed_size, num_heads=heads)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, 2 * embed_size),
            nn.ReLU(),
            nn.Linear(2 * embed_size, embed_size)
        )
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

    def forward(self, value, key, query):
        attention = self.multi_att(query, key, value)[0]
        x = self.norm1(attention + query)
        forward = self.feed_forward(x)
        out = self.norm2(forward + x)
        return out

class TransformerModel(nn.Module):
    def __init__(self, embed_size=128, heads=8, num_layers=3, input_dim=784):
        super(TransformerModel, self).__init__()
        self.input_projection = nn.Linear(input_dim, embed_size)
        self.layers = nn.ModuleList([TransformerBlock(embed_size, heads) for _ in range(num_layers)])
        self.out = nn.Linear(embed_size, 10)  # MNIST has 10 classes

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten the image
        x = self.input_projection(x)  # Project input to the embedding size
        query = key = value = x.unsqueeze(0)
        for layer in self.layers:
            x = layer(value, key, query)
        x = x.squeeze(0)
        x = self.out(x)
        return x

# Assume the rest of your code (data loading, training loop, etc.) is correct and follows this pattern.

# Load MNIST data
def load_mnist_data():
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    return train_loader, test_loader

# Pruning function to remove channels based on weight magnitude
def prune_channels(model, amount=0.2):
    for name, module in model.named_modules():
        if isinstance(module, nn.Linear):
            prune.l1_unstructured(module, name='weight', amount=amount)

# Training function
def train_model(model, train_loader, epochs, device, optimizer, criterion, prune=False, start_pruning_after=5, pruning_increment=0.1):
    total_pruning_rate = 0
    for epoch in range(epochs):
        start_time = time.time()
        model.train()
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        if prune and epoch >= start_pruning_after:
            if total_pruning_rate < 0.5:  # Ensure not pruning more than 50%
                prune_channels(model, amount=pruning_increment)
                total_pruning_rate += pruning_increment
                
        elapsed_time = time.time() - start_time
        print(f'Epoch {epoch+1}, Loss: {loss.item()}, Time elapsed: {elapsed_time:.2f} seconds')
        if epoch % 5 == 0 or epoch == epochs - 1:  # Evaluate every 5 epochs and at the end
            accuracy = evaluate_model(model, test_loader, device)
            print(f'Accuracy after Epoch {epoch+1}: {accuracy}%')


def entropy(predictions):
    """Calculates the entropy of the prediction probability distribution."""
    p_log_p = predictions * torch.log(predictions + 1e-9)  # Adding a small epsilon to prevent log(0)
    return -p_log_p.sum(dim=1).mean()  # Sum over classes and average over the batch
    
# Evaluation function
def evaluate_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    total_entropy = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            probabilities = F.softmax(outputs, dim=1)
            total_entropy += entropy(probabilities).item()  # Calculate entropy

            pred = probabilities.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    accuracy = 100. * correct / len(test_loader.dataset)
    average_entropy = total_entropy / len(test_loader)
    return accuracy, average_entropy

#def calculate_flops(model, input_size):
#    inputs = torch.randn(1, *input_size)
#    flop_analysis = FlopCountAnalysis(model, inputs)
#    total_flops = flop_analysis.total()
#    print(f"Total FLOPs: {total_flops}")
#    return total_flops

if __name__ == "__main__":
    train_loader, test_loader = load_mnist_data()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerModel().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    # Train without pruning
    print("Training baseline model...")
    train_model(model, train_loader, epochs=10, device=device, optimizer=optimizer, criterion=criterion)
    baseline_accuracy = evaluate_model(model, test_loader, device)

    # Reset and train with pruning
    #model = TransformerModel().to(device)
    #print("Training pruned model...")
    #train_model(model, train_loader, epochs=10, device=device, optimizer=optimizer, criterion=criterion, prune=True)
    #pruned_accuracy = evaluate_model(model, test_loader, device)
    #flops = calculate_flops(model, (1, 784))  # Assuming batch size of 1 for simplicity

Training baseline model...
Epoch 1, Loss: 0.06947039812803268, Time elapsed: 34.79 seconds
Accuracy after Epoch 1: (95.88, 0.1297375554034409)%
Epoch 2, Loss: 0.19341208040714264, Time elapsed: 47.14 seconds
Epoch 3, Loss: 0.1235053539276123, Time elapsed: 44.46 seconds
Epoch 4, Loss: 0.05402369052171707, Time elapsed: 59.55 seconds
Epoch 5, Loss: 0.0843316912651062, Time elapsed: 45.93 seconds
Epoch 6, Loss: 0.008823173120617867, Time elapsed: 42.91 seconds
Accuracy after Epoch 6: (97.29, 0.06686426146988408)%
Epoch 7, Loss: 0.05991634353995323, Time elapsed: 44.69 seconds
Epoch 8, Loss: 0.01232814323157072, Time elapsed: 52.79 seconds
Epoch 9, Loss: 0.016949331387877464, Time elapsed: 44.76 seconds
Epoch 10, Loss: 0.09803974628448486, Time elapsed: 41.30 seconds
Accuracy after Epoch 10: (97.57, 0.048369932942614616)%


In [30]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Define Transformer block with gated attention
class GatedAttention(nn.Module):
    def __init__(self, embed_size):
        super(GatedAttention, self).__init__()
        self.gate = nn.Linear(embed_size, embed_size)

    def forward(self, x):
        gating_weights = torch.sigmoid(self.gate(x))
        return x * gating_weights

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads):
        super(TransformerBlock, self).__init__()
        self.multi_att = nn.MultiheadAttention(embed_dim=embed_size, num_heads=heads)
        self.gating = GatedAttention(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, 2 * embed_size),
            nn.ReLU(),
            nn.Linear(2 * embed_size, embed_size)
        )
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

    def forward(self, value, key, query):
        value = self.gating(value)
        key = self.gating(key)
        query = self.gating(query)
        attention = self.multi_att(query, key, value)[0]
        x = self.norm1(attention + query)
        forward = self.feed_forward(x)
        out = self.norm2(forward + x)
        return out

class TransformerModel(nn.Module):
    def __init__(self, embed_size=128, heads=8, num_layers=3, input_dim=784):
        super(TransformerModel, self).__init__()
        self.input_projection = nn.Linear(input_dim, embed_size)
        self.layers = nn.ModuleList([TransformerBlock(embed_size, heads) for _ in range(num_layers)])
        self.out = nn.Linear(embed_size, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = self.input_projection(x)
        query = key = value = x.unsqueeze(0)
        for layer in self.layers:
            x = layer(value, key, query)
        x = x.squeeze(0)
        x = self.out(x)
        return x

def load_mnist_data():
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    return train_loader, test_loader

def evaluate_model(model, test_loader, device):
    model.eval()
    correct = 0
    total = 0
    total_entropy = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            probabilities = F.softmax(outputs, dim=1)
            total_entropy += entropy(probabilities).item()  # Calculate entropy

            pred = probabilities.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    accuracy = 100. * correct / len(test_loader.dataset)
    average_entropy = total_entropy / len(test_loader)
    return accuracy, average_entropy
    
def entropy(predictions):
    """Calculates the entropy of the prediction probability distribution."""
    p_log_p = predictions * torch.log(predictions + 1e-9)  # Adding a small epsilon to prevent log(0)
    return -p_log_p.sum(dim=1).mean()  # Sum over classes and average over the batch

def train_model(model, train_loader, test_loader, epochs, device, optimizer, criterion):
    for epoch in range(epochs):
        start_time = time.time()
        model.train()
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        elapsed_time = time.time() - start_time
        print(f'Epoch {epoch+1}, Loss: {loss.item()}, Time elapsed: {elapsed_time:.2f} seconds')
        if epoch % 5 == 0 or epoch == epochs - 1:  # Evaluate every 5 epochs and at the end
            accuracy = evaluate_model(model, test_loader, device)
            print(f'Accuracy after Epoch {epoch+1}: {accuracy}%')

if __name__ == "__main__":
    train_loader, test_loader = load_mnist_data()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = TransformerModel().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    print("Training model with gated attention...")
    train_model(model, train_loader, test_loader, epochs=10, device=device, optimizer=optimizer, criterion=criterion)


Training model with gated attention...
Epoch 1, Loss: 0.04826178029179573, Time elapsed: 43.07 seconds
Accuracy after Epoch 1: (95.98, 0.1126326176162345)%
Epoch 2, Loss: 0.07568582892417908, Time elapsed: 48.80 seconds
Epoch 3, Loss: 0.03277347981929779, Time elapsed: 47.38 seconds
Epoch 4, Loss: 0.021255837753415108, Time elapsed: 73.80 seconds
Epoch 5, Loss: 0.053934164345264435, Time elapsed: 75.29 seconds
Epoch 6, Loss: 0.15232841670513153, Time elapsed: 44.05 seconds
Accuracy after Epoch 6: (97.17, 0.061817129828428595)%
Epoch 7, Loss: 0.04766477644443512, Time elapsed: 43.51 seconds
Epoch 8, Loss: 0.09622547775506973, Time elapsed: 51.17 seconds
Epoch 9, Loss: 0.11222010850906372, Time elapsed: 48.64 seconds
Epoch 10, Loss: 0.1545521467924118, Time elapsed: 43.76 seconds
Accuracy after Epoch 10: (96.94, 0.06794376086187773)%


In [2]:
# Part 1.1 / 1.2 SECOND

import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.nn import LayerNorm
import torch.optim as optim

class ECGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        if sample.ndim == 1:
            sample = sample.unsqueeze(0)  # Ensure there's a sequence dimension
        return sample, self.labels[idx]

def load_data(file_path):
    df = pd.read_csv(file_path)
    df = df.dropna()  # Remove rows with NaN values

    X = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    return X_scaled, y

def prepare_loaders(X, y, batch_size=64, test_size=0.2, random_state=42):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
    
    train_data = torch.tensor(X_train, dtype=torch.float32)
    test_data = torch.tensor(X_test, dtype=torch.float32)
    train_labels = torch.tensor(y_train, dtype=torch.long)
    test_labels = torch.tensor(y_test, dtype=torch.long)

    train_dataset = ECGDataset(train_data, train_labels)
    test_dataset = ECGDataset(test_data, test_labels)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader

class SelectiveSSM(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(SelectiveSSM, self).__init__()
        self.hidden_dim = hidden_dim
        self.selective_gate = nn.Linear(input_dim, hidden_dim)
        self.update_gate = nn.Linear(input_dim, hidden_dim)
        self.state_transform = nn.Linear(hidden_dim, hidden_dim)

        self.layer_norm = LayerNorm(hidden_dim)  # Layer normalization
        
    def forward(self, x, hidden):
        #x_norm = self.layer_norm(x)  # Normalize inputs
        
        selectivity = torch.sigmoid(self.selective_gate(x))
        updates = torch.tanh(self.update_gate(x))
        #hidden = selectivity * updates + (1 - selectivity) * self.state_transform(hidden)

        hidden_updated = selectivity * updates + (1 - selectivity) * self.state_transform(hidden)
        hidden = self.layer_norm(hidden_updated)  # Normalize the updated hidden state
        return hidden

class MambaModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MambaModel, self).__init__()
        self.ssm = SelectiveSSM(input_dim, hidden_dim)
        self.classifier = nn.Linear(hidden_dim, output_dim)
        self.hidden_dim = hidden_dim

    def forward(self, x):
        hidden = torch.zeros(x.size(0), self.hidden_dim, device=x.device)
        for t in range(x.size(1)):
            hidden = self.ssm(x[:, t, :], hidden)
        out = self.classifier(hidden)
        return out

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MambaModel(input_dim=187, hidden_dim=256, output_dim=5)  # Corrected input_dim to match your dataset
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_model(train_loader, model, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        for data, labels in train_loader:
            data, labels = data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')

# Example usage
file_path = 'C:/Users/rupin/Downloads/archive/mitbih_train.csv'
X, y = load_data(file_path)
train_loader, test_loader = prepare_loaders(X, y)



In [3]:
# Part 1.2 
import torch
import numpy as np
from sklearn.metrics import accuracy_score, classification_report

def evaluate_model(test_loader, model):
    model.eval()  # Set the model to evaluation mode
    true_labels = []
    predicted_labels = []

    with torch.no_grad():  # Inference mode, no gradients needed
        for data, labels in test_loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            predicted_labels.extend(predicted.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(true_labels, predicted_labels)
    report = classification_report(true_labels, predicted_labels)
    return accuracy, report

# Training the model
train_model(train_loader, model, criterion, optimizer, num_epochs=10)

# Evaluating the model
accuracy, report = evaluate_model(test_loader, model)
print(f'Accuracy on Test Set: {accuracy}')
print('Classification Report:')
print(report)


Epoch 1, Loss: 0.22339175641536713
Epoch 2, Loss: 0.05657535418868065
Epoch 3, Loss: 0.04412178695201874
Epoch 4, Loss: 0.006150183733552694
Epoch 5, Loss: 0.011675630696117878
Epoch 6, Loss: 0.013105016201734543
Epoch 7, Loss: 0.14350268244743347
Epoch 8, Loss: 0.10360483080148697
Epoch 9, Loss: 0.05790698900818825
Epoch 10, Loss: 0.024175236001610756
Accuracy on Test Set: 0.9790417451887385
Classification Report:
              precision    recall  f1-score   support

           0       0.99      0.99      0.99     14577
           1       0.89      0.69      0.78       418
           2       0.94      0.93      0.94      1120
           3       0.87      0.73      0.79       152
           4       0.97      0.99      0.98      1244

    accuracy                           0.98     17511
   macro avg       0.93      0.87      0.90     17511
weighted avg       0.98      0.98      0.98     17511



In [4]:
class SelectiveSSM(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super(SelectiveSSM, self).__init__()
        self.hidden_dim = hidden_dim
        self.selective_gate = nn.Linear(input_dim, hidden_dim)
        self.update_gate = nn.Linear(input_dim, hidden_dim)
        self.state_transform = nn.Linear(hidden_dim, hidden_dim)

        #self.layer_norm = LayerNorm(hidden_dim)  # Layer normalization
        
    def forward(self, x, hidden):
        
        selectivity = torch.sigmoid(self.selective_gate(x))
        updates = torch.tanh(self.update_gate(x))
        hidden = selectivity * updates + (1 - selectivity) * self.state_transform(hidden)
        return hidden

train_model(train_loader, model, criterion, optimizer, num_epochs=10)

# Evaluating the model
accuracy, report = evaluate_model(test_loader, model)
print(f'Accuracy on Test Set: {accuracy}')
print('Classification Report:')
print(report)


Epoch 1, Loss: 0.008746728301048279
Epoch 2, Loss: 0.012450456619262695
Epoch 3, Loss: 0.025245103985071182
Epoch 4, Loss: 0.10159114003181458
Epoch 5, Loss: 0.0031929181423038244
Epoch 6, Loss: 0.003471081145107746
Epoch 7, Loss: 0.004381862469017506
Epoch 8, Loss: 0.013778313994407654
Epoch 9, Loss: 0.16024957597255707
Epoch 10, Loss: 0.0021379387471824884
Accuracy on Test Set: 0.9792701730340928
Classification Report:
              precision    recall  f1-score   support

           0       0.99      0.99      0.99     14577
           1       0.86      0.72      0.78       418
           2       0.94      0.92      0.93      1120
           3       0.86      0.71      0.78       152
           4       0.99      0.98      0.99      1244

    accuracy                           0.98     17511
   macro avg       0.93      0.87      0.89     17511
weighted avg       0.98      0.98      0.98     17511

