<a href="https://colab.research.google.com/github/rohanjuneja/DNA-project/blob/main/DCnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torchvision.utils import save_image
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import os
import glob
import PIL
from PIL import Image
from torch.utils import data as D
from torch.utils.data.sampler import SubsetRandomSampler
import random
import pandas as pd
import time
import torch.autograd as autograd
from torch.autograd import Variable

In [None]:
from matplotlib import pyplot as plt
%matplotlib inline

In [None]:
# Define the NN architecture

class ConsNet(nn.Module):
    """LSTM followed by three fully connected layers, emitting 4 scores per step.

    The network consumes a sequence of 4-wide vectors (one per DNA base), runs it
    through a single-layer LSTM with batch size 1, and maps every LSTM output
    through fc1 -> fc2 -> fc3 (each followed by dropout) to a 4-wide output.

    Args:
        hidden_dim: size of the LSTM hidden/cell state.
        fc1_dim: output width of the first fully connected layer.
        fc2_dim: output width of the second fully connected layer.

    Attributes:
        hidden: the (h, c) tuple fed to the LSTM on the next ``forward`` call.
            ``forward`` overwrites it with the LSTM's final state, so callers
            reset it with ``model.hidden = model.init_hidden()`` before each
            independent sequence.
        hidden_init_values: cached random (h0, c0) returned by ``init_hidden``.
    """

    def __init__(self, hidden_dim, fc1_dim, fc2_dim):
        super(ConsNet, self).__init__()
        self.hidden_dim = hidden_dim

        self.lstm = nn.LSTM(4, hidden_dim)
        self.fc1 = nn.Linear(hidden_dim, fc1_dim)
        self.dropout1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.dropout2 = nn.Dropout(p=0.1)
        self.fc3 = nn.Linear(fc2_dim, 4)
        self.dropout3 = nn.Dropout(p=0.1)
        self.hidden_init_values = None
        self.hidden = self.init_hidden()
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)
        nn.init.xavier_uniform_(self.fc3.weight)

    def init_hidden(self):
        """Return the cached initial (h0, c0) state, each of shape (1, 1, hidden_dim).

        The values are drawn from a standard normal once, on first use, and the
        same values are returned on every later call. They are moved to the
        device and dtype of the module's parameters so the state stays usable
        after ``model.cuda()``, ``model.double()`` or ``model.to(...)``.
        """
        if self.hidden_init_values is None:
            self.hidden_init_values = (torch.randn(1, 1, self.hidden_dim),
                                       torch.randn(1, 1, self.hidden_dim))
        # Follow the parameters: the LSTM rejects a state that lives on another
        # device or has another dtype than its weights.
        ref = next(self.parameters())
        self.hidden_init_values = tuple(
            h.to(device=ref.device, dtype=ref.dtype) for h in self.hidden_init_values)
        return self.hidden_init_values

    def forward(self, input_seq):
        """Run one sequence through the network.

        Args:
            input_seq: tensor whose first dimension is the sequence length; it is
                reshaped to (seq_len, 1, 4-wide features) for the LSTM.

        Returns:
            Tensor of shape (seq_len, 4) with one score vector per step.
        """
        seq_len = len(input_seq)
        # Batch size is fixed to 1; self.hidden is replaced by the final state.
        lstm_out, self.hidden = self.lstm(
            input_seq.view(seq_len, 1, -1), self.hidden)
        tmp1 = F.relu(self.dropout1(self.fc1(lstm_out.view(seq_len, -1))))
        tmp2 = F.relu(self.dropout2(self.fc2(tmp1)))
        return self.dropout3(self.fc3(tmp2))

In [None]:
import random
# Make CUDA float tensors the default type for newly constructed tensors, so the
# tensors built in later cells (torch.Tensor(...), torch.randn(...)) are CUDA tensors.
# NOTE(review): this is a process-wide setting and needs a CUDA-capable runtime;
# set_default_tensor_type is reported as deprecated in recent PyTorch releases —
# confirm against the PyTorch version this notebook targets.
torch.set_default_tensor_type('torch.cuda.FloatTensor')

# Column index of each DNA base in the one-hot encoding.
bmap = {base: idx for idx, base in enumerate("ACGT")}
def one_hot(b):
    """Return the one-hot encoding of base `b` as a 1x4 nested list.

    `b` must be a key of `bmap` ("A", "C", "G" or "T"); any other value
    raises KeyError.
    """
    row = [0] * 4
    row[bmap[b]] = 1
    return [row]

# Show the encoding of each base, in bmap column order.
print("one-hot encoding for DNA bases")
for base in "ACGT":
    print(base + ":", one_hot(base))

one-hot encoding for DNA bases
A: [[1, 0, 0, 0]]
C: [[0, 1, 0, 0]]
G: [[0, 0, 1, 0]]
T: [[0, 0, 0, 1]]


In [None]:
def sim_error(seq, pi=0.05, pd=0.05, ps=0.01):
    """
    Given an input sequence `seq`, generate another sequence with errors.

    For every base of `seq`, random bases are inserted while a fresh uniform
    draw falls below `pi`. The draw that ends the insertion loop is then reused
    to decide, in this order, between deleting the base, substituting it, or
    copying it unchanged.

    seq: iterable of single-character bases (a string or a list of characters)
    pi: insertion error rate, must be < 1
    pd: deletion error rate (the name shadows the `pandas as pd` import inside
        this function only; it is kept because callers pass it by keyword)
    ps: substitution error rate

    Returns the noisy sequence as a string; its length may differ from `seq`.
    Raises ValueError if `pi` >= 1, because the insertion loop would never end.
    """
    if pi >= 1:
        raise ValueError("pi must be < 1, otherwise insertions never terminate")

    bases = ["A","C","G","T"]
    out_seq = []
    for c in seq:
        while 1:
            r = random.uniform(0,1)
            if r < pi:
                out_seq.append(random.choice(bases))
            else:
                break
        r -= pi
        if r < pd:
            # Deletion: emit nothing for this base.
            continue
        r -= pd
        if r < ps:
            # Substitution must change the base; choosing among all four bases
            # would reproduce the original a quarter of the time.
            out_seq.append(random.choice([b for b in bases if b != c]))
            continue
        out_seq.append(c)
    return "".join(out_seq)

### Generate training data and train the model 

In [None]:
# Number of training clusters; each gets one random 120-base consensus strand.
num_clusters = 10000
train_consensus_strands, train_target = [], []
for i in range(num_clusters):
    strand = [random.choice(["A","C","G","T"]) for _ in range(120)]
    train_consensus_strands.append(strand)
    # The target is the one-hot encoding of the clean consensus strand.
    train_target.append([one_hot(base) for base in strand])

# Show the last generated strand, then the sizes of both lists.
print("".join(strand))

print(len(train_consensus_strands), len(train_target), sep="\n")

train_target = torch.Tensor(train_target).cuda()

CTGACACGAGGACCCGTTTCGACCTGCAGCGTGCGACTTATTTTTTCAAACGACGCCAAAACCTCGAACCTAAAGTGCGACTTTTCGGATTGCCTCGATATCATGCGGGTGGAAACACAT
10000
10000


In [None]:
# Sanity check: (clusters, 120 bases per strand, 1, 4 one-hot channels).
print(train_target.shape)

torch.Size([10000, 120, 1, 4])


In [None]:
# Build the training inputs: 10 noisy reads per consensus strand, one-hot encoded.
train_data = []

for i in range(num_clusters):
    train_clusters = []
    seq = train_consensus_strands[i]
    target_len = len(seq)
    for j in range(10):
        noisy_seq = sim_error(seq, pi=random.uniform(0.04, 0.06), pd=random.uniform(0.04, 0.06),
        ps=random.uniform(0.01, 0.03))

        # Encode the noisy read itself. The earlier code encoded the leftover
        # global `strand`, so every read was the same clean strand and the
        # simulated errors were thrown away.
        # Insertions/deletions change the read length, so truncate or zero-pad
        # to the consensus length to keep the tensor rectangular.
        noisy_seq_t = [one_hot(c) for c in noisy_seq[:target_len]]
        noisy_seq_t += [[[0, 0, 0, 0]] for _ in range(target_len - len(noisy_seq_t))]
        train_clusters.append(noisy_seq_t)

    train_data.append(train_clusters)

print(len(train_data))
train_data = torch.Tensor(train_data).cuda()

10000


In [None]:
# Sanity check: (clusters, 10 reads per cluster, bases per read, 1, 4 one-hot channels).
print(train_data.shape)

torch.Size([10000, 10, 120, 1, 4])


In [None]:
# initialize the model
# Build the network (hidden_dim=32, fc1_dim=12, fc2_dim=12) and move it to the GPU.
model = ConsNet(32, 12, 12)
model.cuda()
print(model)
model.zero_grad()
model.hidden = model.init_hidden()

# Re-initialise every parameter of the network: biases to zero,
# weights with Xavier-normal values.
for name, param in model.named_parameters():
    is_bias = 'bias' in name
    if is_bias:
        nn.init.constant_(param, 0.0)
        continue
    if 'weight' in name:
        nn.init.xavier_normal_(param)

ConsNet(
  (lstm): LSTM(4, 32)
  (fc1): Linear(in_features=32, out_features=12, bias=True)
  (dropout1): Dropout(p=0.2, inplace=False)
  (fc2): Linear(in_features=12, out_features=12, bias=True)
  (dropout2): Dropout(p=0.1, inplace=False)
  (fc3): Linear(in_features=12, out_features=4, bias=True)
  (dropout3): Dropout(p=0.1, inplace=False)
)


In [None]:
# Starting learning rate; `lr` tracks the current value as it is changed later.
initial_lr = 0.1
lr = initial_lr

# Mean-squared error as the training criterion, optimised with plain SGD.
loss = nn.MSELoss()
optimizer = optim.SGD(params=model.parameters(), lr=initial_lr)

In [None]:
# Training the model
# Training the model
num_epochs = 20

# Window of positions used from every read/target: seq[s-1:e].
range_ = (1, 121)
s, e = range_

for epoch in range(num_epochs):
    model.train()
    # Sum of the per-cluster losses, so the printed value is an epoch average
    # and not just the loss of the last cluster.
    epoch_loss = 0.0
    for i in range(len(train_data)):
        train_loss = 0
        optimizer.zero_grad()
        # Original string (training target), shared by all reads of the cluster
        seq_target = train_target[i][s-1:e].view(-1, 4)
        for seq in train_data[i]:
            # Every read is an independent sequence: restart from the initial state.
            model.hidden = model.init_hidden()
            # Noisy clusters (training data)
            seq_ = seq[s-1:e].view(-1, 4)
            out = model(seq_)
            # Loss computation: accumulated over the reads of this cluster
            train_loss += loss(out, seq_target)

        # Backward propagation operation: one optimizer step per cluster
        train_loss.backward()
        optimizer.step()
        # Detach so the running total does not keep the autograd graph alive.
        epoch_loss = epoch_loss + train_loss.detach()

    print("Epoch:", epoch, "Training loss:", float(epoch_loss)/max(len(train_data), 1), "learning rate:", lr)

    # Learning rate decay: update the existing optimizer in place instead of
    # constructing a new one.
    if epoch % 2 ==0:
        lr *= 0.95
        for group in optimizer.param_groups:
            group['lr'] = lr

if (num_epochs > 0):
    torch.save(model.state_dict(), "consnet.pt")

Epoch: 0 Training loss: 3.6179283261299135e-05 learning rate: 0.1
Epoch: 1 Training loss: 3.968231379985809e-05 learning rate: 0.095
Epoch: 2 Training loss: 3.3966299891471864e-05 learning rate: 0.095
Epoch: 3 Training loss: 4.36478853225708e-05 learning rate: 0.09025
Epoch: 4 Training loss: 4.020006060600281e-05 learning rate: 0.09025
Epoch: 5 Training loss: 3.661943376064301e-05 learning rate: 0.0857375
Epoch: 6 Training loss: 3.6713653802871704e-05 learning rate: 0.0857375
Epoch: 7 Training loss: 3.752442896366119e-05 learning rate: 0.08145062499999998
Epoch: 8 Training loss: 4.580929279327393e-05 learning rate: 0.08145062499999998
Epoch: 9 Training loss: 3.692469894886017e-05 learning rate: 0.07737809374999999
Epoch: 10 Training loss: 3.812678754329681e-05 learning rate: 0.07737809374999999
Epoch: 11 Training loss: 3.5812777280807496e-05 learning rate: 0.07350918906249998
Epoch: 12 Training loss: 3.5603234171867374e-05 learning rate: 0.07350918906249998
Epoch: 13 Training loss: 4.0

In [None]:
# Restore the model weights from the "consnet.pt" checkpoint file.
model.load_state_dict(torch.load("consnet.pt"))

<All keys matched successfully>

### Generate the test data 

In [None]:
# DNA clusters to test model
# Number of test clusters; each gets one random 120-base consensus strand.
num_test_clusters = 1000
test_consensus_strands, test_target = [], []

for i in range(num_test_clusters):
    strand = [random.choice(["A","C","G","T"]) for _ in range(120)]
    test_consensus_strands.append(strand)
    # The target is the one-hot encoding of the clean consensus strand.
    test_target.append([one_hot(base) for base in strand])

# Show the last generated strand, then the sizes of both lists.
print("".join(strand))

print(len(test_consensus_strands), len(test_target), sep="\n")

test_target = torch.Tensor(test_target)

ACATATTCTTCTGAGGCTTCCCTGACTTCTCGAGCCTATTTGGTTGCCGCTCGGCTCGGAAGGAGACCATCGGGAGAGGCACTATAAGGTAAGCGGGACCGTCGGAGTTTACAGTCTCTA
100
100


In [None]:
# Build the test inputs: 10 noisy reads per consensus strand, one-hot encoded.
test_data = []

for i in range(num_test_clusters):
    test_clusters = []
    seq = test_consensus_strands[i]
    target_len = len(seq)
    for j in range(10):
        noisy_seq = sim_error(seq, pi=random.uniform(0.04, 0.06), pd=random.uniform(0.04, 0.06),
        ps=random.uniform(0.01, 0.03))

        # Encode the noisy read itself. The earlier code encoded the leftover
        # global `strand`, so every read was the same clean strand and the
        # simulated errors were thrown away.
        # Insertions/deletions change the read length, so truncate or zero-pad
        # to the consensus length to keep the tensor rectangular.
        noisy_seq_t = [one_hot(c) for c in noisy_seq[:target_len]]
        noisy_seq_t += [[[0, 0, 0, 0]] for _ in range(target_len - len(noisy_seq_t))]
        test_clusters.append(noisy_seq_t)

    test_data.append(test_clusters)

print(len(test_data))
test_data = torch.Tensor(test_data).cuda()

100


In [None]:
# Sanity check: (test clusters, 10 reads per cluster, bases per read, 1, 4 one-hot channels).
print(test_data.shape)

torch.Size([100, 10, 120, 1, 4])


In [None]:
# Sanity check: (test clusters, 120 bases per strand, 1, 4 one-hot channels).
print(test_target.shape)

torch.Size([100, 120, 1, 4])


In [None]:
#{"A":0, "C":1, "G":2, "T":3}

def one_hot_to_vec(one_hot):
    """Return the index of the largest entry of `one_hot` along axis 0.

    `one_hot` is converted with np.array first, so lists, NumPy arrays and
    other array-like inputs accepted by np.array work. For a 4-element
    encoding the result is the base index (A=0, C=1, G=2, T=3).
    Note that the parameter name hides the `one_hot` encoder function inside
    this function body.
    """
    return np.argmax(np.array(one_hot), axis = 0)

# One-hot vector for "A", built as a CPU tensor to show tensors are accepted too.
vec = torch.zeros(4).cpu()
vec[0] = 1
print(vec.shape)
print(vec.dtype)

print("Vectors for one hot of DNA bases")
print("A:", one_hot_to_vec(vec))
# The remaining bases are passed as plain Python lists.
for label, encoding in (("C:", [0, 1, 0, 0]),
                        ("G:", [0, 0, 1, 0]),
                        ("T:", [0, 0, 0, 1])):
    print(label, one_hot_to_vec(encoding))

torch.Size([4])
torch.float32
Vectors for one hot of DNA bases
A: 0
C: 1
G: 2
T: 3


In [None]:
# Window of positions used from every read/target: seq[s-1:e].
range_ = (1, 121)
s, e = range_

model.hidden = model.init_hidden()
accuracy = 0        # number of positions whose predicted base matches the target
total_bases = 0     # number of positions compared

model.eval()
# Inference only: no autograd graph is needed.
with torch.no_grad():
    for i in range(len(test_data)):
        # Original string (test target), shared by all reads of the cluster
        seq_target = test_target[i][s-1:e].view(-1, 4)
        target_idx = seq_target.argmax(dim=1)
        for seq in test_data[i]:
            start_time = time.time()
            # Every read is an independent sequence: restart from the initial state.
            model.hidden = model.init_hidden()
            # Noisy clusters (test data)
            seq_ = seq[s-1:e].view(-1, 4)
            out = model(seq_)
            check_time = time.time()

            # Compare the predicted and target base at every position in one
            # vectorised step instead of one device-to-host copy per base.
            pred_idx = out.argmax(dim=1)
            accuracy += (pred_idx == target_idx.to(pred_idx.device)).sum().item()
            total_bases += pred_idx.numel()
            end_time = time.time()

# The timings below describe the last processed read only.
# NOTE(review): CUDA kernels may run asynchronously, so the forward-prop time can
# under-report GPU work unless the device is synchronised first — confirm before
# quoting these numbers.
print("Time for forward prop per DNA sequence = " + str(check_time - start_time) + " sec")
print("Time for accuracy calculation per DNA sequence = " + str(end_time - check_time) + " sec")

print("Accuracy = " + str(accuracy/total_bases))

Time for forward prop per DNA sequence = 0.0019271373748779297 sec
Time for accuracy calculation per DNA sequence = 0.00529789924621582 sec
Accuracy = 0.25125
