In [1]:
%autosave 300
%load_ext autoreload
%autoreload 2
%reload_ext autoreload
%config Completer.use_jedi = False

Autosaving every 300 seconds


In [2]:
import os

os.chdir("../.")
print(os.getcwd())

/mnt/batch/tasks/shared/LS_root/mounts/clusters/soutrik-gpuvm/code/Users/Soutrik.Chowdhury/GeneCpGSequencing


#### First we will experiment with rnn padding sequences concepts like padding, packing and unpacking sequences
- Sequence Padding and Packing for RNNs
- Implementation of Sequence Padding and Sequence Packing
- Handling Sequence Padding and Packing in PyTorch for RNNs

This is where padding comes and pad all sequences to the maximum length (8 in this case) with meaningless values. This creates an 8x8 matrix for computations, even though some sequences are shorter. This wastes processing power because we perform unnecessary calculations (64 computations instead of the actual 45 needed).

For this , packing plays an important role as It packs the sequences into a data structure that preserves their original lengths before padding. By doing so, the RNN model can process only the non-padded portions of each sequence, effectively reducing the computational overhead.

In [3]:
import torch
import torch.nn.utils.rnn as rnn_utils

# Define sequences
sequences = [[1, 2, 3, 7], [4, 5], [6, 7, 8, 9, 11], [10]]
sequences_tensor = [
    torch.tensor(seq) for seq in sequences
]  # Convert sequences to PyTorch tensors
sequences_tensor[0].shape

torch.Size([4])

In [4]:
# Padding : pad_sequence function from torch.nn.utils.rnn module to pad the sequences to the same length
padded_sequences = rnn_utils.pad_sequence(sequences_tensor, batch_first=True)
print("Padded sequences:", "\n", padded_sequences)
print(
    "Padded sequences shape:", "\n", padded_sequences.shape
)  # (4, 5) : 4 sequences, each of length 5

Padded sequences: 
 tensor([[ 1,  2,  3,  7,  0],
        [ 4,  5,  0,  0,  0],
        [ 6,  7,  8,  9, 11],
        [10,  0,  0,  0,  0]])
Padded sequences shape: 
 torch.Size([4, 5])


In [5]:
# Packing : pack_padded_sequence function from torch.nn.utils.rnn module to pack the padded sequences into a packed sequence object
sequence_lengths = torch.tensor(
    [len(seq) for seq in sequences]
)  # Calculating actual lengths of sequences before padding
# Pack padded sequences into a packed sequence object
packed_sequences = rnn_utils.pack_padded_sequence(
    padded_sequences, sequence_lengths, batch_first=True, enforce_sorted=False
)
print("\nPacked sequences:", packed_sequences)
# Packed sequences object contains data, batch_sizes, sorted_indices, and unsorted_indices
# row wise data is first sorted by length and then packed into a single tensor by column wise


Packed sequences: PackedSequence(data=tensor([ 6,  1,  4, 10,  7,  2,  5,  8,  3,  9,  7, 11]), batch_sizes=tensor([4, 3, 2, 2, 1]), sorted_indices=tensor([2, 0, 1, 3]), unsorted_indices=tensor([1, 2, 0, 3]))


How pad sequence works is that it takes a list of sequences, and pads them to the same length. The sequences are expected to be in the form of PyTorch tensors. The pad_sequence function returns a tensor with the padded sequences. The sequences are padded with zeros by default. The padding is done at the beginning of the sequences. The pad_sequence function also takes an optional batch_first argument, which specifies whether the batch dimension should be the first dimension of the output tensor. If batch_first is set to True, the output tensor will have the batch dimension as the first dimension. If batch_first is set to False, the output tensor will have the batch dimension as the second dimension

In the packed sequence:
- data: contains the flattened non-padded elements from the padded sequences.
- batch_sizes: indicates how many elements are present at each time step, reflecting the varying sequence lengths within the batch.
- This packed sequence is feed into your recurrent neural network (RNN) model during training, allowing it to efficiently process variable-length sequences.
- How it works is that the RNN model processes the packed sequence element by element, using the batch_sizes tensor to keep track of the sequence lengths. The RNN model skips the padding elements and only processes the actual sequence elements, reducing the computational overhead.

Next we will show with real world example of using pack_padded_sequence and pad_packed_sequence functions in PyTorch.

 Implementing Padding and Packing in PyTorch

In [6]:
import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

# Example sequences (already encoded as numbers)
sequences = [
    torch.tensor([1, 2]),  # "Hello"
    torch.tensor([3]),  # "Hi"
    torch.tensor([4, 5, 6, 7]),  # "Goodbye"
]
print(sequences)
print(sequences[0].shape)

[tensor([1, 2]), tensor([3]), tensor([4, 5, 6, 7])]
torch.Size([2])


In [7]:
# Sort sequences by length (descending order required for packing)
sequences.sort(key=len, reverse=True)
print(sequences)

# Extract lengths before padding (important for packing)
sequence_lengths = torch.tensor([len(seq) for seq in sequences])

# Pad sequences to match the longest one
padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)

print("Padded Sequences:\n", padded_sequences)
print("Sequence Lengths:\n", sequence_lengths)

[tensor([4, 5, 6, 7]), tensor([1, 2]), tensor([3])]
Padded Sequences:
 tensor([[4, 5, 6, 7],
        [1, 2, 0, 0],
        [3, 0, 0, 0]])
Sequence Lengths:
 tensor([4, 2, 1])


In [8]:
# Pack the padded sequence
packed_sequences = pack_padded_sequence(
    padded_sequences, sequence_lengths, batch_first=True, enforce_sorted=True
)

print("Packed Sequences:\n", packed_sequences)
# Packed sequences object contains data, batch_sizes, sorted_indices, and unsorted_indices
# row wise data is first sorted by length and then packed into a single tensor by column
# batch_sizes contains the number of elements in each batch at each time step which gives the actual length of each sequence without padding

Packed Sequences:
 PackedSequence(data=tensor([4, 1, 3, 5, 2, 6, 7]), batch_sizes=tensor([3, 2, 1, 1]), sorted_indices=None, unsorted_indices=None)


In [9]:
pad_packed_sequence(packed_sequences, batch_first=True)
# pad_packed_sequence function is used to unpack the packed sequence object back into a padded sequence
# this will be useful when we want to use the output of an RNN layer in a subsequent layer

(tensor([[4, 5, 6, 7],
         [1, 2, 0, 0],
         [3, 0, 0, 0]]),
 tensor([4, 2, 1]))

Why Packing Helps?
* It ignores padded values during computation.
* The RNN only processes valid time steps, improving efficiency and accuracy.

In [10]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence


class LSTMModel(nn.Module):
    """
    LSTM model that takes a sequence of embeddings and returns the output of the LSTM layer
    and the final hidden state of the LSTM layer (h_n) for each sequence in the batch.
    Uses pack_padded_sequence and pad_packed_sequence to handle variable-length sequences.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_size):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)  # Embed token IDs
        self.lstm = nn.LSTM(
            embedding_dim, hidden_size, batch_first=True
        )  # Fix input size

    def forward(self, x, lengths):
        # Embed the input
        embedded = self.embedding(
            x
        )  # Output shape: (batch_size, seq_len, embedding_dim)
        print("Embedded Shape:", embedded.shape)

        # Pack the sequence
        packed_x = pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=True
        )
        print("Packed X Shape:", packed_x.data.shape)

        # Process with LSTM
        packed_output, (h_n, c_n) = self.lstm(packed_x)
        print("Packed Output Shape:", packed_output.data.shape)

        # Unpack the output
        output, _ = pad_packed_sequence(packed_output, batch_first=True)
        print("Unpacked Output Shape:", output.shape)

        return output, h_n

In [11]:
# ------------------------ Data Preparation ------------------------

# Example tokenized sequences (variable-length)
sequences = [
    torch.tensor([1, 2, 3]),
    torch.tensor([4, 5]),
    torch.tensor([6, 7, 8, 9]),
    torch.tensor([10]),
]

# Sort sequences by length in descending order (required for packing)
sequences.sort(key=len, reverse=True)

# Pad sequences to match the longest one
padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)
sequence_lengths = torch.tensor([len(seq) for seq in sequences])

print("Padded Sequences:\n", padded_sequences)
print("Sequence Lengths:\n", sequence_lengths)

Padded Sequences:
 tensor([[ 6,  7,  8,  9],
        [ 1,  2,  3,  0],
        [ 4,  5,  0,  0],
        [10,  0,  0,  0]])
Sequence Lengths:
 tensor([4, 3, 2, 1])


In [12]:
# ------------------------ Model Initialization ------------------------

VOCAB_SIZE = 11  # Number of unique tokens (including padding)
EMBEDDING_DIM = 100  # Embedding vector size
HIDDEN_DIM = 256  # LSTM hidden state size

# Initialize model
model = LSTMModel(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM)

# ------------------------ Forward Pass ------------------------

# Pass the padded sequences through the model
outputs, hidden_state = model(padded_sequences, sequence_lengths)

print("Final Output Shape:", outputs.shape)
print("Final Hidden State Shape:", hidden_state.shape)

Embedded Shape: torch.Size([4, 4, 100])
Packed X Shape: torch.Size([10, 100])
Packed Output Shape: torch.Size([10, 256])
Unpacked Output Shape: torch.Size([4, 4, 256])
Final Output Shape: torch.Size([4, 4, 256])
Final Hidden State Shape: torch.Size([1, 4, 256])


##### Next we will deep dive into pack_padded_sequence and pad_packed_sequence functions

## CPG Counting using LSTM
This project involves building a neural network to count the number of CpGs (consecutive CGs) in DNA sequences. Here's a structured solution for this problem, from problem understanding to advanced implementations using LSTMs, PyTorch optimizations, and hyperparameter tuning with Optuna.


In [2]:
import random
import numpy as np
import torch
from functools import partial
from typing import Sequence, Tuple, List


# Set seed for reproducibility
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        print("Using CUDA")
        torch.cuda.manual_seed_all(seed)


set_seed(13)

Using CUDA


##### Data Preparation #####

In [3]:
# DNA Alphabet Encoding
alphabet = "NACGT"
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}  # Map A, C, G, T, N ‚Üí [1,5]
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}  # Reverse mapping
dna2int.update({"pad": 0})  # Add padding token
int2dna.update({0: "<pad>"})  # Padding representation

# Mapping Functions
intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)
list(intseq_to_dnaseq([1]))
print(list(dnaseq_to_intseq(["C"])))
print(list(dnaseq_to_intseq(["G"])))

[3]
[4]


In [10]:
print(dna2int)
print(int2dna)

{'N': 1, 'A': 2, 'C': 3, 'G': 4, 'T': 5, 'pad': 0}
{1: 'N', 2: 'A', 3: 'C', 4: 'G', 5: 'T', 0: '<pad>'}


In [4]:
# Function to count CpGs in a DNA sequence
def count_cpgs(seq: str) -> int:
    """
    Count the number of CpG dinucleotides in a DNA sequence.
    """
    return sum(1 for i in range(len(seq) - 1) if seq[i : i + 2] == "CG")


# Unified function to generate data
def generate_dna_data(
    n_seqs: int, fixed_len: int = None, lb: int = 16, ub: int = 128
) -> Tuple[List[List[int]], List[int]]:
    """
    Generate DNA sequences and their CpG counts.

    Parameters:
    - n_seqs (int): Number of sequences to generate.
    - fixed_len (int, optional): If given, generates fixed-length sequences.
    - lb (int): Lower bound for sequence length (used if fixed_len is None).
    - ub (int): Upper bound for sequence length (used if fixed_len is None).

    Returns:
    - X (List[List[int]]): List of integer-encoded DNA sequences.
    - y (List[int]): List of CpG counts corresponding to each sequence.
    """
    X, y = [], []

    for _ in range(n_seqs):
        seq_len = (
            fixed_len if fixed_len else random.randint(lb, ub)
        )  # Choose sequence length
        int_seq = [
            random.randint(1, 5) for _ in range(seq_len)
        ]  # Generate integer-encoded DNA sequence
        dna_seq = "".join(intseq_to_dnaseq(int_seq))  # Convert to DNA sequence

        X.append(int_seq)  # Store integer-encoded sequence
        y.append(count_cpgs(dna_seq))  # Store CpG count

    return X, y

In [5]:
# Generate fixed-length dataset (128 nucleotides per sequence)
X_fixed, y_fixed = generate_dna_data(n_seqs=1000, fixed_len=32)

# Generate variable-length dataset (random lengths between 16 and 128)
X_variable, y_variable = generate_dna_data(n_seqs=1000, lb=16, ub=32)

In [6]:
len(X_fixed), len(X_variable)
len(y_fixed), len(y_variable)

min(map(len, X_fixed)), max(map(len, X_fixed))
min(map(len, X_variable)), max(map(len, X_variable))

(16, 32)

In [9]:
# X_fixed[0],y_fixed[0]

In [18]:
##### ------------------------ Hyperparameters ------------------------
batch_size = 16
embedding_dim = 32
hidden_size = 64
num_layers = 2
dropout = 0.2
learning_rate = 0.001
num_epochs = 50
weight_decay = 1e-4

#### ------------------------ Pytorch Dataset ------------------------

In [19]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.metrics import mean_absolute_error, mean_squared_error
from typing import Tuple, List
import time
from sklearn.model_selection import train_test_split
from torch.amp import GradScaler, autocast

First we will test with same length sequences and then with variable length sequences

In [20]:
# Split variable-length dataset (80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(
    X_fixed, y_fixed, test_size=0.1, random_state=13
)
print(len(X_train), len(X_val))
print(len(y_train), len(y_val))

900 100
900 100


In [21]:
class CPGDataset(Dataset):
    def __init__(self, sequences, labels):
        """
        Initialize the dataset with sequences and labels (CpG counts) as input arguments and store them as attributes.

        Parameters:
        - sequences (List[List[int]]): List of integer-encoded DNA sequences.
        - labels (List[int]): List of CpG counts corresponding to each sequence.

        Returns:
        - None
        """
        self.sequences = [torch.tensor(seq, dtype=torch.long) for seq in sequences]
        self.labels = torch.tensor(labels, dtype=torch.float32)

    def __len__(self):
        """
        Return the number of sequences in the dataset.
        """
        return len(self.sequences)

    def __getitem__(self, idx):
        """
        Retrieve a sequence and its label by index.
        """
        return self.sequences[idx], self.labels[idx]

    @staticmethod
    def collate_fn(batch):
        """
        Custom collate function to pad sequences dynamically and return a batch of sequences and labels.
        """
        sequences, labels = zip(*batch)
        padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)
        return padded_sequences, torch.tensor(labels, dtype=torch.float32)

Create datasets and dataloaders for fixed and variable-length sequences

In [22]:
# pytorch standard dataset
train_dataset = CPGDataset(X_train, y_train)
val_dataset = CPGDataset(X_val, y_val)

In [23]:
# each iteration of the dataset will return a list of sequences and a labels
x, y = next(iter(train_dataset))
print(x.shape, y.shape)

torch.Size([32]) torch.Size([])


In [24]:
# dataloader
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=CPGDataset.collate_fn,
)

val_dataloader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=CPGDataset.collate_fn,
)

In [25]:
# each iteration of the dataloader will return a batch of sequences and labels
for x_batch, y_batch in train_dataloader:
    print(x_batch.shape, y_batch.shape)
    break

for x_batch, y_batch in val_dataloader:
    print(x_batch.shape, y_batch.shape)
    break

torch.Size([16, 32]) torch.Size([16])
torch.Size([16, 32]) torch.Size([16])


In [26]:
# Sample batch
sample_batch = next(iter(train_dataloader))
print("Padded Batch Shape:", sample_batch[0].shape)  # (batch_size, max_seq_len)

Padded Batch Shape: torch.Size([16, 32])


##### ------------------------ Model Initialization ------------------------
The model consists of:

- Embedding Layer ‚Äì Converts integer-encoded DNA sequences into dense vector representations.
- LSTM Layer ‚Äì Captures sequential patterns in DNA.
- Fully Connected Layer ‚Äì Outputs the CpG count as a single continuous value.
- ReLU Activation ‚Äì Ensures non-negative predictions.

### **Final Summary of Shape Transformations**

| Step | Layer                          | Input Shape                     | Output Shape                    | Explanation |
|------|--------------------------------|---------------------------------|---------------------------------|-------------|
| 1Ô∏è‚É£  | **Raw Input**                  | `(batch_size, seq_len)`         | `(4, 10)`                        | Raw integer-encoded DNA sequences, where `batch_size = 4` (number of sequences in a batch) and `seq_len = 10` (length of each sequence). |
| 2Ô∏è‚É£  | **Embedding**                  | `(batch_size, seq_len)`         | `(4, 10, 64)`                     | Converts integer tokens into dense vectors of size `embedding_dim = 64`. This means each nucleotide is mapped to a 64-dimensional vector. |
| 3Ô∏è‚É£  | **LSTM**                        | `(batch_size, seq_len, embedding_dim)` | `(4, 10, 128)`          | The LSTM processes the sequence and outputs a hidden state of size `hidden_size = 128` for each time step in the sequence. |
| 4Ô∏è‚É£  | **Selecting Last Hidden State** | `(batch_size, seq_len, hidden_size)` | `(4, 128)`             | Extracts the last time step's hidden state (`lstm_output[:, -1, :]`), which summarizes the sequence information. |
| 5Ô∏è‚É£  | **Fully Connected Layer**       | `(batch_size, hidden_size)`     | `(4, 1)`                         | Linear transformation reduces `hidden_size = 128` to a single output value per sequence (predicting the CpG count). |
| 6Ô∏è‚É£  | **ReLU Activation**             | `(batch_size, 1)`               | `(4, 1)`                         | Ensures non-negative predictions for CpG site counts. |


### **Hyperparameters Used in the Model**
- **`batch_size = 4`** ‚Üí Number of sequences in a batch.
- **`seq_len = 10`** ‚Üí Length of each DNA sequence.
- **`vocab_size = 11`** ‚Üí Total number of unique nucleotides (A, C, G, T) including padding.
- **`embedding_dim = 64`** ‚Üí Size of the dense representation of each nucleotide.
- **`hidden_size = 128`** ‚Üí Number of hidden units in the LSTM layer.
- **`num_layers = 2`** ‚Üí Number of stacked LSTM layers.
- **`dropout = 0.3`** ‚Üí Dropout rate for regularization in LSTM.

This breakdown ensures **clear understanding of shape transformations** throughout the model. üöÄ

In [27]:
class CpGCounter(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout):
        super(CpGCounter, self).__init__()
        # Embedding layer to convert integer-encoded sequences to embeddings of fixed size (embedding_dim)
        self.embedding = nn.Embedding(
            vocab_size, embedding_dim, padding_idx=0
        )  # input: (batch_size, seq_len), output: (batch_size, seq_len, embedding_dim)
        # LSTM layer to process the embeddings and capture sequential information (hidden_size)
        self.lstm = nn.LSTM(
            embedding_dim, hidden_size, num_layers, batch_first=True, dropout=dropout
        )  # input: (batch_size, seq_len, embedding_dim), output: (batch_size, seq_len, hidden_size)
        # Fully connected layer to predict the number of CpG sites
        self.fc = nn.Linear(
            hidden_size, 1
        )  # input: (batch_size, seq_len, hidden_size), output: (batch_size, seq_len, 1)

        self.relu = nn.ReLU()

    def forward(self, x):
        # Embed the input sequences
        embedded = self.embedding(x)
        # Process the embeddings with an LSTM layer
        lstm_output, _ = self.lstm(embedded)
        # we only take the last hidden state
        lstm_output = lstm_output[
            :, -1, :
        ]  # shape is (batch_size, hidden_size) and we only take the last hidden state of the sequence
        # Predict the number of CpG sites
        output = self.fc(lstm_output)
        return self.relu(output)

In [28]:
# check cuda availability and set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [29]:
vocab_size = len(alphabet) + 1  # Add 1 for padding token
print(vocab_size)
# print(len(int2dna))

6


only quick testing of the model

In [30]:
# # initialize model
# model = CpGCounter(
#     vocab_size=vocab_size,
#     embedding_dim=embedding_dim,
#     hidden_size=hidden_size,
#     num_layers=num_layers,
#     dropout=dropout,
# ).to(device)
# print(model)

In [31]:
# # define loss function and optimizer and scheduler
# criterion = nn.MSELoss()
# optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
# scheduler = ReduceLROnPlateau(
#     optimizer, mode="min", factor=0.1, patience=5, verbose=True
# )

In [32]:
# len(
#     train_dataloader
# )  # 50 sets of batch and each batch has 16 sequences with each sequence having 32 nucleotides

In [33]:
# for epoch in range(num_epochs):
#     total_loss = 0.0
#     all_preds, all_labels = [], []  # Store predictions and labels
#     model.train()

#     for inputs, labels in train_dataloader:
#         inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU

#         optimizer.zero_grad()
#         outputs = model(inputs).squeeze()  # Remove extra dimension

#         loss = criterion(outputs, labels)  # Compute loss
#         loss.backward()
#         torch.nn.utils.clip_grad_norm_(
#             model.parameters(), max_norm=1.0
#         )  # Gradient Clipping
#         optimizer.step()

#         total_loss += loss.item()

#         # Store predictions and labels for metrics
#         all_preds.extend(outputs.detach().cpu().numpy())
#         all_labels.extend(labels.detach().cpu().numpy())

#     # Scheduler Step
#     scheduler.step(total_loss)

#     # Compute Regression Metrics (if applicable)
#     mae = mean_absolute_error(all_labels, all_preds)
#     mse = mean_squared_error(all_labels, all_preds)
#     rmse = mse**0.5  # Root Mean Squared Error

#     print(
#         f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss:.4f}, MAE: {mae:.4f}, RMSE: {rmse:.4f}"
#     )

We will improve the training loop by adding 
- validation and early stopping to prevent overfitting 
- saving the best model
- more functionalities like logging, plotting, etc.

In [34]:
def training_loop(model, dataloader, device, optimizer, criterion):
    """
    Runs one training epoch.

    Parameters:
    - model (nn.Module): PyTorch model to train.
    - dataloader (DataLoader): Training DataLoader.
    - device (torch.device): CPU or GPU.
    - optimizer (torch.optim.Optimizer): Optimizer.
    - criterion (nn.Module): Loss function.

    Returns:
    - avg_loss (float): Average loss over dataset.
    - avg_mae (float): Average MAE over dataset.
    - avg_rmse (float): Average RMSE over dataset.
    """
    model.train()
    total_loss, total_mae, total_rmse = 0.0, 0.0, 0.0
    num_batches = len(dataloader)

    for inputs, labels in dataloader:
        inputs, labels = (
            inputs.to(device),
            labels.to(device),
        )

        optimizer.zero_grad()
        outputs = model(inputs).squeeze()

        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        # Compute batch-wise metrics
        batch_mae = mean_absolute_error(
            labels.cpu().numpy(), outputs.detach().cpu().numpy()
        )
        batch_rmse = (
            mean_squared_error(labels.cpu().numpy(), outputs.detach().cpu().numpy())
            ** 0.5
        )

        total_loss += loss.item()
        total_mae += batch_mae
        total_rmse += batch_rmse

    return total_loss / num_batches, total_mae / num_batches, total_rmse / num_batches

In [35]:
def validation_loop(model, dataloader, device, criterion):
    """
    Runs one validation epoch.

    Parameters:
    - model (nn.Module): PyTorch model to evaluate.
    - dataloader (DataLoader): Validation DataLoader.
    - device (torch.device): CPU or GPU.
    - criterion (nn.Module): Loss function.

    Returns:
    - avg_loss (float): Average loss over dataset.
    - avg_mae (float): Average MAE over dataset.
    - avg_rmse (float): Average RMSE over dataset.
    """
    model.eval()
    total_loss, total_mae, total_rmse = 0.0, 0.0, 0.0
    num_batches = len(dataloader)

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = (
                inputs.to(device),
                labels.to(device),
            )
            outputs = model(inputs).squeeze()

            loss = criterion(outputs, labels)

            # Compute batch-wise metrics
            batch_mae = mean_absolute_error(labels.cpu().numpy(), outputs.cpu().numpy())
            batch_rmse = (
                mean_squared_error(labels.cpu().numpy(), outputs.cpu().numpy()) ** 0.5
            )

            total_loss += loss.item()
            total_mae += batch_mae
            total_rmse += batch_rmse

    return total_loss / num_batches, total_mae / num_batches, total_rmse / num_batches

In [36]:
def save_checkpoint(
    epoch, model, optimizer, scheduler, best_val_loss, save_path="best_cpg_model.pth"
):
    """
    Saves the model, optimizer, and scheduler state for training resumption.
    """
    checkpoint = {
        "epoch": epoch + 1,  # Save next epoch to resume correctly
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "best_val_loss": best_val_loss,
    }
    torch.save(checkpoint, save_path)
    print(f"Model checkpoint saved at {save_path}")


def load_checkpoint(
    model, optimizer, scheduler, device, save_path="best_cpg_model.pth"
):
    """
    Loads a saved checkpoint to resume training.
    """
    checkpoint = torch.load(save_path, map_location=device, weights_only=True)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    best_val_loss = checkpoint["best_val_loss"]

    print(
        f"Loaded checkpoint from {save_path}, resuming from epoch {checkpoint['epoch']} with best validation loss: {best_val_loss:.4f}"
    )
    return model, optimizer, scheduler, checkpoint["epoch"], best_val_loss

In [37]:
def train_model(
    model,
    train_loader,
    val_loader,
    device,
    epochs=25,
    patience=5,
    save_path="best_cpg_model.pth",
    lr=0.001,
    weight_decay=1e-4,
    resume=False,
):
    """
    Trains an LSTM model with validation and early stopping. Supports resuming training.

    Parameters:
    - model: LSTM model.
    - train_loader: Training DataLoader.
    - val_loader: Validation DataLoader.
    - device: CPU or GPU.
    - epochs: Max training epochs.
    - patience: Early stopping patience.
    - save_path: Path to save the best model.
    - lr: Initial learning rate.
    - weight_decay: L2 regularization weight.
    - resume: Whether to resume training from the last checkpoint.

    Returns:
    - Best trained model.
    """
    # initialize model and optimizer and scheduler and criterion and best_val_loss and start_epoch
    model.to(device)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.MSELoss()
    scheduler = ReduceLROnPlateau(
        optimizer,
        mode="min",
        factor=0.1,
        patience=2,
    )

    best_val_loss = float("inf")
    start_epoch = 0

    # Load checkpoint if resuming
    if resume and os.path.exists(save_path):
        try:
            model, optimizer, scheduler, start_epoch, best_val_loss = load_checkpoint(
                model, optimizer, scheduler, device, save_path
            )
        except FileNotFoundError:
            print("No checkpoint found. Starting training from scratch.")

    else:
        print("Starting training from scratch.")

    no_improvement = 0
    start_time = time.time()

    try:
        for epoch in range(start_epoch, epochs):
            train_loss, train_mae, train_rmse = training_loop(
                model, train_loader, device, optimizer, criterion
            )
            val_loss, val_mae, val_rmse = validation_loop(
                model, val_loader, device, criterion
            )

            # Scheduler step
            if epoch > 0:
                scheduler.step(val_loss)

            # Print results
            print(
                f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train MAE: {train_mae:.4f}, Train RMSE: {train_rmse:.4f}, "
                f"Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}, Val RMSE: {val_rmse:.4f}"
            )

            # Save best model checkpoint
            if val_loss < best_val_loss:
                print(
                    f"New best validation loss: {val_loss:.4f} (previous best: {best_val_loss:.4f})"
                )
                best_val_loss = val_loss
                no_improvement = 0
                save_checkpoint(
                    epoch, model, optimizer, scheduler, best_val_loss, save_path
                )
            else:
                no_improvement += 1
                print(f"No improvement, patience left: {patience - no_improvement}")

            # Early stopping
            if no_improvement >= patience:
                print(f"Early Stopping Triggered after epoch {epoch+1}")
                break

    except KeyboardInterrupt:
        print("Training Interrupted! Saving last checkpoint...")
        save_checkpoint(epoch, model, optimizer, scheduler, best_val_loss, save_path)

    print(f"Training Completed in {(time.time() - start_time):.2f} seconds")

    # Load the best model before returning
    model, optimizer, scheduler, _, _ = load_checkpoint(
        model, optimizer, scheduler, device, save_path
    )
    return model

In [38]:
new_model = CpGCounter(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout,
)

In [39]:
best_model = train_model(
    new_model,
    train_dataloader,
    val_dataloader,
    device,
    epochs=num_epochs,
    patience=5,
    save_path="best_cpg_model.pth",
    lr=learning_rate,
    weight_decay=weight_decay,
)

Starting training from scratch.
Epoch 1/50, Train Loss: 2.5070, Train MAE: 1.2232, Train RMSE: 1.5631, Val Loss: 1.1817, Val MAE: 0.8110, Val RMSE: 1.0319
New best validation loss: 1.1817 (previous best: inf)
Model checkpoint saved at best_cpg_model.pth
Epoch 2/50, Train Loss: 1.0280, Train MAE: 0.8017, Train RMSE: 0.9932, Val Loss: 0.8604, Val MAE: 0.7497, Val RMSE: 0.9089
New best validation loss: 0.8604 (previous best: 1.1817)
Model checkpoint saved at best_cpg_model.pth
Epoch 3/50, Train Loss: 0.7643, Train MAE: 0.6924, Train RMSE: 0.8549, Val Loss: 0.9482, Val MAE: 0.7549, Val RMSE: 0.9457
No improvement, patience left: 4
Epoch 4/50, Train Loss: 0.3489, Train MAE: 0.4511, Train RMSE: 0.5580, Val Loss: 0.1724, Val MAE: 0.2882, Val RMSE: 0.4067
New best validation loss: 0.1724 (previous best: 0.8604)
Model checkpoint saved at best_cpg_model.pth
Epoch 5/50, Train Loss: 0.1202, Train MAE: 0.2536, Train RMSE: 0.3324, Val Loss: 0.1413, Val MAE: 0.2668, Val RMSE: 0.3567
New best validati

In [30]:
print(best_model.state_dict()["embedding.weight"].sum())
print(new_model.state_dict()["embedding.weight"].sum())

tensor(-2.2883, device='cuda:0')
tensor(-2.2883, device='cuda:0')


In [31]:
def predict_cpgs_from_dna(
    model_path: str,
    dna_sequence: str,
    dna2int: dict,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounter,
):
    """
    Predict CpG count from a human DNA string.

    Parameters:
    - model_path: Path to trained LSTM model.
    - dna_sequence: Human-readable DNA string.
    - dna2int: Dictionary mapping DNA bases to integer values.
    - embedding_dim: Dimension of embedding layer.
    - hidden_size: Size of LSTM hidden state.
    - num_layers: Number of LSTM layers.
    - dropout: Dropout rate.
    - device: The device ('cpu' or 'cuda') for inference.

    Returns:
    - Predicted CpG count (rounded to 2 decimal places).
    """

    # Check if the model checkpoint exists
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model checkpoint not found at {model_path}")

    # Load Model
    vocab_size = len(dna2int)
    model = model_class(
        vocab_size=vocab_size,
        embedding_dim=embedding_dim,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )

    # Load the trained model checkpoint
    checkpoint = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(checkpoint["model_state_dict"])
    model.to(device)  # Move model to the correct device
    model.eval()

    # Convert DNA string to integer sequence
    int_sequence = [
        dna2int.get(base, 0) for base in dna_sequence
    ]  # Map bases to integers
    int_tensor = (
        torch.tensor(int_sequence, dtype=torch.long).unsqueeze(0).to(device)
    )  # Move to same device

    # Inference
    with torch.no_grad():
        predicted_count = model(int_tensor).squeeze().item()  # Ensure it's a scalar

    return round(predicted_count, 2)

In [32]:
# Test Example
test_dna = "NCACANNTNCGGAGGCGNAGCTCG"
# no of CpG sites = 3


predicted_cpgs = predict_cpgs_from_dna(
    "best_cpg_model.pth",
    test_dna,
    dna2int,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounter,
)

print(f"DNA: {test_dna} \nüîπ Predicted CpG Count: {predicted_cpgs}")

DNA: NCACANNTNCGGAGGCGNAGCTCG 
üîπ Predicted CpG Count: 3.24


Next we will have a improved version of the model additional elements like regularization, dropout, and batch normalization to improve the model's performance
We will also use a more complex dataset with variable-length sequences to demonstrate the model's ability to handle such data

In [33]:
class CpGCounterAdvanced(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout):
        super(CpGCounterAdvanced, self).__init__()
        # Embedding layer to convert integer-encoded sequences to embeddings of fixed size (embedding_dim)
        self.embedding = nn.Embedding(
            vocab_size, embedding_dim, padding_idx=0
        )  # input: (batch_size, seq_len), output: (batch_size, seq_len, embedding_dim)
        # LSTM layer to process the embeddings and capture sequential information (hidden_size) and bidirectional
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )  # input: (batch_size, seq_len, embedding_dim), output: (batch_size, seq_len, hidden_size)
        # add batch normalization 2 additional hidden states
        self.batch_norm = nn.BatchNorm1d(
            hidden_size * 2
        )  # input: (batch_size, seq_len, hidden_size), output: (batch_size, seq_len, hidden_size)
        # Fully connected layer to predict the number of CpG sites and ReLU activation
        self.fc = nn.Linear(
            hidden_size * 2, 1
        )  # input: (batch_size, seq_len, hidden_size), output: (batch_size, seq_len, 1)

        self.relu = nn.ReLU()

    def forward(self, x):
        # Embed the input sequences
        embedded = self.embedding(x)
        # Process the embeddings with an LSTM layer
        lstm_output, _ = self.lstm(embedded)
        # we only take the last hidden state
        lstm_output = lstm_output[
            :, -1, :
        ]  # shape is (batch_size, hidden_size) and we only take the last hidden state of the sequence
        # Batch normalization
        lstm_output = self.batch_norm(lstm_output)
        # Predict the number of CpG sites
        output = self.fc(lstm_output)
        return self.relu(output)

- Keep the validation loop unchanged
- Use GradScaler only in the training loop 
- Use autocast() only in the training loop

In [34]:
def training_loop_advanced(
    model, dataloader, device, optimizer, criterion, grad_scaler
):
    """
    Runs one training epoch.

    Parameters:
    - model (nn.Module): PyTorch model to train.
    - dataloader (DataLoader): Training DataLoader.
    - device (torch.device): CPU or GPU.
    - optimizer (torch.optim.Optimizer): Optimizer.
    - criterion (nn.Module): Loss function.

    Returns:
    - avg_loss (float): Average loss over dataset.
    - avg_mae (float): Average MAE over dataset.
    - avg_rmse (float): Average RMSE over dataset.
    """
    model.train()
    total_loss, total_mae, total_rmse = 0.0, 0.0, 0.0
    num_batches = len(dataloader)

    for inputs, labels in dataloader:
        inputs, labels = (
            inputs.to(device),
            labels.to(device),
        )

        optimizer.zero_grad()

        with autocast("cuda" if device.type == "cuda" else "cpu"):
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, labels)

        grad_scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        grad_scaler.step(optimizer)
        grad_scaler.update()

        # Compute batch-wise metrics
        batch_mae = mean_absolute_error(
            labels.cpu().numpy(), outputs.detach().cpu().numpy()
        )
        batch_rmse = (
            mean_squared_error(labels.cpu().numpy(), outputs.detach().cpu().numpy())
            ** 0.5
        )

        total_loss += loss.item()
        total_mae += batch_mae
        total_rmse += batch_rmse

    return total_loss / num_batches, total_mae / num_batches, total_rmse / num_batches

In [35]:
def train_model_advanced(
    model,
    train_loader,
    val_loader,
    device,
    epochs=25,
    patience=5,
    save_path="best_cpg_model_advanced.pth",
    lr=0.001,
    weight_decay=1e-4,
    resume=False,
):
    """
    Trains an LSTM model with validation and early stopping. Supports resuming training.

    Parameters:
    - model: LSTM model.
    - train_loader: Training DataLoader.
    - val_loader: Validation DataLoader.
    - device: CPU or GPU.
    - epochs: Max training epochs.
    - patience: Early stopping patience.
    - save_path: Path to save the best model.
    - lr: Initial learning rate.
    - weight_decay: L2 regularization weight.
    - resume: Whether to resume training from the last checkpoint.

    Returns:
    - Best trained model.
    """
    # initialize model and optimizer and scheduler and criterion and best_val_loss and start_epoch
    model.to(device)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.MSELoss()
    scheduler = ReduceLROnPlateau(
        optimizer,
        mode="min",
        factor=0.1,
        patience=2,
    )
    # initialize gradient scaler
    grad_scaler = GradScaler()

    best_val_loss = float("inf")
    start_epoch = 0

    # Load checkpoint if resuming
    if resume and os.path.exists(save_path):
        try:
            model, optimizer, scheduler, start_epoch, best_val_loss = load_checkpoint(
                model, optimizer, scheduler, device, save_path
            )
        except FileNotFoundError:
            print("No checkpoint found. Starting training from scratch.")

    else:
        print("Starting training from scratch.")

    no_improvement = 0
    start_time = time.time()

    try:
        for epoch in range(start_epoch, epochs):
            train_loss, train_mae, train_rmse = training_loop_advanced(
                model, train_loader, device, optimizer, criterion, grad_scaler
            )
            val_loss, val_mae, val_rmse = validation_loop(
                model, val_loader, device, criterion
            )

            # Scheduler step
            if epoch > 0:
                scheduler.step(val_loss)

            # Print results
            print(
                f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train MAE: {train_mae:.4f}, Train RMSE: {train_rmse:.4f}, "
                f"Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}, Val RMSE: {val_rmse:.4f}"
            )

            # Save best model checkpoint
            if val_loss < best_val_loss:
                print(
                    f"New best validation loss: {val_loss:.4f} (previous best: {best_val_loss:.4f})"
                )
                best_val_loss = val_loss
                no_improvement = 0
                save_checkpoint(
                    epoch, model, optimizer, scheduler, best_val_loss, save_path
                )
            else:
                no_improvement += 1
                print(f"No improvement, patience left: {patience - no_improvement}")

            # Early stopping
            if no_improvement >= patience:
                print(f"Early Stopping Triggered after epoch {epoch+1}")
                break

    except KeyboardInterrupt:
        print("Training Interrupted! Saving last checkpoint...")
        save_checkpoint(epoch, model, optimizer, scheduler, best_val_loss, save_path)

    print(f"Training Completed in {(time.time() - start_time):.2f} seconds")

    # Load the best model before returning
    model, optimizer, scheduler, _, _ = load_checkpoint(
        model, optimizer, scheduler, device, save_path
    )
    return model

In [36]:
# initialize model and check the model architecture
advanced_model = CpGCounterAdvanced(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout,
)
print(advanced_model)

CpGCounterAdvanced(
  (embedding): Embedding(6, 32, padding_idx=0)
  (lstm): LSTM(32, 64, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (batch_norm): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
  (relu): ReLU()
)


In [37]:
best_model_advanced = train_model_advanced(
    advanced_model,
    train_dataloader,
    val_dataloader,
    device,
    epochs=num_epochs,
    patience=5,
    save_path="best_cpg_model_advanced.pth",
    lr=learning_rate,
    weight_decay=weight_decay,
)

Starting training from scratch.
Epoch 1/50, Train Loss: 1.7186, Train MAE: 1.0320, Train RMSE: 1.2908, Val Loss: 1.5080, Val MAE: 0.8679, Val RMSE: 1.1319
New best validation loss: 1.5080 (previous best: inf)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 2/50, Train Loss: 1.5898, Train MAE: 0.9833, Train RMSE: 1.2440, Val Loss: 1.1794, Val MAE: 0.7681, Val RMSE: 1.0123
New best validation loss: 1.1794 (previous best: 1.5080)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 3/50, Train Loss: 1.3589, Train MAE: 0.9067, Train RMSE: 1.1408, Val Loss: 1.2141, Val MAE: 0.7913, Val RMSE: 1.0257
No improvement, patience left: 4
Epoch 4/50, Train Loss: 1.1039, Train MAE: 0.7801, Train RMSE: 1.0272, Val Loss: 0.8746, Val MAE: 0.7056, Val RMSE: 0.9086
New best validation loss: 0.8746 (previous best: 1.1794)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 5/50, Train Loss: 0.6821, Train MAE: 0.5959, Train RMSE: 0.7945, Val Loss: 0.3252, Val MAE: 0.3981, Val RMS

In [38]:
# Test Example
test_dna = "NCAACGCGNAGCTCGGCNAGCTCG"
# no of cpg sites in the test example is 4


predicted_cpgs = predict_cpgs_from_dna(
    "best_cpg_model_advanced.pth",
    test_dna,
    dna2int,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounterAdvanced,
)

print(f"DNA: {test_dna} \nüîπ Predicted CpG Count: {predicted_cpgs}")

DNA: NCAACGCGNAGCTCGGCNAGCTCG 
üîπ Predicted CpG Count: 4.46


#### Next we will train the model on variable length sequences with only padding

#### ------------------------ Data Preparation ------------------------

In [42]:
# Split variable-length dataset (80% train, 20% validation)
X_train, X_val, y_train, y_val = train_test_split(
    X_variable, y_variable, test_size=0.1, random_state=13
)
print(len(X_train), len(X_val))
print(len(y_train), len(y_val))

900 100
900 100


In [43]:
# pytorch standard dataset
train_dataset = CPGDataset(X_train, y_train)
val_dataset = CPGDataset(X_val, y_val)

print(len(train_dataset), len(val_dataset))
# check different lengths of sequences
print(min(map(len, X_train)), max(map(len, X_train)))
# dataloader
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=CPGDataset.collate_fn,
)

val_dataloader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=CPGDataset.collate_fn,
)

900 100
16 32


In [44]:
i = 0
for x_batch, y_batch in train_dataloader:
    print(x_batch.shape, y_batch.shape)
    i += 1
    if i == 5:
        break
print("----")
i = 0
for x_batch, y_batch in val_dataloader:
    print(x_batch.shape, y_batch.shape)
    i += 1
    if i == 5:
        break

torch.Size([16, 31]) torch.Size([16])
torch.Size([16, 32]) torch.Size([16])
torch.Size([16, 32]) torch.Size([16])
torch.Size([16, 32]) torch.Size([16])
torch.Size([16, 31]) torch.Size([16])
----
torch.Size([16, 32]) torch.Size([16])
torch.Size([16, 30]) torch.Size([16])
torch.Size([16, 32]) torch.Size([16])
torch.Size([16, 31]) torch.Size([16])
torch.Size([16, 32]) torch.Size([16])


##### Basic Model Training with Variable-Length Sequences (LSTM) and Advanced Model Training with Variable-Length Sequences (LSTM + BatchNorm)

In [50]:
new_model = CpGCounter(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout,
)

In [51]:
best_model = train_model(
    new_model,
    train_dataloader,
    val_dataloader,
    device,
    epochs=num_epochs,
    patience=10,
    save_path="best_cpg_model.pth",
    lr=learning_rate,
    weight_decay=weight_decay,
)

Starting training from scratch.
Epoch 1/50, Train Loss: 0.9302, Train MAE: 0.7714, Train RMSE: 0.9495, Val Loss: 0.7795, Val MAE: 0.6885, Val RMSE: 0.8534
New best validation loss: 0.7795 (previous best: inf)
Model checkpoint saved at best_cpg_model.pth
Epoch 2/50, Train Loss: 0.7226, Train MAE: 0.7017, Train RMSE: 0.8340, Val Loss: 0.4869, Val MAE: 0.5521, Val RMSE: 0.6775
New best validation loss: 0.4869 (previous best: 0.7795)
Model checkpoint saved at best_cpg_model.pth
Epoch 3/50, Train Loss: 0.4878, Train MAE: 0.5664, Train RMSE: 0.6800, Val Loss: 0.3238, Val MAE: 0.4529, Val RMSE: 0.5483
New best validation loss: 0.3238 (previous best: 0.4869)
Model checkpoint saved at best_cpg_model.pth
Epoch 4/50, Train Loss: 0.2458, Train MAE: 0.3766, Train RMSE: 0.4785, Val Loss: 0.1645, Val MAE: 0.2930, Val RMSE: 0.3935
New best validation loss: 0.1645 (previous best: 0.3238)
Model checkpoint saved at best_cpg_model.pth
Epoch 5/50, Train Loss: 0.1069, Train MAE: 0.2023, Train RMSE: 0.2978, 

In [52]:
# Test Example
test_dna = "NCACANNTNCGGAGGCGNAGCTCG"
# no of CpG sites = 3

predicted_cpgs = predict_cpgs_from_dna(
    "best_cpg_model.pth",
    test_dna,
    dna2int,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounter,
)

print(f"DNA: {test_dna} \nüîπ Predicted CpG Count: {predicted_cpgs}")

DNA: NCACANNTNCGGAGGCGNAGCTCG 
üîπ Predicted CpG Count: 3.01


In [53]:
# initialize model and check the model architecture
advanced_model = CpGCounterAdvanced(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout,
)
print(advanced_model)

CpGCounterAdvanced(
  (embedding): Embedding(6, 32, padding_idx=0)
  (lstm): LSTM(32, 64, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (batch_norm): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
  (relu): ReLU()
)


In [54]:
best_model_advanced = train_model_advanced(
    advanced_model,
    train_dataloader,
    val_dataloader,
    device,
    epochs=num_epochs,
    patience=5,
    save_path="best_cpg_model_advanced.pth",
    lr=learning_rate,
    weight_decay=weight_decay,
)

Starting training from scratch.
Epoch 1/50, Train Loss: 1.2382, Train MAE: 0.8367, Train RMSE: 1.0873, Val Loss: 1.1232, Val MAE: 0.7975, Val RMSE: 1.0384
New best validation loss: 1.1232 (previous best: inf)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 2/50, Train Loss: 1.0024, Train MAE: 0.7535, Train RMSE: 0.9696, Val Loss: 0.7408, Val MAE: 0.6321, Val RMSE: 0.8458
New best validation loss: 0.7408 (previous best: 1.1232)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 3/50, Train Loss: 0.7406, Train MAE: 0.6242, Train RMSE: 0.8424, Val Loss: 0.4314, Val MAE: 0.4677, Val RMSE: 0.6521
New best validation loss: 0.4314 (previous best: 0.7408)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 4/50, Train Loss: 0.5260, Train MAE: 0.5069, Train RMSE: 0.7053, Val Loss: 0.4188, Val MAE: 0.4911, Val RMSE: 0.6347
New best validation loss: 0.4188 (previous best: 0.4314)
Model checkpoint saved at best_cpg_model_advanced.pth
Epoch 5/50, Train Loss: 0.3015, Tra

In [55]:
# Test Example
test_dna = "NCAACGCGNAGCTCGGCNAGCTCG"
# no of cpg sites in the test example is 4


predicted_cpgs = predict_cpgs_from_dna(
    "best_cpg_model_advanced.pth",
    test_dna,
    dna2int,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounterAdvanced,
)

print(f"DNA: {test_dna} \nüîπ Predicted CpG Count: {predicted_cpgs}")

DNA: NCAACGCGNAGCTCGGCNAGCTCG 
üîπ Predicted CpG Count: 4.1


##### Next we will try using packed sequences to train the model with variable length sequences which will be more efficient. The problems with only padding are:
- The model processes padded values, which are meaningless and waste computation.
- The model treats all sequences as the same length, which is not true for variable-length sequences.
- The model may not learn the actual sequence patterns due to the presence of padding.
##### Packed sequences solve these problems by:
- Ignoring padded values during computation.
- Preserving the original sequence lengths for each batch.
- Allowing the model to learn the actual sequence patterns without interference from padding.
- Improving the model's efficiency and accuracy.
- We will use the pack_padded_sequence and pad_packed_sequence functions in PyTorch to handle variable-length sequences.

##### Padding and Packing Sequences for RNNs in PyTorch

- Sort sequences by length (longest first).
- Pack sequences using torch.nn.utils.rnn.pack_padded_sequence.
- LSTM skips padded timesteps, processing only actual data.
- Unpack the sequences back using pad_packed_sequence.

In [56]:
class CPGDatasetPackPadding(Dataset):
    def __init__(self, sequences, labels):
        """
        Initialize the dataset with sequences and labels (CpG counts) as input arguments and store them as attributes.

        Parameters:
        - sequences (List[List[int]]): List of integer-encoded DNA sequences.
        - labels (List[int]): List of CpG counts corresponding to each sequence.

        Returns:
        - None
        """
        self.sequences = [torch.tensor(seq, dtype=torch.long) for seq in sequences]
        self.labels = torch.tensor(labels, dtype=torch.float32)

    def __len__(self):
        """
        Return the number of sequences in the dataset.
        """
        return len(self.sequences)

    def __getitem__(self, idx):
        """
        Retrieve a sequence and its label by index.
        """
        return self.sequences[idx], self.labels[idx]

    @staticmethod
    def collate_fn(batch):
        """
        Custom collate function to pack sequences dynamically and return a batch of sequences and labels.
        """
        sequences, labels = zip(*batch)
        # find the lenghts of the sequences and sort them
        lengths = torch.tensor([len(seq) for seq in sequences], dtype=torch.long)
        sorted_indices = torch.argsort(lengths, descending=True)
        # sort sequences by length
        sequences = [sequences[i] for i in sorted_indices]
        # sort labels by length
        labels = torch.tensor([labels[i] for i in sorted_indices], dtype=torch.float32)
        lengths = lengths[sorted_indices]

        # pad sequences and return a packed sequence and labels and lengths
        pad_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)

        return pad_sequences, labels, lengths

In [58]:
# Split the X_variable and y_variable
X_train, X_val, y_train, y_val = train_test_split(
    X_variable, y_variable, test_size=0.1, random_state=18
)
print(len(X_train), len(X_val))

900 100


In [59]:
# pytorch standard dataset
train_dataset = CPGDatasetPackPadding(X_train, y_train)
val_dataset = CPGDatasetPackPadding(X_val, y_val)

In [61]:
# check different lengths of sequences
print(min(map(len, X_train)), max(map(len, X_train)))

16 32


In [90]:
# dataloader with pack padding sequence
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=CPGDatasetPackPadding.collate_fn,
)

val_dataloader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=CPGDatasetPackPadding.collate_fn,
)

In [91]:
for x_batch, y_batch, lengths in train_dataloader:
    print(x_batch.shape, y_batch.shape, lengths.shape)
    break

torch.Size([16, 32]) torch.Size([16]) torch.Size([16])


In [92]:
class CpGCounterAdvancedPackPadding(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout):
        super(CpGCounterAdvancedPackPadding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )

        self.batch_norm = nn.BatchNorm1d(hidden_size * 2)
        self.fc = nn.Linear(hidden_size * 2, 1)
        self.relu = nn.ReLU()

    def forward(self, x, lengths):
        """
        Forward pass using packed sequences.

        Parameters:
        - x: (batch_size, seq_len) -> Padded sequences.
        - lengths: (batch_size) -> Actual sequence lengths.

        Returns:
        - CpG count prediction.
        """
        embedded = self.embedding(
            x
        )  # input: (batch_size, seq_len), output: (batch_size, seq_len, embedding_dim)

        # Pack sequence to ignore padding
        packed_embedded = pack_padded_sequence(
            embedded, lengths.cpu(), batch_first=True, enforce_sorted=True
        )

        # LSTM processing
        packed_output, (hidden, cell) = self.lstm(packed_embedded)

        # Unpack sequence to get the output of each time step (hidden states)
        output, _ = pad_packed_sequence(packed_output, batch_first=True)

        # Extract last valid hidden state dynamically (from both directions)
        batch_indices = torch.arange(x.size(0), device=x.device)
        last_indices = lengths - 1
        last_hidden_states = output[batch_indices, last_indices, :]

        # Apply batch norm & fully connected layer
        last_hidden_states = self.batch_norm(last_hidden_states)
        output = self.fc(last_hidden_states)
        return self.relu(output)

---

## **2Ô∏è‚É£ Shape Transformations at Each Step**
Let's assume:
- `batch_size = 4`
- `max_seq_len = 6`
- `embedding_dim = 64`
- `hidden_size = 128`
- `bidirectional = True` (so `hidden_size * 2` is the actual hidden size)
- Example sequences with variable lengths:
  ```
  Input Sequences (Padded) ‚Üí Shape: (4, 6)
  ‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
  ‚îÇ   Seq      ‚îÇ  Length   ‚îÇ
  ‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
  ‚îÇ [A, C, G]  ‚îÇ 3         ‚îÇ
  ‚îÇ [T, A, C, G] ‚îÇ 4      ‚îÇ
  ‚îÇ [G, C, T, A, G, C] ‚îÇ 6  ‚îÇ
  ‚îÇ [A]        ‚îÇ 1         ‚îÇ
  ‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
  ```

### **üîπ Step 1: Embedding Layer**
```python
embedded = self.embedding(x)
```
üîπ **Before**: `x` ‚Üí **(batch_size, seq_len) = (4, 6)** (Integer indices of DNA bases)  
üîπ **After**: `embedded` ‚Üí **(batch_size, seq_len, embedding_dim) = (4, 6, 64)**  
- Each nucleotide is converted into a **64-dimensional embedding vector**.

**Example Output Shape**:
```
embedded.shape = (4, 6, 64)
```

---

### **üîπ Step 2: Pack the Embedded Sequences**
```python
packed_embedded = pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=True)
```
üîπ **Before**: `embedded.shape = (4, 6, 64)`  
üîπ **After**: `packed_embedded.shape ‚â† (4, 6, 64)` (Packed representation removes padding!)

**Key Points:**
- **Pack sequences to remove padding** before sending them into LSTM.
- The **LSTM now only processes valid sequence elements**, ignoring padding.
- **No fixed shape** because it's a packed structure.

---

### **üîπ Step 3: LSTM Processing**
```python
packed_output, (hidden, cell) = self.lstm(packed_embedded)
```
üîπ **Before**: `packed_embedded` (Packed sequence object)  
üîπ **After**: 
- `packed_output`: Packed sequence object with LSTM outputs  
- `hidden`: `(num_layers * 2, batch_size, hidden_size)`  
- `cell`: `(num_layers * 2, batch_size, hidden_size)`

#### **Shapes Explanation**
- **Hidden State (`hidden`)**:
  ```
  Shape: (num_layers * 2, batch_size, hidden_size)
  Example: (2, 4, 128) if num_layers=1, bidirectional=True
  ```
- **Packed Output (`packed_output`)**:
  - Cannot have a fixed shape (depends on `pack_padded_sequence()`).

---

### **üîπ Step 4: Unpack the Sequences**
```python
output, _ = pad_packed_sequence(packed_output, batch_first=True)
```
üîπ **Before**: `packed_output` (Packed sequence)  
üîπ **After**: `output.shape = (batch_size, seq_len, hidden_size * 2) = (4, 6, 256)`

**Key Transformation**:
- Converts **packed LSTM output** back to a **padded format**.
- Now we have a **fixed shape**, but the padding has no meaningful values.

---

### **üîπ Step 5: Extract Last Valid Hidden State**
```python
batch_indices = torch.arange(x.size(0), device=x.device)
last_indices = lengths - 1
last_hidden_states = output[batch_indices, last_indices, :]
```
üîπ **Before**: `output.shape = (4, 6, 256)`  
üîπ **After**: `last_hidden_states.shape = (4, 256)`

**Why is this needed?**
- Since sequences have **different lengths**, the last meaningful hidden state **is not always at `[:, -1, :]`**.
- Instead, we **extract the last valid hidden state per sequence** using `lengths`.

**Example**:
```
Lengths = [3, 4, 6, 1] (Actual sequence lengths)

Indices:
  Sequence 1 ‚Üí last valid state = output[0, 2, :]  (index 2)
  Sequence 2 ‚Üí last valid state = output[1, 3, :]  (index 3)
  Sequence 3 ‚Üí last valid state = output[2, 5, :]  (index 5)
  Sequence 4 ‚Üí last valid state = output[3, 0, :]  (index 0)
```

---

### **üîπ Step 6: Apply Batch Normalization & Fully Connected Layer**
```python
last_hidden_states = self.batch_norm(last_hidden_states)
output = self.fc(last_hidden_states)
return self.relu(output)
```
üîπ **Before**: `last_hidden_states.shape = (4, 256)`  
üîπ **After Batch Norm**: **`(4, 256)`** (Batch normalization applied)  
üîπ **After Fully Connected Layer**: **`(4, 1)`**  
üîπ **After ReLU Activation**: **`(4, 1)`**  

---
## **üìå Summary of Shape Transformations**
| **Step** | **Operation** | **Input Shape** | **Output Shape** |
|---------|-------------|--------------|--------------|
| **1Ô∏è‚É£** | Embedding Layer | `(batch_size, seq_len)` ‚Üí `(4, 6)` | `(batch_size, seq_len, embedding_dim)` ‚Üí `(4, 6, 64)` |
| **2Ô∏è‚É£** | Pack Sequences | `(4, 6, 64)` | Packed Sequence Object (removes padding) |
| **3Ô∏è‚É£** | LSTM Processing | Packed Sequence | `hidden: (num_layers*2, batch_size, hidden_size)` |
| **4Ô∏è‚É£** | Unpack Sequences | Packed Output | `(batch_size, seq_len, hidden_size * 2)` ‚Üí `(4, 6, 256)` |
| **5Ô∏è‚É£** | Extract Last Valid State | `(4, 6, 256)` | `(batch_size, hidden_size * 2)` ‚Üí `(4, 256)` |
| **6Ô∏è‚É£** | Batch Norm + Fully Connected | `(4, 256)` | `(4, 1)` |

---

## **üöÄ Key Takeaways**
‚úÖ **Using `pack_padded_sequence()` improves efficiency**  
‚úÖ **Unpacking and extracting last valid state avoids padding bias**  
‚úÖ **Dynamic last hidden state selection improves performance**  
‚úÖ **Packed sequences reduce unnecessary computation on padded tokens**  

üöÄ **Now your model is optimized for CpG site counting!** üî• Let me know if you have any doubts! üöÄ

In [100]:
def training_loop_pack_padded(
    model, dataloader, device, optimizer, criterion, grad_scaler
):
    """
    Runs one training epoch.

    Parameters:
    - model (nn.Module): PyTorch model to train.
    - dataloader (DataLoader): Training DataLoader.
    - device (torch.device): CPU or GPU.
    - optimizer (torch.optim.Optimizer): Optimizer.
    - criterion (nn.Module): Loss function.

    Returns:
    - avg_loss (float): Average loss over dataset.
    - avg_mae (float): Average MAE over dataset.
    - avg_rmse (float): Average RMSE over dataset.
    """
    model.train()
    total_loss, total_mae, total_rmse = 0.0, 0.0, 0.0
    num_batches = len(dataloader)

    for inputs, labels, lengths in dataloader:
        inputs, labels, lengths = (
            inputs.to(device),
            labels.to(device),
            lengths.to(device),
        )

        optimizer.zero_grad()

        with autocast("cuda" if device.type == "cuda" else "cpu"):
            outputs = model(inputs, lengths).squeeze()
            loss = criterion(outputs, labels)

        grad_scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        grad_scaler.step(optimizer)
        grad_scaler.update()

        # Compute batch-wise metrics
        batch_mae = mean_absolute_error(
            labels.cpu().numpy(), outputs.detach().cpu().numpy()
        )
        batch_rmse = (
            mean_squared_error(labels.cpu().numpy(), outputs.detach().cpu().numpy())
            ** 0.5
        )

        total_loss += loss.item()
        total_mae += batch_mae
        total_rmse += batch_rmse

    return total_loss / num_batches, total_mae / num_batches, total_rmse / num_batches

In [101]:
def validation_loop_pack_padded(model, dataloader, device, criterion):
    """
    Runs one validation epoch.

    Parameters:
    - model (nn.Module): PyTorch model to evaluate.
    - dataloader (DataLoader): Validation DataLoader.
    - device (torch.device): CPU or GPU.
    - criterion (nn.Module): Loss function.

    Returns:
    - avg_loss (float): Average loss over dataset.
    - avg_mae (float): Average MAE over dataset.
    - avg_rmse (float): Average RMSE over dataset.
    """
    model.eval()
    total_loss, total_mae, total_rmse = 0.0, 0.0, 0.0
    num_batches = len(dataloader)

    with torch.no_grad():
        for inputs, labels, lengths in dataloader:
            inputs, labels, lengths = (
                inputs.to(device),
                labels.to(device),
                lengths.to(device),
            )
            outputs = model(inputs, lengths).squeeze()

            loss = criterion(outputs, labels)

            # Compute batch-wise metrics
            batch_mae = mean_absolute_error(labels.cpu().numpy(), outputs.cpu().numpy())
            batch_rmse = (
                mean_squared_error(labels.cpu().numpy(), outputs.cpu().numpy()) ** 0.5
            )

            total_loss += loss.item()
            total_mae += batch_mae
            total_rmse += batch_rmse

    return total_loss / num_batches, total_mae / num_batches, total_rmse / num_batches

In [102]:
def train_model_pack_padded(
    model,
    train_loader,
    val_loader,
    device,
    epochs=25,
    patience=5,
    save_path="best_cpg_model_advanced.pth",
    lr=0.001,
    weight_decay=1e-4,
    resume=False,
):
    """
    Trains an LSTM model with validation and early stopping. Supports resuming training.

    Parameters:
    - model: LSTM model.
    - train_loader: Training DataLoader.
    - val_loader: Validation DataLoader.
    - device: CPU or GPU.
    - epochs: Max training epochs.
    - patience: Early stopping patience.
    - save_path: Path to save the best model.
    - lr: Initial learning rate.
    - weight_decay: L2 regularization weight.
    - resume: Whether to resume training from the last checkpoint.

    Returns:
    - Best trained model.
    """
    # initialize model and optimizer and scheduler and criterion and best_val_loss and start_epoch
    model.to(device)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.MSELoss()
    scheduler = ReduceLROnPlateau(
        optimizer,
        mode="min",
        factor=0.1,
        patience=2,
    )
    # initialize gradient scaler
    grad_scaler = GradScaler()

    best_val_loss = float("inf")
    start_epoch = 0

    # Load checkpoint if resuming
    if resume and os.path.exists(save_path):
        try:
            model, optimizer, scheduler, start_epoch, best_val_loss = load_checkpoint(
                model, optimizer, scheduler, device, save_path
            )
        except FileNotFoundError:
            print("No checkpoint found. Starting training from scratch.")

    else:
        print("Starting training from scratch.")

    no_improvement = 0
    start_time = time.time()

    try:
        for epoch in range(start_epoch, epochs):
            train_loss, train_mae, train_rmse = training_loop_pack_padded(
                model, train_loader, device, optimizer, criterion, grad_scaler
            )
            val_loss, val_mae, val_rmse = validation_loop_pack_padded(
                model, val_loader, device, criterion
            )

            # Scheduler step
            if epoch > 0:
                scheduler.step(val_loss)

            # Print results
            print(
                f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train MAE: {train_mae:.4f}, Train RMSE: {train_rmse:.4f}, "
                f"Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}, Val RMSE: {val_rmse:.4f}"
            )

            # Save best model checkpoint
            if val_loss < best_val_loss:
                print(
                    f"New best validation loss: {val_loss:.4f} (previous best: {best_val_loss:.4f})"
                )
                best_val_loss = val_loss
                no_improvement = 0
                save_checkpoint(
                    epoch, model, optimizer, scheduler, best_val_loss, save_path
                )
            else:
                no_improvement += 1
                print(f"No improvement, patience left: {patience - no_improvement}")

            # Early stopping
            if no_improvement >= patience:
                print(f"Early Stopping Triggered after epoch {epoch+1}")
                break

    except KeyboardInterrupt:
        print("Training Interrupted! Saving last checkpoint...")
        save_checkpoint(epoch, model, optimizer, scheduler, best_val_loss, save_path)

    print(f"Training Completed in {(time.time() - start_time):.2f} seconds")

    # Load the best model before returning
    model, optimizer, scheduler, _, _ = load_checkpoint(
        model, optimizer, scheduler, device, save_path
    )
    return model

In [105]:
new_model = CpGCounterAdvancedPackPadding(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout,
)

print(new_model)

CpGCounterAdvancedPackPadding(
  (embedding): Embedding(6, 32, padding_idx=0)
  (lstm): LSTM(32, 64, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (batch_norm): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
  (relu): ReLU()
)


In [106]:
trained_model = train_model_pack_padded(
    new_model,
    train_dataloader,
    val_dataloader,
    device,
    epochs=num_epochs,
    patience=5,
    save_path="best_cpg_model_packpad.pth",
    lr=learning_rate,
    weight_decay=weight_decay,
)

Starting training from scratch.
Epoch 1/50, Train Loss: 1.2279, Train MAE: 0.7977, Train RMSE: 1.0848, Val Loss: 0.8468, Val MAE: 0.6747, Val RMSE: 0.8743
New best validation loss: 0.8468 (previous best: inf)
Model checkpoint saved at best_cpg_model_packpad.pth
Epoch 2/50, Train Loss: 1.0048, Train MAE: 0.7209, Train RMSE: 0.9796, Val Loss: 1.0637, Val MAE: 0.8400, Val RMSE: 1.0165
No improvement, patience left: 4
Epoch 3/50, Train Loss: 0.7115, Train MAE: 0.5995, Train RMSE: 0.8237, Val Loss: 0.4988, Val MAE: 0.5230, Val RMSE: 0.6958
New best validation loss: 0.4988 (previous best: 0.8468)
Model checkpoint saved at best_cpg_model_packpad.pth
Epoch 4/50, Train Loss: 0.4069, Train MAE: 0.4263, Train RMSE: 0.6082, Val Loss: 0.4523, Val MAE: 0.4793, Val RMSE: 0.6620
New best validation loss: 0.4523 (previous best: 0.4988)
Model checkpoint saved at best_cpg_model_packpad.pth
Epoch 5/50, Train Loss: 0.2445, Train MAE: 0.3264, Train RMSE: 0.4771, Val Loss: 0.2778, Val MAE: 0.3261, Val RMSE: 

In [110]:
def predict_cpgs_from_dna_pack_padded(
    model_path: str,
    dna_sequence: str,
    dna2int: dict,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounterAdvanced,  # Ensure the correct model class is used
):
    """
    Predict CpG count from a human DNA string.

    Parameters:
    - model_path: Path to trained LSTM model.
    - dna_sequence: Human-readable DNA string.
    - dna2int: Dictionary mapping DNA bases to integer values.
    - embedding_dim: Dimension of embedding layer.
    - hidden_size: Size of LSTM hidden state.
    - num_layers: Number of LSTM layers.
    - dropout: Dropout rate.
    - device: The device ('cpu' or 'cuda') for inference.
    - model_class: The model class to initialize the architecture.

    Returns:
    - Predicted CpG count (rounded to 2 decimal places).
    """

    # Check if the model checkpoint exists
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model checkpoint not found at {model_path}")

    # Load Model
    vocab_size = len(dna2int)
    model = model_class(
        vocab_size=vocab_size,
        embedding_dim=embedding_dim,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )

    # Load the trained model checkpoint
    checkpoint = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(checkpoint["model_state_dict"])
    model.to(device)  # Move model to the correct device
    model.eval()

    # Convert DNA string to integer sequence
    int_sequence = [
        dna2int.get(base, 0) for base in dna_sequence
    ]  # Map bases to integers
    int_tensor = (
        torch.tensor(int_sequence, dtype=torch.long).unsqueeze(0).to(device)
    )  # Add batch dim

    # Compute sequence length (as tensor) and move to the same device
    lengths = torch.tensor([len(int_sequence)], dtype=torch.long).to(device)

    # Inference
    with torch.no_grad():
        predicted_count = (
            model(int_tensor, lengths).squeeze().item()
        )  # Ensure it's a scalar

    return round(predicted_count, 2)

In [111]:
# Test Example
test_dna = "NCAACGCGNAGCTCGGCNAGCTCG"
# no of cpg sites in the test example is 4


predicted_cpgs = predict_cpgs_from_dna_pack_padded(
    "best_cpg_model_packpad.pth",
    test_dna,
    dna2int,
    embedding_dim,
    hidden_size,
    num_layers,
    dropout,
    device,
    model_class=CpGCounterAdvancedPackPadding,
)

print(f"DNA: {test_dna} \nüîπ Predicted CpG Count: {predicted_cpgs}")

DNA: NCAACGCGNAGCTCGGCNAGCTCG 
üîπ Predicted CpG Count: 4.18


################################################################ END ################################################################