https://pytorch.org/tutorials/beginner/nn_tutorial.html

https://github.com/spro/practical-pytorch/tree/master/char-rnn-classification

https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html

Batch RNN
https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html


In [1]:
import re
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
torch.manual_seed(42)


#print(torch.cuda.is_available())
#if torch.cuda.is_available() or True:
#    device = torch.device("cuda")          # a CUDA device object
#    y = torch.ones_like(x, device=device)  # directly create a tensor on GPU
#    x = x.to(device)                       # or just use strings ``.to("cuda")``
#    z = x + y
#    print(z)
#    print(z.to("cpu", torch.double))   

<torch._C.Generator at 0x7fdc9447c030>

In [84]:
VOCAB = ['A', 'C', 'T', 'G', 'N']
BASE2IDX = {"A": 0, "C": 1, "T": 2, "G": 3, "N": 4}


def findall(base: str, seq: str):
    idxs = []
    for m in re.finditer(base, seq):
        idxs.append(m.start(0))
    return idxs


def random_seq(l: int, vocab: list = VOCAB) -> str:
    return "".join(np.random.choice(vocab, l))


def random_data(m: int, l: int, c: int) -> tuple:
    """
    Generates random dataset consisting of m sequences of length l with up to c cpg sites per sequence.
    """
    assert c < l

    seqs = [random_seq(l, ) for _ in range(m)]
    cpgs = [[(np.random.choice(findall("C", seqs[i])), np.random.random()) for _ in range(np.random.randint(0, c))] for i in range(m)]
    exprs = [float(np.random.binomial(n=1, p=x.count('N')/len(x), size=1)[0]) for x in seqs]

    return seqs, cpgs, exprs


def base2tensor(base: str, vocab: list = VOCAB) -> torch.tensor:
    tensor = torch.zeros(1, len(vocab))
    tensor[BASE2IDX[base]][0] = 1
    return tensor


def seq2tensor(seq: str, vocab: list = VOCAB) -> torch.tensor:
    tensor = torch.zeros(len(seq), 1, len(vocab))  # important choice for preserving shape compatibility with hidden layer.
    for idx, base in enumerate(seq):
        tensor[idx][0][BASE2IDX[base]] = 1  # that extra 1 dimension is because PyTorch assumes everything is in batches - we’re just using a batch size of 1 here.
    return tensor


def methylate_seq(seq_tensor: torch.tensor, loc: int, value: float, meth_idx: int, mask_idx: int) -> torch.tensor:
    assert (seq_tensor[loc][0][meth_idx] > 0) or (seq_tensor[loc][0][mask_idx] > 0)# assert it is a "C" in first place
    seq_tensor[loc][0][meth_idx] = value
    return seq_tensor


class PromoterDataset(Dataset):
    """"""

    def __init__(self, data: tuple):
        
        self.seqs, self.cpgs, self.exprs = data
        
        # get some utile attributes
        self.vocab = sorted(set("".join(self.seqs)))  # returns list
        self.vocab_size = len(self.vocab)
        self.base_to_idx = {}
        for i, base in enumerate(VOCAB):
            self.base_to_idx[base] = i
        
        self.meth_base = "C"  #set methylated base
        self.mask_base = "N"
        self.meth_idx = self.base_to_idx[self.meth_base]
        self.mask_idx = self.base_to_idx[self.mask_base]
        
        
    def __len__(self):
        return len(self.exprs)


    def __getitem__(self, idx):
        """Getter."""
        seq = seq2tensor(self.seqs[idx])
        cpgs = self.cpgs[idx]
        expr = self.exprs[idx]

        for cpg in cpgs:
            # insert cpg value in row for meth_base (cytosine)
            cpg_loc = cpg[0]
            cpg_value = cpg[1]
            seq = methylate_seq(seq, cpg_loc, cpg_value, self.meth_idx, self.mask_idx)

        return seq, expr
            

m = 100
l = 50
c = 5
VOCAB = ['A', 'C', 'T', 'G', 'N']
BASE2IDX = {"A": 0, "C": 1, "T": 2, "G": 3, "N": 4}
data = random_data(m, l, c)

# test_loader, train_loader
train_set = PromoterDataset(data)
train_loader = DataLoader(train_set, batch_size=4, shuffle=True, num_workers=0)

In [152]:
class RNNRegressor(nn.Module):

    def __init__(self, input_dim, hidden_dim, output_dim):
        super(RNNRegressor, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.i2h = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.i2o = nn.Linear(input_dim + hidden_dim, output_dim)
        self.tanh = nn.Tanh()
    
    def forward(self, input, hidden):
        
        #input = input.view(-1, input.shape[1])
        # TODO: Modify code to accept batch tensors <loc, batch_nr, index>. Can add init hidden in here.
        
        combined = torch.cat((input, hidden), 1)
        hidden = self.i2h(combined)
        output = self.i2o(combined)
        output = self.tanh(output)

        return output, hidden
    
    def initHidden(self):
        return torch.zeros(1, self.hidden_dim)

HIDDEN_DIM = 10
rnn = RNNRegressor(train_set.vocab_size, HIDDEN_DIM, 1)



writer = SummaryWriter('runs/RNN_playground')
writer.add_graph(rnn, (seq2tensor("ACGTT")[1], torch.zeros(1, HIDDEN_DIM)))
writer.close()

In [153]:
criterion = nn.MSELoss()
optimizer = optim.SGD(rnn.parameters(), lr=0.005)


def train(output_tensor, seq_tensor):
    ''''''
    
    hidden = rnn.initHidden()
    
    # zero the grad buffers
    rnn.zero_grad()
    optimizer.zero_grad()
    
    # forward pass
    for i in range(seq_tensor.shape[0]):
        output, hidden = rnn(seq_tensor[i], hidden)
    
    # compute loss and backward pass
    loss = criterion(output, output_tensor)
    loss.backward()
    
    # update params
    optimizer.step()
    
    return output, loss.item()

train(torch.tensor([-0.0117]), seq2tensor("ACGTN"))
print(seq2tensor("ACGTN").shape, torch.tensor([-0.0118]).shape)

torch.Size([5, 1, 5]) torch.Size([1])


In [156]:
NUM_EPOCHS = 5

# TODO: not really minibatch for now
for epoch in range(NUM_EPOCHS):
    
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        
        # get inputs
        # TODO: Account for dtypes, otherwise get incompatible!
        seq_tensor, expr = data
        seq_tensor = seq_tensor[0]
        expr = expr[0].view(1)
        expr = expr.type(torch.FloatTensor)  
        
        # train on example
        output, loss = train(expr[0], seq_tensor)
        
        # print statistics
        running_loss += loss
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 2000))
            running_loss = 0.0

print('Finished Training')

# save model
PATH = './models/test_model'
torch.save(rnn.state_dict(), PATH)

Finished Training


In [None]:
# Predict loop

# correct = 0
# total = 0
# with torch.no_grad():
#     for data in testloader:
#         images, labels = data
#         outputs = net(images)
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

# print('Accuracy of the network on the 10000 test images: %d %%' % (
#     100 * correct / total))