In [1]:
import pathlib

import torch
import torch.nn.functional as F
import torch.optim as optim
from dataset import SequenceDataset
from sequence_transformations import TransformationRefined
from torch import Tensor, nn
from torch.utils.data import DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from training_loop import fit

In [2]:
print("GPU available:", torch.cuda.is_available())
print("Device id:", torch.cuda.current_device())
print("GPU:", torch.cuda.get_device_name(torch.cuda.current_device()))

GPU available: True
Device id: 0
GPU: Quadro T1000 with Max-Q Design


In [3]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [4]:
DATASET_PATH = pathlib.Path("../data/classification/data.csv")
BATCH_SIZE = 64
SEQUENCE_LEN = 500

In [5]:
def count_trainable_parameters(model: nn.Module) -> int:
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# "Refined" Represenation

In [17]:
dataset = SequenceDataset(DATASET_PATH, TransformationRefined())

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
)


train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=(BATCH_SIZE * 2), shuffle=False)

## Baseline

In [6]:
class LogisticRegression(nn.Module):
    def __init__(self, sequence_len: int) -> None:
        super(LogisticRegression, self).__init__()
        self.linear = nn.Linear(in_features=sequence_len * 2, out_features=1)

    def forward(self, x: Tensor) -> Tensor:  # B, T, C
        x = x.view(x.size(0), -1)  # B, T * C
        x = F.sigmoid(self.linear(x))  # B, 1
        x = x.squeeze()  # B
        return x

In [7]:
logistic_regression_model = LogisticRegression(SEQUENCE_LEN).to(device)
opt = optim.Adam(logistic_regression_model.parameters(), lr=0.02)

In [13]:
writer = SummaryWriter("runs/logistic_regression_refined")
fit(
    epochs=2,
    model=logistic_regression_model,
    loss_func=F.binary_cross_entropy,
    opt=opt,
    train_dl=train_loader,
    valid_dl=val_loader,
    writer=writer,
    device=device,
)
writer.flush()

100%|██████████| 2/2 [00:10<00:00,  5.28s/it]


# LSTM

In [10]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super(LSTMClassifier, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Define the LSTM layer
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)

        # Define a fully connected layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x: Tensor) -> Tensor:
        # Initialize hidden state and cell state with zeros
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_dim, dtype=torch.float32
        ).to(x.device)
        c0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_dim, dtype=torch.float32
        ).to(x.device)

        # Forward propagate the LSTM
        out, _ = self.lstm(
            x, (h0, c0)
        )  # out: tensor of shape (batch_size, seq_length, hidden_dim)

        # Get the last time step's output
        out = out[:, -1, :]  # out: tensor of shape (batch_size, hidden_dim)

        # Pass the output through the fully connected layer
        out = self.fc(out)

        # Apply sigmoid activation
        out = F.sigmoid(out)

        out = out.squeeze()

        return out

In [14]:
input_dim = 2  # Each entry in the sequence is 2-dimensional
hidden_dim = 128  # Number of features in the hidden state
output_dim = 1  # Output dimension (binary classification)
num_layers = 1  # Number of stacked LSTM layers

In [15]:
lstm = LSTMClassifier(input_dim, hidden_dim, output_dim, num_layers).to(device)
opt = optim.Adam(lstm.parameters(), lr=0.05)

In [85]:
count_trainable_parameters(lstm)

199809

In [16]:
writer = SummaryWriter("runs/lstm_refined")
fit(
    epochs=5,
    model=lstm,
    loss_func=F.binary_cross_entropy,
    opt=opt,
    train_dl=train_loader,
    valid_dl=val_loader,
    writer=writer,
    device=device,
)
writer.flush()

100%|██████████| 5/5 [17:16<00:00, 207.37s/it]


# Encoder Transformer

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim


class TransformerEncoderClassifier(nn.Module):
    def __init__(
        self, input_dim, d_model, nhead, num_layers, max_seq_length, device
    ) -> None:
        super(TransformerEncoderClassifier, self).__init__()

        # Linear layer to project input_dim to d_model
        self.input_projection = nn.Linear(input_dim, d_model)

        # Precompute fixed positional encodings
        self.positional_encoding = self.generate_positional_encoding(
            max_seq_length, d_model
        ).to(device)

        # Transformer encoder layer with batch_first=True
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=256,
            batch_first=True,
            norm_first=True,
            activation=F.gelu,
            device=device,
        )

        self.transformer_encoder = nn.TransformerEncoder(
            self.encoder_layer,
            num_layers=num_layers,
            enable_nested_tensor=False,
            norm=nn.LayerNorm(d_model),
        )

        # Mean pooling and final classification layer
        self.pooling = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(d_model, 1)

    def generate_positional_encoding(self, max_len, d_model):
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        return pe

    def forward(self, x: Tensor) -> Tensor:
        # x: (B, T, input_dim)

        _, T, _ = x.size()

        # Project input to d_model
        x = self.input_projection(x)  # (B, T, d_model)

        # Add positional encoding
        x = x + self.positional_encoding[:, :T, :]  # (B, T, d_model)

        # Apply transformer encoder
        x = self.transformer_encoder(x)  # (B, T, d_model)

        # Permute and pool
        x = x.permute(0, 2, 1)  # (B, d_model, T)
        x = self.pooling(x)  # (B, d_model, 1)

        # Squeeze and apply final classification layer
        x = x.squeeze(-1)  # (B, d_model)
        x = self.fc(x)  # (B, 1)

        # Apply sigmoid to get the probability
        x = torch.sigmoid(x)  # (B, 1)

        x = x.squeeze()  # (B)

        return x

In [19]:
input_dim = 2  # Number of features per nucleotide
d_model = 2  # Transformer model dimension
nhead = 1  # Number of attention heads
num_layers = 1  # Number of transformer layers

In [22]:
transformer = TransformerEncoderClassifier(
    input_dim=input_dim, d_model=d_model, nhead=nhead, num_layers=num_layers, max_seq_length=501, device=device
).to(device)
opt = optim.Adam(transformer.parameters(), lr=0.05)

In [23]:
count_trainable_parameters(transformer)

2641

In [24]:
writer = SummaryWriter("runs/transformer_refined")
fit(
    epochs=5,
    model=transformer,
    loss_func=F.binary_cross_entropy,
    opt=opt,
    train_dl=train_loader,
    valid_dl=val_loader,
    writer=writer,
    device=device,
)
writer.flush()

Epoch: 0


  0%|[31m          [0m| 0/3592 [00:00<?, ?it/s]

100%|[31m██████████[0m| 3592/3592 [01:55<00:00, 30.97it/s]
100%|[32m██████████[0m| 449/449 [00:14<00:00, 30.60it/s]


Epoch: 1


100%|[31m██████████[0m| 3592/3592 [01:54<00:00, 31.34it/s]
100%|[32m██████████[0m| 449/449 [00:14<00:00, 30.84it/s]


Epoch: 2


100%|[31m██████████[0m| 3592/3592 [01:54<00:00, 31.31it/s]
100%|[32m██████████[0m| 449/449 [00:14<00:00, 30.61it/s]


Epoch: 3


100%|[31m██████████[0m| 3592/3592 [01:54<00:00, 31.32it/s]
100%|[32m██████████[0m| 449/449 [00:13<00:00, 33.22it/s]


Epoch: 4


100%|[31m██████████[0m| 3592/3592 [01:52<00:00, 31.98it/s]
100%|[32m██████████[0m| 449/449 [00:14<00:00, 31.37it/s]


# CNN

## Image Representation

In [6]:
from sequence_transformations import TransformationImageGrayscale

In [7]:
dataset = SequenceDataset(DATASET_PATH, TransformationImageGrayscale())

train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
)


train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=(BATCH_SIZE * 2), shuffle=False)

In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DNAClassifierCNN(nn.Module):
    def __init__(self) -> None:
        super(DNAClassifierCNN, self).__init__()
        
        # Define convolutional layers
        conv_layers = []
        in_channels = 1
        for out_channels in [4, 3, 2]:
            conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            conv_layers.append(nn.ReLU())
            conv_layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            in_channels = out_channels
        
        self.conv_block = nn.Sequential(*conv_layers)
        
        # Define fully connected layers
        self.fc_layers = nn.Sequential(
            nn.Linear(2 * 62 * 12, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, x: Tensor) -> Tensor:
        x = x.unsqueeze(2)  # (batch_size, 500, 1)
        x = x.repeat(1, 1, 100)  # (batch_size, 500, 100)
        x = x.unsqueeze(1)  # (batch_size, 1, 500, 100)
        x = self.conv_block(x)
        x = x.view(-1, 2 * 62 * 12) # Flatten the output
        x = self.fc_layers(x) # Apply fully connected layers
        return x.squeeze()


In [9]:
cnn = DNAClassifierCNN().to(device)
opt = optim.Adam(cnn.parameters(), lr=0.05)

In [10]:
count_trainable_parameters(cnn)

199120

In [11]:
xbb = None
for xb, _ in train_loader:
    xbb = xb.to(device)

In [12]:
cnn(xbb)

  return F.conv2d(input, weight, bias, self.stride,


tensor([0.4872, 0.4863, 0.4869, 0.4888, 0.4860, 0.4877, 0.4872, 0.4875, 0.4862,
        0.4876, 0.4888, 0.4873, 0.4878, 0.4869, 0.4873, 0.4865, 0.4866, 0.4868,
        0.4868, 0.4864, 0.4872, 0.4869, 0.4867, 0.4872, 0.4868, 0.4864, 0.4875],
       device='cuda:0', grad_fn=<SqueezeBackward0>)

In [13]:
writer = SummaryWriter("runs/cnn_refined")
fit(
    epochs=2,
    model=cnn,
    loss_func=F.binary_cross_entropy,
    opt=opt,
    train_dl=train_loader,
    valid_dl=val_loader,
    writer=writer,
    device=device,
    start_epoch_idx=10
)
writer.flush()

Epoch: 10


100%|[31m██████████[0m| 3592/3592 [02:00<00:00, 29.84it/s]
100%|[32m██████████[0m| 449/449 [00:07<00:00, 62.64it/s]


Epoch: 11


  8%|[31m▊         [0m| 278/3592 [00:09<01:52, 29.40it/s]


KeyboardInterrupt: 