In [1]:
import os
import numpy as np
from torch import nn
from torch.utils.data import Dataset, DataLoader

In [2]:
DATASET_PATH = "./dataset"

In [3]:
x_train = np.expand_dims(np.load(os.path.join(DATASET_PATH, "x_train.npy")), -1)[:100]
y_train = np.expand_dims(np.load(os.path.join(DATASET_PATH, "y_train.npy")), -1)[:100]

x_test = np.expand_dims(np.load(os.path.join(DATASET_PATH, "x_test.npy")), -1)[:25]
y_test = np.expand_dims(np.load(os.path.join(DATASET_PATH, "y_test.npy")), -1)[:25]

x_val = np.expand_dims(np.load(os.path.join(DATASET_PATH, "x_val.npy")), -1)[:10]
y_val = np.expand_dims(np.load(os.path.join(DATASET_PATH, "y_val.npy")), -1)[:10]

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class TransformerModel(nn.Module):
    def __init__(self, input_size, d_model, num_heads, num_layers):
        super(TransformerModel, self).__init__()
        self.input_linear = nn.Linear(input_size, d_model)
        self.transformer = nn.Transformer(d_model=d_model, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers)
        self.output_linear = nn.Linear(d_model, input_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, src, tgt):
        src = self.input_linear(src)
        tgt = self.input_linear(tgt)
        src = src.permute(1, 0, 2)  # Transformer expects [seq_len, batch_size, d_model]
        tgt = tgt.permute(1, 0, 2)
        output = self.transformer(src, tgt)
        output = output.permute(1, 0, 2)
        output = self.output_linear(output)
        return self.sigmoid(output)

# Model parameters
input_size = 1
d_model = 64
num_heads = 4
num_layers = 2

model = TransformerModel(input_size, d_model, num_heads, num_layers)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)



TransformerModel(
  (input_linear): Linear(in_features=1, out_features=64, bias=True)
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-1): 2 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
          )
          (linear1): Linear(in_features=64, out_features=2048, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=2048, out_features=64, bias=True)
          (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
      )
      (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
    )
    (decoder): TransformerDecoder(
      (layers): ModuleList(
        (0-1): 2 x Tr

In [5]:
class ECGDataset(Dataset):
  def __init__(self, data, labels):
    super(ECGDataset, self).__init__()
    self.data = data
    self.labels = labels

  def __len__(self):
    return self.data.shape[0]
  
  def __getitem__(self, idx):
    sample = torch.tensor(self.data[idx], dtype=torch.float32)
    label = torch.tensor(self.labels[idx], dtype=torch.float32)
    return sample, label

In [6]:
import torch.optim as optim
import os

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'  # Enable CUDA debugging

dataset = ECGDataset(x_train, y_train)
train_loader = DataLoader(dataset, 1, shuffle=True)

learning_rate = 1e-4
num_epochs = 10

# Loss and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    
    for i, (x_train, y_train) in enumerate(train_loader):
        x_train, y_train = x_train.to(device), y_train.to(device)
        output = model(x_train, y_train)
        
        # Check tensor shapes
        # print(f"Epoch [{epoch + 1}/{num_epochs}] - Output shape: {output.shape}, Target shape: {y_train.shape}")

        # Compute loss
        loss = criterion(output, y_train)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')



  attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)


Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Target shape: torch.Size([1, 1000, 1])
Epoch [1/10] - Output shape: torch.Size([1, 1000, 1]), Targe

In [None]:
def test_model(model, test_loader):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0.0

    with torch.no_grad():  # Disable gradient computation
        for x_test, y_test in test_loader:
            x_test, y_test = x_test.to(device), y_test.to(device)
            
            # Forward pass
            output = model(x_test, x_test)  # Using x_test as both src and tgt

            # Compute loss
            loss = criterion(output, y_test)
            total_loss += loss.item()

            # Print some example outputs for inspection
            print(f"Output: {output.squeeze().cpu().numpy()[:10]}")  # Print first 10 values of the output
            print(f"Target: {y_test.squeeze().cpu().numpy()[:10]}")  # Print first 10 values of the target

    avg_loss = total_loss / len(test_loader)
    print(f"Average Test Loss: {avg_loss:.4f}")

# Run the test
test_model(model, test_loader)

In [None]:
test_dataset = ECGDataset(x_test, y_test)
test_loader = DataLoader(test_dataset)

