# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [1]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
import random

In [2]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)    
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers   
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [25]:
dna_sequence = "ACGTN"
int_sequence = list(dnaseq_to_intseq(dna_sequence))
int_sequence

[1, 2, 3, 4, 0]

In [15]:
for a, i in zip(alphabet, range(5)):
    print(a,i)

N 0
A 1
C 2
G 3
T 4


In [18]:
for a, i in zip(alphabet, range(5)):
    print(i,a)

0 N
1 A
2 C
3 G
4 T


In [16]:
# import torch
print(torch.cuda.is_available())  # Should return True
print(torch.version.cuda)         # Should match your CUDA version
print(torch.cuda.get_device_name(0))  # Name of your GPU


True
12.4
NVIDIA GeForce RTX 4060 Laptop GPU


In [34]:
X_dna_seqs_train = list(rand_sequence(5))
print(X_dna_seqs_train)
temp = ["".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train]
temp



[[0, 1, 2, 3, 0, 1, 0, 2, 2, 0, 0, 1, 3, 0, 2, 0, 3, 1, 3, 1, 3, 1, 2, 4, 2, 3, 1, 0, 3, 4, 4, 0, 2, 0, 4, 1, 2, 0, 4, 1, 0, 1, 2, 1, 2, 4, 2, 2, 0, 2, 1, 3, 1, 1, 0, 4, 2, 0, 0, 0, 2, 4, 0, 4, 0, 0, 3, 0, 2, 0, 3, 0, 2, 0, 4, 2, 0, 1, 3, 4, 2, 4, 4, 2, 2, 1, 3, 1, 4, 0, 3, 2, 1, 3, 4, 2, 1, 1, 1, 0, 2, 0, 4, 3, 1, 1, 0, 2, 3, 4, 4, 3, 2, 0, 0, 3, 4, 1, 3, 1, 1, 3, 0, 4, 4, 2, 4, 4], [0, 0, 4, 0, 0, 3, 2, 3, 3, 3, 4, 2, 4, 0, 0, 0, 2, 1, 4, 0, 0, 1, 4, 1, 0, 0, 0, 4, 3, 2, 3, 1, 1, 3, 0, 2, 0, 1, 2, 2, 0, 3, 0, 0, 2, 4, 0, 1, 0, 4, 1, 1, 2, 2, 0, 3, 3, 3, 1, 4, 1, 3, 0, 1, 0, 4, 2, 0, 3, 0, 3, 2, 0, 0, 4, 4, 4, 4, 3, 1, 3, 3, 1, 1, 3, 1, 4, 0, 0, 0, 1, 1, 0, 4, 1, 1, 3, 3, 1, 2, 2, 3, 1, 4, 4, 3, 3, 1, 1, 2, 2, 4, 1, 1, 1, 4, 0, 3, 4, 4, 4, 2, 3, 0, 0, 1, 4, 2], [0, 0, 1, 3, 3, 0, 4, 3, 1, 0, 0, 4, 0, 1, 1, 1, 2, 1, 2, 0, 3, 1, 0, 0, 4, 2, 2, 2, 1, 3, 3, 2, 2, 0, 3, 2, 0, 0, 3, 4, 1, 2, 0, 0, 1, 1, 3, 1, 1, 4, 1, 1, 2, 0, 3, 4, 3, 1, 0, 3, 3, 3, 4, 2, 2, 3, 0, 0, 2, 4, 1, 1, 0, 3, 3, 2

['NACGNANCCNNAGNCNGAGAGACTCGANGTTNCNTACNTANACACTCCNCAGAANTCNNNCTNTNNGNCNGNCNTCNAGTCTTCCAGATNGCAGTCAAANCNTGAANCGTTGCNNGTAGAAGNTTCTT',
 'NNTNNGCGGGTCTNNNCATNNATANNNTGCGAAGNCNACCNGNNCTNANTAACCNGGGATAGNANTCNGNGCNNTTTTGAGGAAGATNNNAANTAAGGACCGATTGGAACCTAAATNGTTTCGNNATC',
 'NNAGGNTGANNTNAAACACNGANNTCCCAGGCCNGCNNGTACNNAAGAATAACNGTGANGGGTCCGNNCTAANGGCNNCNATTNGGTGCCTCAGCAACCATTGANTCGNACTTCTAATNATGNTCGCG',
 'NTAAGTTNACTAGNAATCAANTGGTNGCNCGGNATNNTTNAGTTTAGNGATNTNCNCTGAACAGNAGGGAAGNACNGTTANGGTCNTTGNACTGCACTAANNAGTNGCCNTGGACNTNANCGGNACNT',
 'NACCTGGCAAAACGCNCANCTAAACNCTGCGANGNTNNACGTNCNANACTGTATTTNNGCCCCTGTTNGCAANACTCTTGNATAATTNTNNCAACTGTGAACNATGANTANATCCTGTNTTTTTNGGA']

In [35]:
a=[count_cpgs(seq) for seq in temp]
a

[3, 4, 4, 2, 3]

In [3]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    # print(X_dna_seqs_train)
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X 
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    temp = ["".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs
    #step3
    y_dna_seqs = [count_cpgs(seq) for seq in temp] # use count_cpgs here to generate labels with temp generated in step2
    
    return X_dna_seqs_train, y_dna_seqs
    
train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

In [48]:
print(train_x[0])
print(train_y[0])

[0, 2, 3, 2, 0, 2, 4, 1, 1, 3, 0, 3, 4, 2, 3, 0, 0, 2, 1, 3, 2, 4, 0, 1, 1, 3, 4, 0, 2, 1, 1, 1, 3, 3, 4, 4, 1, 0, 2, 4, 0, 2, 2, 4, 1, 1, 1, 2, 3, 4, 0, 1, 4, 0, 2, 2, 1, 4, 2, 3, 1, 0, 1, 2, 2, 3, 2, 2, 3, 2, 0, 3, 4, 0, 0, 4, 3, 2, 0, 2, 0, 0, 1, 3, 4, 1, 0, 0, 4, 0, 0, 3, 1, 4, 0, 0, 3, 1, 4, 1, 3, 0, 4, 3, 2, 3, 0, 1, 3, 1, 4, 4, 0, 1, 3, 1, 2, 4, 3, 4, 1, 1, 1, 4, 2, 0, 2, 4]
7


In [46]:
"".join(intseq_to_dnaseq(train_x[0]))
# "".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train

'CCACNGTCGTAACGNNCGGNNTACCTCGGCAAGGCNAGATTATNTCGNCCANGTGTGTACCATNTNAGNNGTAAGGAGNANACCCCNAGGGNANNGCGCTNCGCAGAAGNTAANTAGCNNTNNNNTNT'

In [43]:
# some config

LSTM_HIDDEN = 128          # Number of hidden units in LSTM layer (can be increased for more complex models)
LSTM_LAYER = 2             # Number of LSTM layers (2-3 is typical for moderate complexity)
batch_size = 64            # Size of each mini-batch during training
learning_rate = 0.001      # Learning rate for optimization (you can try reducing it for better accuracy)
epoch_num = 20 
input_size = 128
output_size = 1

In [45]:
# create data loader
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset
import matplotlib.pyplot as plt

# Convert data to PyTorch tensors
train_x = torch.tensor(train_x)
train_y = torch.tensor(train_y)
test_x = torch.tensor(test_x)
test_y = torch.tensor(test_y)
train_data_loader = DataLoader(TensorDataset(train_x, train_y), batch_size=batch_size)
test_data_loader = DataLoader(TensorDataset(test_x, test_y), batch_size=batch_size)

  train_x = torch.tensor(train_x)
  train_y = torch.tensor(train_y)
  test_x = torch.tensor(test_x)
  test_y = torch.tensor(test_y)


In [47]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self,input_size=5, hidden_size=128, num_layers=1, output_size=1):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.classifier = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.embedding(x)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        logits = self.fc(out[:, -1, :])
        return logits

In [60]:
class CpGPredictor1(torch.nn.Module):
    ''' Simple model that uses an LSTM to count the number of CpGs in a sequence '''
    def __init__(self, input_size=5, hidden_size=128, num_layers=1, output_size=1):
        super(CpGPredictor, self).__init__()
        self.embedding = torch.nn.Embedding(5, input_size)  # Embedding layer for input sequence
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.classifier = torch.nn.Linear(hidden_size, output_size)  # Linear layer for classification

    def forward(self, x):
        # Apply embedding layer to input
        x = self.embedding(x)
        
        # Initialize hidden and cell states for LSTM
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # Pass through LSTM
        out, _ = self.lstm(x, (h0, c0))
        
        # Apply classifier to the last hidden state
        logits = self.classifier(out[:, -1, :])
        
        return logits


In [49]:
# print(CpGPredictor.summ)

AttributeError: type object 'CpGPredictor' has no attribute 'summ'

In [50]:
import torch

class CpGPredictor2(torch.nn.Module):
    ''' Simple model that uses an LSTM to count the number of CpGs in a sequence '''
    def __init__(self, input_size=5, hidden_size=128, num_layers=1, output_size=1):
        super(CpGPredictor2, self).__init__()
        
        # Embedding layer for the DNA sequence
        self.embedding = torch.nn.Embedding(5, input_size)  # 5 possible symbols in DNA
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.classifier = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Convert input to LongTensor before passing to embedding
        x = x.long()  # Ensure input is of type LongTensor
        x = self.embedding(x)  # Apply the embedding layer
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        logits = self.classifier(out[:, -1, :])
        return logits


In [53]:
from torchinfo import summary

# Assuming CpGPredictor model is defined and loaded as model
model = CpGPredictor(input_size=5, hidden_size=128, num_layers=1, output_size=1)

# Print the model summary for an input size (e.g., a batch size of 32, sequence length 128)
summary(model, input_size=(128,))  # (batch_size, sequence_length)


RuntimeError: Failed to run torchinfo. See above stack traces for more details. Executed layers up to: []

In [64]:
model = CpGPredictor1(input_size=5, hidden_size=LSTM_HIDDEN, num_layers=LSTM_LAYER, output_size=1)
model

TypeError: super(type, obj): obj must be an instance or subtype of type

In [58]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Get the number of parameters
num_params = count_parameters(model)
print(f'The model has {num_params} trainable parameters')

The model has 201345 trainable parameters


In [65]:
# init model / loss function / optimizer etc.

model = CpGPredictor1(input_size, LSTM_HIDDEN, LSTM_LAYER, output_size)
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

TypeError: super(type, obj): obj must be an instance or subtype of type

In [None]:
# training (you can modify the code below)
t_loss = .0
model.train()
model.zero_grad()
for _ in range(epoch_num):
    for batch in train_data_loader:
        #TODO complete training loop
        t_loss += loss.item()
        loss.backward()

    print(t_loss)
    t_loss = .0

In [66]:
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(CpGPredictor, self).__init__()
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = torch.nn.Embedding(5, input_size)
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.embedding(x)
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

In [67]:
model = CpGPredictor(input_size, LSTM_HIDDEN, LSTM_LAYER, output_size)
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [68]:
graph_losses = []
for epoch in range(epoch_num):
    model.train()
    total_loss = 0
    for batch_x, batch_y in train_data_loader:
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = loss_fn(outputs, batch_y.float().unsqueeze(1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    epoch_loss = total_loss / len(train_data_loader)
    graph_losses.append(epoch_loss)
    print(f"Epoch {epoch+1}/{epoch_num}, Loss: {epoch_loss}")

Epoch 1/20, Loss: 9.990298725664616
Epoch 2/20, Loss: 4.216798312962055
Epoch 3/20, Loss: 4.192945018410683
Epoch 4/20, Loss: 4.1402736976742744
Epoch 5/20, Loss: 3.9744697585701942
Epoch 6/20, Loss: 3.8071797639131546
Epoch 7/20, Loss: 3.259594563394785
Epoch 8/20, Loss: 2.1341355107724667
Epoch 9/20, Loss: 1.0653156340122223
Epoch 10/20, Loss: 0.7300802897661924
Epoch 11/20, Loss: 0.5070901606231928
Epoch 12/20, Loss: 0.36563044413924217
Epoch 13/20, Loss: 0.2792621161788702
Epoch 14/20, Loss: 0.20765071082860231
Epoch 15/20, Loss: 0.20905277412384748
Epoch 16/20, Loss: 0.14201264269649982
Epoch 17/20, Loss: 0.10674727987498045
Epoch 18/20, Loss: 0.08013340027537197
Epoch 19/20, Loss: 0.06362906843423843
Epoch 20/20, Loss: 0.05347717896802351


In [59]:
graph_losses = []
for epoch in range(epoch_num):
    model.train()
    total_loss = 0
    for batch_x, batch_y in train_data_loader:
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = loss_fn(outputs, batch_y.float().unsqueeze(1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    epoch_loss = total_loss / len(train_data_loader)
    graph_losses.append(epoch_loss)
    print(f"Epoch {epoch+1}/{epoch_num}, Loss: {epoch_loss}")

AttributeError: 'CpGPredictor' object has no attribute 'embedding'

In [None]:
# eval (you can modify the code below)
model.eval()

res_gs = []
res_pred = []

for batch in test_data_loader:
    # TODO complete inference loop

In [69]:
model.eval()
total_loss = 0
with torch.no_grad():
    for batch_x, batch_y in test_data_loader:
        outputs = model(batch_x)
        loss = loss_fn(outputs, batch_y.float().unsqueeze(1))
        total_loss += loss.item()

print(f"Test Loss: {total_loss}")

Test Loss: 0.4357474483549595


In [70]:
# Compute accuracy on new set of test samples by running predictions
N = 100
test_x, test_y = prepare_data(N)

# Convert data to PyTorch tensors
test_x = torch.tensor(test_x)
test_y = torch.tensor(test_y)

# Run predictions for each sequence and compute overall accuracy
correct_predictions = 0
for sequence, y in zip(test_x, test_y):
    # Perform inference
    with torch.no_grad():
        output = model(sequence.unsqueeze(0))
    # Convert output to CpG count
    predicted_cpg_count = output.item()
    # Check if prediction is within a tolerance of 0.5
    if abs(predicted_cpg_count - (y.float().unsqueeze(0))) <= 0.3:
        correct_predictions += 1

# Compute overall accuracy
accuracy = correct_predictions / N
print("Overall Accuracy:", accuracy)

Overall Accuracy: 0.84


In [71]:
torch.save(model.state_dict(), "model.pth")

# Part 2: what if the DNA sequences are not the same length

In [None]:
# hint we will need following imports
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [None]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers   
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [None]:
# TODO complete the task based on the change
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # TODO prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    #step 1
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    #step 2
    temp = ???
    #step3
    y_dna_seqs = ???
    
    return X_dna_seqs_train, y_dna_seqs
    
    
min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [None]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)

    
# this will be a collate_fn for dataloader to pad sequence  
class PadSequence:
    #TODO

In [None]:
# TODO complete the rest