# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [1]:
from typing import Sequence
from functools import partial
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import random
import torch
import numpy as np
import random

In [2]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [3]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    temp = [list(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs

    #step3
    y_dna_seqs = [count_cpgs(seq) for seq in temp] # use count_cpgs here to generate labels with temp generated in step2

    return X_dna_seqs_train, y_dna_seqs

train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

In [4]:
# some config
LSTM_HIDDEN = 64
LSTM_LAYER = 2
batch_size = 32
learning_rate = 0.001
epoch_num = 10

In [5]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)


class PadSequence:
    #TODO
    def __call__(self, batch):
        # Sort batch by sequence length (descending order) to use pack_padded_sequence
        batch = sorted(batch, key=lambda x: len(x[0]), reverse=True)
        sequences, labels = zip(*batch)
        # Pad sequences
        padded_sequences = pad_sequence(sequences, batch_first=True)
        return padded_sequences, torch.tensor(labels, dtype=torch.float)

In [6]:
# create data loader
train_data_loader = torch.utils.data.DataLoader(
    MyDataset(train_x, train_y),
    batch_size=batch_size,
    shuffle=True,
    collate_fn=PadSequence()  # Use PadSequence as the collate_fn
)

test_data_loader = torch.utils.data.DataLoader(
    MyDataset(test_x, test_y),
    batch_size=batch_size,
    shuffle=False,
    collate_fn=PadSequence()  # Use PadSequence as the collate_fn
)

In [7]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way

        # Define LSTM layer with input size 1
        self.lstm = torch.nn.LSTM(input_size=1, hidden_size=LSTM_HIDDEN, num_layers=LSTM_LAYER, batch_first=True)
        self.fc = torch.nn.Linear(LSTM_HIDDEN, 1)

    def forward(self, x):
        # Reshape input tensor to match LSTM input requirements
        print("Shape after unsqueeze:", x.shape)
        x = x.float()  # Convert to float
        x = x.unsqueeze(-1)  # Add a new dimension for the input size
        print("Shape after adding new dimension:", x.shape)
        # Forward pass
        lstm_out, _ = self.lstm(x)
        print("Shape after LSTM layer:", lstm_out.shape)
        # Get the output of the last time step
        last_output = lstm_out[:, -1, :]
        # Apply fully connected layer
        logits = self.fc(last_output)
        print("Shape after fully connected layer:", logits.shape)
        # Squeeze to remove the singleton dimension
        return logits.squeeze()

In [8]:
# init model / loss function / optimizer etc.
model = CpGPredictor()
loss_fn = torch.nn.MSELoss()  # Mean Squared Error loss for regression task
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  # Adam optimizer

In [9]:
# training (you can modify the code below)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
model.to(device)

for epoch in range(epoch_num):
    model.train()
    total_loss = .0

    for batch_x, batch_y in train_data_loader:
        #TODO complete training loop

        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        optimizer.zero_grad()

        # Forward pass
        y_pred = model(batch_x)

        # Compute loss
        loss = loss_fn(y_pred.squeeze(), batch_y.float())

        # Backward pass
        loss.backward()

        # Update Weights
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_data_loader)}")

Using device: cpu
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 12

In [10]:
# eval (you can modify the code below)
model.eval()

res_gs = []
res_pred = []

with torch.no_grad():
  for batch_x, batch_y in test_data_loader:
      # TODO complete inference loop
      batch_x = batch_x.to(device)

      # Forward pass
      y_pred = model(batch_x)
      res_pred.extend(y_pred.squeeze().tolist())
      res_gs.extend(batch_y.tolist())


Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape after fully connected layer: torch.Size([32, 1])
Shape after unsqueeze: torch.Size([32, 128])
Shape after adding new dimension: torch.Size([32, 128, 1])
Shape after LSTM layer: torch.Size([32, 128, 64])
Shape afte

In [11]:
# TODO complete evaluation of the model
from sklearn.metrics import mean_absolute_error

# Calculate Mean Absolute Error
mae = mean_absolute_error(res_gs, res_pred)
print(f"Mean Absolute Error: {mae}")

Mean Absolute Error: 6.23855012236163e-05


# Part 2: what if the DNA sequences are not the same length

In [12]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [13]:
# TODO complete the task based on the change
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # TODO prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    y_dna_seqs = [count_cpgs(seq) for seq in X_dna_seqs_train]

    return X_dna_seqs_train, y_dna_seqs


min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [14]:
# Update the PadSequence class to handle variable-length sequences
class PadSequence:
    def __call__(self, batch):
        # Sort batch by sequence length (descending order)
        batch = sorted(batch, key=lambda x: len(x[0]), reverse=True)
        sequences, labels = zip(*batch)
        # No padding needed for variable-length sequences
        return sequences, torch.tensor(labels, dtype=torch.float)

In [15]:
# TODO complete the rest

# Create data loader for variable-length sequences
train_data_loader_var_len = torch.utils.data.DataLoader(
    MyDataset(train_x, train_y),
    batch_size=batch_size,
    shuffle=True,
    collate_fn=PadSequence()  # Use PadSequence as the collate_fn
)

test_data_loader_var_len = torch.utils.data.DataLoader(
    MyDataset(test_x, test_y),
    batch_size=batch_size,
    shuffle=False,
    collate_fn=PadSequence()  # Use PadSequence as the collate_fn
)

In [16]:
# Save trained model after training
torch.save(model.state_dict(), 'trained_model.pth')
torch.save(model, 'cpg_detector_model.pth')

# Download the 'trained_model.pth' and 'cpg_detector_model.pth' model files
from google.colab import files
files.download('trained_model.pth')
files.download('cpg_detector_model.pth')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>