# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [1]:
from typing import Sequence, List
from functools import partial
import random
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random

In [2]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [3]:
def custom_label_seq(seq: str) -> List:
    """
    we will return a list of 0 and 1 such that:
        1. if previous character + current character == "CG", we will add 1 to the list
        2. if previous character + current character != "CG", we will add 0 to the list
        3. length of final list will be len(seq) - 1
    """
    labels = []
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        if dimer == "CG":
            labels.append(1)
        else:
            labels.append(0)
    return labels

In [4]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    temp = ["".join(list(intseq_to_dnaseq(seq))) for seq in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs
    #step3
    y_dna_seqs = [count_cpgs(seq) for seq in temp] # use count_cpgs here to generate labels with temp generated in step2

    custom_y_dna_seqs = [custom_label_seq(seq) for seq in temp]

    return X_dna_seqs_train, custom_y_dna_seqs, y_dna_seqs

train_x, train_y, train_y_int = prepare_data(2048)
test_x, test_y, test_y_int = prepare_data(512)

In [5]:
# assert that length of train_x, train_y and train_y_int are same
for l in range(len(train_x)):
    assert len(train_x[l]) == 128 and len(train_y[l]) == 127, print(l)

In [6]:
# some config
LSTM_HIDDEN = 64
EMBEDDING_DIM = 16
LSTM_LAYER = 1
batch_size = 32
learning_rate = 0.005
epoch_num = 50

In [7]:
# create data loader
class SequenceDataset(Dataset):
    def __init__(self, sequences, custom_labels, int_labels):
        self.sequences = sequences
        self.custom_labels = custom_labels
        self.int_labels = int_labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.long), torch.tensor(self.custom_labels[idx], dtype=torch.float), self.int_labels[idx]


train_dataset = SequenceDataset(train_x, train_y, train_y_int)
test_dataset = SequenceDataset(test_x, test_y, test_y_int)

train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=batch_size*2, shuffle=False)

In [8]:
# test dataloader
for batch in train_data_loader:
    sequence, custom_labels, int_labels = batch
    print(sequence.shape, custom_labels.shape, int_labels.shape)
    break

torch.Size([32, 128]) torch.Size([32, 127]) torch.Size([32])


In [9]:
# Model
import torch.nn as nn

class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, embedding_dim, hidden_dim, vocab_size):
        super(CpGPredictor, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.linear_lstm_layer = nn.Linear(embedding_dim * 2, 1)

    def forward(self, x):
        # x shape: (batch_size, seq_len)
        embedded = self.embedding(x) # embedded shape: (batch_size, seq_len, embedding_dim)

        # Prepare input for the linear layer: concatenate embeddings of prev and curr tokens
        # We need to process pairs of tokens (x[i], x[i+1])
        # This means the output will have length seq_len - 1
        batch_size, seq_len, embedding_dim = embedded.shape
        logits = []

        for i in range(1, seq_len):
            # Get the embeddings of the current and previous tokens
            curr_emb = embedded[:, i, :] # curr_emb shape: (batch_size, embedding_dim)
            prev_emb = embedded[:, i-1, :] # prev_emb shape: (batch_size, embedding_dim)

            pair_input = torch.cat([prev_emb, curr_emb], dim=1) # pair_input shape: (batch_size, 2 * embedding_dim)
            lstm_out = self.linear_lstm_layer(pair_input) # lstm_out shape: (batch_size, 1)
            logits.append(lstm_out)

        logits = torch.stack(logits, dim=1) # logits shape: (batch_size, seq_len - 1, 1)
        return logits.squeeze(-1)

In [10]:
# init model / loss function / optimizer etc.
import torch.optim as optim
import torch.nn as nn

# Vocabulary size is 5 (N, A, C, G, T)
VOCAB_SIZE = 5

model = CpGPredictor(embedding_dim=EMBEDDING_DIM, hidden_dim=LSTM_HIDDEN, vocab_size=VOCAB_SIZE)
loss_fn = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [11]:
# training (you can modify the code below)
model.train()
model.zero_grad()
for epoch in range(epoch_num):
    total_loss = 0
    total_cpg_diff = 0
    for batch in train_data_loader:
        # complete training loop
        sequences, custom_labels, int_labels = batch
        optimizer.zero_grad()
        logits = model(sequences)
        loss = loss_fn(logits, custom_labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Calculate the sum of sigmoid outputs
        sigmoid_outputs = torch.sigmoid(logits)
        sum_sigmoid = torch.sum(sigmoid_outputs, dim=1) # Sum over the sequence length dimension

        # Calculate the difference with integer labels
        cpg_diff = torch.sum(torch.abs(sum_sigmoid - int_labels))
        total_cpg_diff += cpg_diff.item()

    avg_loss = total_loss / len(train_data_loader)
    avg_cpg_diff = total_cpg_diff / len(train_data_loader)

    print(f"Epoch {epoch+1}/{epoch_num}, Train Loss: {avg_loss:.4f}, Avg CpG Sum Difference: {avg_cpg_diff:.4f}")

Epoch 1/50, Train Loss: 0.2589, Avg CpG Sum Difference: 633.1917
Epoch 2/50, Train Loss: 0.0591, Avg CpG Sum Difference: 69.0247
Epoch 3/50, Train Loss: 0.0307, Avg CpG Sum Difference: 33.8325
Epoch 4/50, Train Loss: 0.0179, Avg CpG Sum Difference: 19.8090
Epoch 5/50, Train Loss: 0.0112, Avg CpG Sum Difference: 12.4842
Epoch 6/50, Train Loss: 0.0075, Avg CpG Sum Difference: 8.3829
Epoch 7/50, Train Loss: 0.0053, Avg CpG Sum Difference: 5.9696
Epoch 8/50, Train Loss: 0.0039, Avg CpG Sum Difference: 4.3713
Epoch 9/50, Train Loss: 0.0030, Avg CpG Sum Difference: 3.3710
Epoch 10/50, Train Loss: 0.0023, Avg CpG Sum Difference: 2.6576
Epoch 11/50, Train Loss: 0.0019, Avg CpG Sum Difference: 2.1249
Epoch 12/50, Train Loss: 0.0016, Avg CpG Sum Difference: 1.7675
Epoch 13/50, Train Loss: 0.0013, Avg CpG Sum Difference: 1.4600
Epoch 14/50, Train Loss: 0.0011, Avg CpG Sum Difference: 1.2350
Epoch 15/50, Train Loss: 0.0009, Avg CpG Sum Difference: 1.0689
Epoch 16/50, Train Loss: 0.0008, Avg CpG Su

In [12]:
# eval (you can modify the code below)
model.eval()

res_gs = []
res_pred = []

with torch.no_grad():
    for sequences, custom_labels, int_labels in test_data_loader:
        # TODO complete inference loop
        logits = model(sequences)
        sigmoid_outputs = torch.sigmoid(logits)
        sum_sigmoid = torch.sum(sigmoid_outputs, dim=1)

        res_gs.extend(int_labels)
        res_pred.extend(sum_sigmoid.tolist())

In [13]:
# TODO complete evaluation of the model
# Diplay accuracy and confusion matrix
from sklearn.metrics import accuracy_score, confusion_matrix
import numpy as np

# Convert lists to numpy arrays
res_gs = np.array(res_gs)
res_pred = np.array(res_pred)
res_pred_rounded = np.round(res_pred)

# Calculate accuracy
accuracy = accuracy_score(res_gs, res_pred_rounded)
print(f"Accuracy: {accuracy}")

# Display confusion matrix
conf_matrix = confusion_matrix(res_gs, res_pred_rounded)
print("Confusion Matrix:")
print(conf_matrix)

Accuracy: 1.0
Confusion Matrix:
[[  2   0   0   0   0   0   0   0   0   0   0   0   0]
 [  0  10   0   0   0   0   0   0   0   0   0   0   0]
 [  0   0  37   0   0   0   0   0   0   0   0   0   0]
 [  0   0   0  70   0   0   0   0   0   0   0   0   0]
 [  0   0   0   0  92   0   0   0   0   0   0   0   0]
 [  0   0   0   0   0 107   0   0   0   0   0   0   0]
 [  0   0   0   0   0   0  85   0   0   0   0   0   0]
 [  0   0   0   0   0   0   0  50   0   0   0   0   0]
 [  0   0   0   0   0   0   0   0  27   0   0   0   0]
 [  0   0   0   0   0   0   0   0   0  19   0   0   0]
 [  0   0   0   0   0   0   0   0   0   0   9   0   0]
 [  0   0   0   0   0   0   0   0   0   0   0   3   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0   1]]


# Part 2: what if the DNA sequences are not the same length

In [14]:
# hint we will need following imports
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [15]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128):
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [16]:
# TODO complete the task based on the change
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # TODO prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    #step 1
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    #step 2
    temp = ["".join(list(intseq_to_dnaseq(seq))) for seq in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs
    #step3
    y_dna_seqs = [count_cpgs(seq) for seq in temp] # use count_cpgs here to generate labels with temp generated in step2

    return X_dna_seqs_train, y_dna_seqs


min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [17]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)


# this will be a collate_fn for dataloader to pad sequence
class PadSequence:
    #TODO
    def __call__(self, batch):
        # batch is a list of (sequence, label) tuples
        sequences, labels = zip(*batch)

        # Get original lengths
        lengths = torch.tensor([len(seq) for seq in sequences])

        # Pad sequences
        # pad_sequence pads to the longest sequence in the batch
        # padding_value=0 because 0 is our padding token id
        padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)

        # Convert labels to a tensor
        labels = torch.tensor(labels, dtype=torch.float)

        return padded_sequences, lengths, labels

In [18]:
# some config
LSTM_HIDDEN = 64
EMBEDDING_DIM = 16
LSTM_LAYER = 1
batch_size = 32
learning_rate = 0.005
epoch_num = 50

In [19]:
# TODO complete the rest
# create dataset
train_dataset = MyDataset(train_x, train_y)
test_dataset = MyDataset(test_x, test_y)

# create dataloader with collate_fn
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=PadSequence())
test_data_loader = DataLoader(test_dataset, batch_size=batch_size*2, shuffle=False, collate_fn=PadSequence())

In [20]:
# test dataloader
for batch in train_data_loader:
    sequences, lengths, labels = batch
    print(sequences.shape, lengths.shape, labels.shape)
    break

torch.Size([32, 127]) torch.Size([32]) torch.Size([32])


In [21]:
class CpGPredictor2(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, embedding_dim, hidden_dim, num_layers, vocab_size):
        super(CpGPredictor2, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        # Linear layer to predict the count from the last hidden state
        self.linear = nn.Linear(hidden_dim, 1)


    def forward(self, x, lengths):
        embedded = self.embedding(x) # embedded shape: (batch_size, seq_len, embedding_dim)

        # Pack the padded sequences
        packed_embedded = pack_padded_sequence(embedded, lengths.cpu(), batch_first=True, enforce_sorted=False)

        # Pass through LSTM
        packed_output, (hidden, cell) = self.lstm(packed_embedded)

        # Use the hidden state from the last time step of the last layer
        # hidden shape: (num_layers, batch_size, hidden_dim)
        # We want the last layer's hidden state: hidden[-1, :, :]
        last_hidden_state = hidden[-1, :, :] # shape: (batch_size, hidden_dim)

        # Pass the last hidden state through the linear layer
        logits = self.linear(last_hidden_state) # shape: (batch_size, 1)

        # Squeeze the last dimension to get shape (batch_size,)
        logits = logits.squeeze(-1)

        return logits

In [25]:
# Vocabulary size is 5 (N, A, C, G, T, <pad>)
VOCAB_SIZE = 6

model2 = CpGPredictor2(embedding_dim=EMBEDDING_DIM, hidden_dim=LSTM_HIDDEN, num_layers=LSTM_LAYER, vocab_size=VOCAB_SIZE)
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model2.parameters(), lr=learning_rate)

In [26]:
# training loop
model2.train()
optimizer.zero_grad()

for epoch in range(epoch_num):
    total_loss = 0
    for sequences, lengths, labels in train_data_loader:
        optimizer.zero_grad()
        # Forward pass
        logits = model2(sequences, lengths)
        # Calculate loss
        loss = loss_fn(logits, labels)
        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(train_data_loader)
    print(f"Epoch {epoch+1}/{epoch_num}, Train Loss: {avg_loss:.4f}")

Epoch 1/50, Train Loss: 4.7456
Epoch 2/50, Train Loss: 3.6923
Epoch 3/50, Train Loss: 3.6871
Epoch 4/50, Train Loss: 3.6673
Epoch 5/50, Train Loss: 3.6819
Epoch 6/50, Train Loss: 3.6492
Epoch 7/50, Train Loss: 3.6678
Epoch 8/50, Train Loss: 3.6476
Epoch 9/50, Train Loss: 3.6237
Epoch 10/50, Train Loss: 3.6586
Epoch 11/50, Train Loss: 3.6781
Epoch 12/50, Train Loss: 3.7141
Epoch 13/50, Train Loss: 3.6311
Epoch 14/50, Train Loss: 3.5425
Epoch 15/50, Train Loss: 2.7049
Epoch 16/50, Train Loss: 1.1222
Epoch 17/50, Train Loss: 0.4451
Epoch 18/50, Train Loss: 0.1909
Epoch 19/50, Train Loss: 0.0873
Epoch 20/50, Train Loss: 0.0667
Epoch 21/50, Train Loss: 0.0653
Epoch 22/50, Train Loss: 0.0347
Epoch 23/50, Train Loss: 0.0197
Epoch 24/50, Train Loss: 0.0256
Epoch 25/50, Train Loss: 0.0185
Epoch 26/50, Train Loss: 0.0135
Epoch 27/50, Train Loss: 0.0237
Epoch 28/50, Train Loss: 0.0271
Epoch 29/50, Train Loss: 0.0108
Epoch 30/50, Train Loss: 0.0088
Epoch 31/50, Train Loss: 0.0111
Epoch 32/50, Trai

In [27]:
model2.eval()

res_gs2 = []
res_pred2 = []

with torch.no_grad():
    for sequences, lengths, labels in test_data_loader:
        logits = model2(sequences, lengths)
        res_gs2.extend(labels.tolist())
        res_pred2.extend(logits.tolist())

In [29]:
# Convert lists to numpy arrays
res_gs2 = np.array(res_gs2)
res_pred2 = np.array(res_pred2)
res_pred_rounded2 = np.round(res_pred2)

# Calculate accuracy
accuracy = accuracy_score(res_gs2, res_pred_rounded2)
print(f"Accuracy: {accuracy}")

# Display confusion matrix
conf_matrix = confusion_matrix(res_gs2, res_pred_rounded2)
print("Confusion Matrix:")
print(conf_matrix)

Accuracy: 1.0
Confusion Matrix:
[[12  0  0  0  0  0  0  0  0  0  0  0]
 [ 0 55  0  0  0  0  0  0  0  0  0  0]
 [ 0  0 81  0  0  0  0  0  0  0  0  0]
 [ 0  0  0 93  0  0  0  0  0  0  0  0]
 [ 0  0  0  0 84  0  0  0  0  0  0  0]
 [ 0  0  0  0  0 93  0  0  0  0  0  0]
 [ 0  0  0  0  0  0 47  0  0  0  0  0]
 [ 0  0  0  0  0  0  0 31  0  0  0  0]
 [ 0  0  0  0  0  0  0  0  9  0  0  0]
 [ 0  0  0  0  0  0  0  0  0  4  0  0]
 [ 0  0  0  0  0  0  0  0  0  0  2  0]
 [ 0  0  0  0  0  0  0  0  0  0  0  1]]


# Saving Models

In [30]:
def save_model(model, path):
    torch.save(model.state_dict(), path)

save_model(model, "model1.pth")
save_model(model2, "model2.pth")

In [40]:
def predict(model_instance, seq, lstm_model: bool):
    model_instance.eval()
    seq_tensor = torch.tensor([list(dnaseq_to_intseq(seq))], dtype=torch.long)

    with torch.no_grad():
        if not lstm_model:
            print(seq_tensor.shape)
            logits = model_instance(seq_tensor)
            # Apply sigmoid and sum to get the predicted count
            sigmoid_outputs = torch.sigmoid(logits)
            predicted_count = torch.sum(sigmoid_outputs, dim=1).item()
            return predicted_count
        else:
            lengths = torch.tensor([len(seq)])
            logits = model_instance(seq_tensor, lengths)
            # Squeeze the output and get the item
            predicted_count = logits.squeeze(-1).item()

            return predicted_count, round(predicted_count)

'NACGT'

In [34]:
predict(model2, "NNACTCGCGANTC", lstm_model=True)

(2.064582347869873, 2)

In [39]:
len("".join([random.choice("NACGT") for _ in range(128)]))

128

In [41]:
predict(model, "".join([random.choice("NACGT") for _ in range(128)]), lstm_model=False)

torch.Size([1, 128])


IndexError: index out of range in self