# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [5]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
import random
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [6]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)

# Use this for getting x label
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers   
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [8]:
# we prepared two datasets for training and evaluation
# training data scale we set to 2048
# we test on 512

def prepare_data(num_samples=100):
    # prepared the training and test data
    # you need to call rand_sequence and count_cpgs here to create the dataset
    # step 1
    X_dna_seqs_train = list(rand_sequence(num_samples))
    """
    hint:
        1. You can check X_dna_seqs_train by print, the data is ids which is your training X 
        2. You first convert ids back to DNA sequence
        3. Then you run count_cpgs which will yield CGs counts - this will be the labels (Y)
    """
    #step2
    temp = [list(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train] # use intseq_to_dnaseq here to convert ids back to DNA seqs
    #step3
    y_dna_seqs = [count_cpgs("".join(seq)) for seq in temp] # use count_cpgs here to generate labels with temp generated in step2
    
    return X_dna_seqs_train, y_dna_seqs
    
train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

based on train_x and train_y, there are 12 classes lets treat it as a classification problem.
this can also be trated as regression problem given the count of CG is the ask. but we will be skipping that for now.

In [9]:
# some config
LSTM_HIDDEN = 64 # input is 128 so we change the hidden layer to 64
LSTM_LAYER = 1 # we start with one layer only. this is one of hyper-parameter
batch_size = 4 # 4 ideal size for my laptop
learning_rate = 1e-3 # standard params, also to be treated as hyper-parameter
num_classes = len(np.unique(train_y)) # total number of classes
classes = np.unique(train_y)

In [10]:

class DnaDataset(Dataset):
    """
    Dataset to fetch x, y based on index
    """
    def __init__(self, x, y) -> None:
        super().__init__()
        self.x = x
        self.y = y
    
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, index):
        return torch.Tensor(self.x[index]), torch.tensor(self.y[index])



In [30]:
import json

with open("128_fixed_dim_config.json", "w") as f:
    f.write(json.dumps(
        {
            "alphabet": alphabet,
            "dna2int" : dna2int,
            "int2dna":int2dna,
            "classes":classes.tolist(),
            "num_classes": int(num_classes)
        }

    ))

In [11]:
# create data loader

# load training data
train_data = DnaDataset(train_x, train_y)
train_data_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

# load test data
test_data = DnaDataset(test_x, test_y)
test_data_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

In [12]:
# Model
class CpGPredictor(torch.nn.Module):
    ''' Simple model that uses a LSTM to count the number of CpGs in a sequence '''
    def __init__(self):
        super(CpGPredictor, self).__init__()
        # TODO complete model, you are free to add whatever layers you need here
        # We do need a lstm and a classifier layer here but you are free to implement them in your way
        self.lstm = nn.LSTM(input_size=128, hidden_size=LSTM_HIDDEN)
        self.classifier = nn.Linear(LSTM_HIDDEN, num_classes)
        self.softmax = nn.Softmax(dim=1) # to get confidence score

    def forward(self, x):
        # TODO complete forward function
        x, _ = self.lstm(x)
        logits = self.classifier(x)
        logits = self.softmax(logits)
        return logits

In [13]:
# init model / loss function / optimizer etc.
model = CpGPredictor()
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

In [14]:
epochs = 400
for epoch in range(epochs):
  running_loss = 0.0
  for i, data in enumerate(train_data_loader):
    inputs, labels = data
    # labels = torch.argmax(labels, dim=1)
    # forward propagation
    outputs = model(inputs)
    loss = loss_fn(outputs, labels)
    # set optimizer to zero grad
    # to remove previous epoch gradients
    optimizer.zero_grad()
    # backward propagation
    loss.backward()
    # optimize
    optimizer.step()
    running_loss += loss.item()
  # display statistics
  if not ((epoch + 1) % (epochs // 10)):
    print(f'Epochs:{epoch + 1:5d} | ' \
          f'Batches per epoch: {i + 1:3d} | ' \
          f'Loss: {running_loss / (i + 1):.10f}')

Epochs:   40 | Batches per epoch: 512 | Loss: 2.2646126824
Epochs:   80 | Batches per epoch: 512 | Loss: 2.0689358509
Epochs:  120 | Batches per epoch: 512 | Loss: 1.9184411524
Epochs:  160 | Batches per epoch: 512 | Loss: 1.9133717888
Epochs:  200 | Batches per epoch: 512 | Loss: 1.9110925191
Epochs:  240 | Batches per epoch: 512 | Loss: 1.8466680693
Epochs:  280 | Batches per epoch: 512 | Loss: 1.8391595234
Epochs:  320 | Batches per epoch: 512 | Loss: 1.7844871006
Epochs:  360 | Batches per epoch: 512 | Loss: 1.7738771003
Epochs:  400 | Batches per epoch: 512 | Loss: 1.7723775983


In [15]:
model.eval()

res_gs = []
res_pred = []

with torch.no_grad():
  loss = 0
  for i, (inputs, labels) in enumerate(test_data_loader):
    # calculate output by running through the network
    predictions = model(inputs)
    res_pred.extend(predictions)
    # labels = torch.argmax(labels, dim=1)
    res_gs.extend(labels)
    loss += loss_fn(predictions, labels)
  print(f'Loss: {loss / (i + 1)}')

Loss: 2.5340142250061035


## Live test

In [16]:
# sample = list(rand_sequence(128))[0]
sample = test_x[10]
gt = "".join([int2dna.get(i) for i in sample]).count("CG")
pred_logit = model(torch.Tensor(sample).unsqueeze(0))
pred = torch.argmax(pred_logit)
# conf = float(F.softmax(pred_logit, dim=1)[0][int(pred)]*100)
print(gt, classes[pred])

7 5


In [17]:
torch.save(model.state_dict() ,"128dim_model.pt")

In [18]:
model_128_dim = CpGPredictor()
model_128_dim.load_state_dict(torch.load("128dim_model.pt"))

<All keys matched successfully>

In [19]:
classes

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12])

In [34]:
print(test_x[0])

[3, 3, 3, 0, 2, 3, 2, 2, 2, 0, 2, 4, 2, 4, 4, 1, 3, 3, 3, 3, 3, 1, 1, 0, 0, 2, 1, 4, 4, 4, 0, 3, 1, 2, 4, 3, 0, 3, 4, 0, 2, 3, 4, 0, 4, 3, 2, 1, 1, 1, 4, 1, 2, 4, 3, 0, 1, 0, 0, 4, 3, 2, 2, 3, 4, 3, 4, 1, 1, 4, 4, 1, 4, 0, 0, 2, 3, 0, 4, 1, 2, 4, 3, 4, 4, 0, 0, 3, 2, 0, 2, 2, 1, 2, 0, 3, 2, 2, 2, 1, 3, 0, 1, 3, 0, 4, 3, 1, 3, 0, 0, 0, 3, 0, 4, 0, 1, 3, 2, 2, 1, 1, 2, 2, 1, 0, 3, 3]


In [37]:
len("".join([int2dna.get(i) for i in test_x[0]]))

128