# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [61]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
import random
import torch.nn.functional as F
import torch.optim as optim

In [62]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [None]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    temp =  [list(intseq_to_dnaseq(i)) for i in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs
    #step3
    y_dna_seqs =  [count_cpgs("".join(i)) for i in temp] # use count_cpgs here to generate labels with temp generated in step2

    return X_dna_seqs_train, y_dna_seqs

train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

In [None]:
# some config
LSTM_HIDDEN = 128
LSTM_LAYER = 2
batch_size = 32
learning_rate = 0.001
epoch_num = 100

In [None]:
# create data loader
from torch.utils.data import DataLoader, TensorDataset

def convert_to_tensor(data, dtype=torch.long):
    return torch.tensor(data, dtype=dtype)

train_x_tensor = convert_to_tensor(train_x)
train_y_tensor = convert_to_tensor(train_y)
test_x_tensor = convert_to_tensor(test_x)
test_y_tensor = convert_to_tensor(test_y)

# Prepare DataLoader
train_dataset = TensorDataset(train_x_tensor, train_y_tensor)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout_prob=0.5):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        self.embedding = torch.nn.Embedding(5, input_dim)
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.dropout = torch.nn.Dropout(dropout_prob)
        self.classifier = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # TODO complete forward function
        x = self.embedding(x)
        lstm_out, _ = self.lstm(x)
        lstm_out = self.dropout(lstm_out)
        lstm_out = lstm_out[:, -1, :]
        logits = self.classifier(lstm_out)
        return logits

In [None]:
# init model / loss function / optimizer etc.

LSTM_LAYER = 1
learning_rate = 0.001
EPOCH_NUM = 100
input_dim = 128
LSTM_HIDDEN = 128
layer_dim = 1
output_dim = 1

model = CpGPredictor(input_dim, LSTM_HIDDEN, layer_dim, output_dim)
# loss_fn = torch.nn.MSELoss()
loss_fn = torch.nn.SmoothL1Loss()
# optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# training (you can modify the code below)
# t_loss = .0
# model.train()
# model.zero_grad()
# for _ in range(epoch_num):
#     for batch in train_data_loader:
#         #TODO complete training loop
#         t_loss += loss.item()
#         loss.backward()

#     print(t_loss)
#     t_loss = .0

def train(model, train_data_loader, loss_fn, EPOCH_NUM=100):
  for epoch in range(EPOCH_NUM):
    model.train()
    for inputs, labels in train_data_loader:
      optimizer.zero_grad()
      outputs = model(inputs)
      loss = loss_fn(outputs.squeeze(), labels.float())
      loss.backward()
      optimizer.step()

    print(f'Epoch {epoch + 1}/{epoch_num}, Loss: {loss.item()}')

train(model, train_data_loader, loss_fn, EPOCH_NUM=100)

In [70]:
# eval (you can modify the code below)
# model.eval()

res_gs = []
res_pred = []

def evaluate_model(model, test_loader, loss_fn):
  model.eval()
  total_loss = 0.0
  with torch.no_grad():
    for inputs, labels in test_loader:
      outputs = model(inputs)
      loss = loss_fn(outputs.squeeze(), labels.float())
      total_loss += loss.item()




  avg_loss = total_loss/len(test_loader)
  print(f'Average Loss on Test Data: {avg_loss}')


test_dataset = TensorDataset(test_x_tensor, test_y_tensor)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
evaluate_model(model, test_loader, loss_fn)

Average Loss on Test Data: 1.20827254652977


In [None]:
# Testing wth examples

examples = ["NCACANNTNCGGAGGCGNA", "NCACANNTNCGGAGGCGCG"]

def get_actual_count(examples):
  return [count_cpgs("".join(i)) for i in examples]

def encode_to_integer(examples):
  int_sequence = [list(dnaseq_to_intseq(i)) for i in examples]
  return int_sequence

def transform_examples(data):
  int_sequence = encode_to_integer(data)
  test_sequnce = convert_to_tensor(int_sequence)
  return test_sequnce

def predict(model, unseen_data):
  with torch.no_grad():
    model.eval()
    inputs = transform_examples(unseen_data)
    outputs = model(inputs)
    predictions = outputs.squeeze().cpu().numpy()
  return predictions

predictions = predict(model, examples)
actual_vals = get_actual_count(examples)

In [None]:
for i in range(len(examples)):
  print(f"Sequence: {examples[i]}, Predicted CpGs: {round(predictions[i])}, Actual CpGs: {actual_vals[i]}")

Sequence: NCACANNTNCGGAGGCGNA, Predicted CpGs: 6, Actual CpGs: 2
Sequence: NCACANNTNCGGAGGCGCG, Predicted CpGs: 6, Actual CpGs: 3


# Part 2: what if the DNA sequences are not the same length

In [None]:
# hint we will need following imports
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [64]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]


# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [None]:
# TODO complete the task based on the change
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # TODO prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    #step 1
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))
    #step 2
    temp = [list(intseq_to_dnaseq(i)) for i in X_dna_seqs_train]
    #step3
    y_dna_seqs = [count_cpgs("".join(i)) for i in temp]

    return X_dna_seqs_train, y_dna_seqs


min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [None]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)


# this will be a collate_fn for dataloader to pad sequence
class PadSequence:
  def __call__(self, batch):
    batch.sort(key=lambda x: len(x[0]), reverse=True)
    sequences, labels = zip(*batch)

    padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=dna2int['pad'])
    label_tensor = torch.Tensor(labels).long()

    return padded_sequences, label_tensor

In [72]:
batch_size = 64

train_dataset = MyDataset(train_x, train_y)
train_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=PadSequence(), shuffle=True)

test_dataset = MyDataset(test_x, test_y)
test_loader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=PadSequence(), shuffle=False)

In [None]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout_prob=0.5):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        self.embedding = torch.nn.Embedding(6, input_dim)
        self.lstm = torch.nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.dropout = torch.nn.Dropout(dropout_prob)
        self.classifier = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # TODO complete forward function
        x = self.embedding(x)
        lstm_out, _ = self.lstm(x)
        lstm_out = self.dropout(lstm_out)
        lstm_out = lstm_out[:, -1, :]
        logits = self.classifier(lstm_out)
        return logits

In [60]:
LSTM_LAYER = 1
learning_rate = 0.001
EPOCH_NUM = 100
input_dim = 256 #128
LSTM_HIDDEN = 128
layer_dim = 1
output_dim = 1

model = CpGPredictor(input_dim, LSTM_HIDDEN, layer_dim, output_dim)
# loss_fn = torch.nn.MSELoss()
loss_fn = torch.nn.SmoothL1Loss()
# optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
optimizer = torch.optim.Adagrad(model.parameters(), lr=learning_rate)


train(model, train_loader, loss_fn, EPOCH_NUM=100)

Epoch 1/100, Loss: 1.1707208156585693
Epoch 2/100, Loss: 1.3886330127716064
Epoch 4/100, Loss: 1.158286452293396
Epoch 5/100, Loss: 1.058457612991333
Epoch 6/100, Loss: 1.2002017498016357
Epoch 7/100, Loss: 1.0855227708816528
Epoch 8/100, Loss: 0.9004484415054321
Epoch 9/100, Loss: 0.9857519268989563
Epoch 10/100, Loss: 1.0864111185073853
Epoch 11/100, Loss: 0.8217935562133789
Epoch 12/100, Loss: 0.9576296210289001
Epoch 13/100, Loss: 0.9363933205604553
Epoch 14/100, Loss: 1.0122158527374268
Epoch 15/100, Loss: 0.9667893052101135
Epoch 16/100, Loss: 0.9405697584152222
Epoch 17/100, Loss: 1.0823549032211304
Epoch 18/100, Loss: 1.1331709623336792
Epoch 19/100, Loss: 1.0601670742034912
Epoch 20/100, Loss: 0.8752838373184204
Epoch 21/100, Loss: 0.7618842124938965
Epoch 22/100, Loss: 0.9221265316009521
Epoch 23/100, Loss: 0.7274192571640015
Epoch 24/100, Loss: 0.9484416246414185
Epoch 25/100, Loss: 0.9782217741012573
Epoch 26/100, Loss: 0.8381202816963196
Epoch 27/100, Loss: 0.7835568189620

In [73]:
evaluate_model(model, test_loader, loss_fn)

Average Loss on Test Data: 0.2769583389163017


In [74]:
# Testing wth examples

examples = ["NCACANNTNCGGAGGCGNA", "NCACANNTNCGGAGGCGCG", "CG"]

def get_actual_count(examples):
  return [count_cpgs("".join(i)) for i in examples]

def encode_to_integer(examples):
  int_sequence = [list(dnaseq_to_intseq(i)) for i in examples]
  return int_sequence

def transform_examples(data):
  int_sequence = encode_to_integer(data)
  padded_sequence = pad_sequence([torch.tensor(seq) for seq in int_sequence], batch_first=True, padding_value=dna2int['pad'])
  test_sequnce = convert_to_tensor(padded_sequence)
  return test_sequnce

def predict(model, unseen_data):
  with torch.no_grad():
    model.eval()
    inputs = transform_examples(unseen_data)
    outputs = model(inputs)
    predictions = outputs.squeeze().cpu().numpy()
  return predictions

predictions = predict(model, examples)
actual_vals = get_actual_count(examples)


  return torch.tensor(data, dtype=dtype)


In [75]:
for i in range(len(examples)):
  print(f"Sequence: {examples[i]}, Predicted CpGs: {round(predictions[i])}, Actual CpGs: {actual_vals[i]}")

Sequence: NCACANNTNCGGAGGCGNA, Predicted CpGs: 5, Actual CpGs: 2
Sequence: NCACANNTNCGGAGGCGCG, Predicted CpGs: 4, Actual CpGs: 3
Sequence: CG, Predicted CpGs: 4, Actual CpGs: 1
