In [112]:
import numpy as np

with open('rnn-challenge-data.npz', 'rb') as f:
    X = np.load(f)
    data_x = X['data_x']
    data_y = X['data_y']
    val_x = X['val_x']
    val_y = X['val_y']
    test_x = X['test_x']

# TRAINING DATA: INPUT (x) AND OUTPUT (y)
print(data_x.shape, data_x.dtype)
print(data_y.shape, data_y.dtype)

# VALIDATION DATA: INPUT (x) AND OUTPUT (y)
print(val_x.shape, val_x.dtype)
print(val_y.shape, val_y.dtype)

# TEST DATA: INPUT (x) ONLY
print(test_x.shape, test_x.dtype)

# PREDICT prediction FROM test_x

(400,) <U400
(400,) int64
(100,) <U1200
(100,) int64
(250,) <U2000


In [113]:
import torch

In [114]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
#device = 'cpu'

device(type='cuda')

## gene to vec

In [115]:
#A C G T
def encode_gene(gene: str) -> str:
    gene = gene.replace('A', '1')
    gene = gene.replace('C', '2')
    gene = gene.replace('G', '3')
    gene = gene.replace('T', '4')
    return [int(c) for c in gene]
        
def sequence_2vec(input_data):
    out_data = []
    for seq in input_data:
        out_data.append(encode_gene(seq))
    return np.array(out_data)
                             
data_x_vec = sequence_2vec(data_x)
val_x_vec = sequence_2vec(val_x)
test_x_vec = sequence_2vec(test_x)

In [116]:
print(data_x_vec.shape)
print(val_x_vec.shape)
print(test_x_vec.shape)

(400, 400)
(100, 1200)
(250, 2000)


In [117]:
ctx = {"device": device, "dtype": torch.float32}
def parse_bases(data):
    all_data = []
    for x in data:
        string = []
        for char in x:
            #import ipdb
            string.append({"A":0,"C":1,"G":2,"T":3}[char])
        #ipdb.set_trace()
        tensor = torch.tensor(string)
        one_hot = torch.nn.functional.one_hot(tensor)
        all_data.append(one_hot)
    return torch.stack(all_data).to(**ctx)

parse_bases(val_x).shape

torch.Size([100, 1200, 4])

In [118]:
from torch.nn.utils.rnn import pad_sequence
data_x_ts = parse_bases(data_x)
val_x_ts = parse_bases(val_x)
test_x_ts = parse_bases(test_x)
print(data_x_ts, data_x_ts.shape)


tensor([[[0., 1., 0., 0.],
         [0., 0., 0., 1.],
         [1., 0., 0., 0.],
         ...,
         [0., 0., 0., 1.],
         [1., 0., 0., 0.],
         [0., 0., 1., 0.]],

        [[0., 0., 0., 1.],
         [0., 0., 1., 0.],
         [1., 0., 0., 0.],
         ...,
         [0., 1., 0., 0.],
         [0., 0., 1., 0.],
         [0., 0., 0., 1.]],

        [[1., 0., 0., 0.],
         [0., 0., 1., 0.],
         [0., 0., 0., 1.],
         ...,
         [0., 1., 0., 0.],
         [0., 0., 0., 1.],
         [1., 0., 0., 0.]],

        ...,

        [[1., 0., 0., 0.],
         [0., 0., 1., 0.],
         [0., 0., 0., 1.],
         ...,
         [0., 0., 0., 1.],
         [0., 1., 0., 0.],
         [1., 0., 0., 0.]],

        [[0., 1., 0., 0.],
         [1., 0., 0., 0.],
         [0., 0., 1., 0.],
         ...,
         [0., 0., 0., 1.],
         [0., 0., 1., 0.],
         [1., 0., 0., 0.]],

        [[1., 0., 0., 0.],
         [0., 1., 0., 0.],
         [0., 0., 1., 0.],
         ...,
 

## Build model and train the model

In [119]:
import torch.nn as nn
class RNN(nn.Module):
    def __init__(
        self,
        batch_size,
        hidden_size=5, 
        n_layers=2,
        device='cpu',
    ):
        super(RNN, self).__init__()
        self.n_layers = n_layers
        self.hidden_size = hidden_size
        self.device = device
        self.batch_size = batch_size
        
        self.rnn = nn.GRU(4, hidden_size, n_layers, batch_first=True)   
        self.decoder = nn.Linear(hidden_size, 5)
        self.act = nn.LogSoftmax(dim=-1)
        
    def init_hidden(self):
        return torch.randn(self.n_layers, self.batch_size, self.hidden_size).to(self.device) 
   
    def forward(self, inputs):
        # Avoid breaking if the last batch has a different size
        batch_size = inputs.size(0)
        if batch_size != self.batch_size:
            self.batch_size = batch_size
            
        output, hidden = self.rnn(inputs, self.init_hidden())
        output = self.decoder(output[:, -1, :]).squeeze()
        output = self.act(output)
        return output

In [120]:
#device = 'cpu'
batch_size = 32
model = RNN(
    batch_size=batch_size,
    hidden_size=5,
    n_layers = 2,
    device=device,
)
model = model.to(device)

In [121]:
from torch.utils.data import TensorDataset, DataLoader
train_data = TensorDataset(parse_bases(data_x), torch.tensor(data_y))
train_loader = DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True
)
val_data = TensorDataset(parse_bases(val_x), torch.tensor(val_y))
val_loader = DataLoader(
    val_data,
    batch_size=100,
    shuffle=False
)

In [122]:
device

device(type='cuda')

In [123]:
import torch.optim as optim
#nll = nn.CrossEntropyLoss()
#criterion = nn.BCEWithLogitsLoss()
nll = nn.NLLLoss()
optim = optim.Adam(model.parameters(), lr=0.01)

def accuracy(log_prob, category):
    return (log_prob.argmax(dim=-1) == category).sum() / len(category)

In [124]:
from tqdm.notebook import tqdm
import ipdb
n_epochs = 300
best_accuracy = 0.0

for epoch in tqdm(range(n_epochs)):
    
    # Training
    model.train(True)
    for seq, category in train_loader:
        seq, category = seq.to(device), category.to(device)
        optim.zero_grad()
        result = model(seq)
        #ipdb.set_trace()
        loss = nll(result, category)
        loss.backward()
        optim.step()
        
    # Validation
    model.train(False)
    for seq, category in val_loader:
        seq, category = seq.to(device), category.to(device)
        result = model.forward(seq)
        acc = accuracy(result, category).item()
        print(f"Epoch: {epoch}      "
              f"Accuracy: {100*acc:.0f}%    "
              f"Best Accuracy: {100*best_accuracy:.0f}%",
              end="\r"
             )
        if acc >= best_accuracy:
            best_accuracy = acc
            torch.save(model.state_dict(), "best_model.pt")
        

  0%|          | 0/300 [00:00<?, ?it/s]

Epoch: 73      Accuracy: 99%    Best Accuracy: 98%

KeyboardInterrupt: 

In [130]:
loaded_rnn = RNN(batch_size=batch_size)
loaded_rnn = loaded_rnn.to('cpu')
loaded_rnn.load_state_dict(torch.load("best_model.pt"))

<All keys matched successfully>

In [141]:
#parse_bases(test_x).to(device)
prediction = loaded_rnn(parse_bases(test_x).to('cpu')).argmax(dim=-1).detach().numpy()
prediction

array([2, 4, 1, 1, 0, 4, 2, 0, 4, 2, 4, 3, 3, 2, 0, 0, 3, 2, 3, 2, 0, 4,
       2, 4, 0, 3, 2, 1, 1, 4, 1, 1, 1, 1, 1, 0, 4, 3, 1, 3, 2, 2, 2, 4,
       3, 4, 1, 0, 0, 0, 1, 2, 4, 4, 3, 0, 0, 4, 4, 2, 1, 2, 3, 0, 3, 1,
       2, 2, 4, 3, 3, 4, 2, 3, 3, 2, 1, 4, 4, 0, 1, 0, 0, 1, 2, 0, 4, 0,
       4, 2, 3, 3, 2, 3, 2, 3, 4, 1, 2, 1, 2, 4, 2, 1, 0, 3, 3, 1, 3, 3,
       0, 1, 1, 0, 4, 4, 2, 0, 1, 4, 2, 0, 4, 2, 3, 2, 4, 0, 1, 0, 2, 4,
       0, 1, 2, 0, 4, 2, 2, 1, 3, 0, 1, 0, 0, 0, 2, 2, 2, 1, 0, 0, 0, 3,
       3, 4, 4, 4, 2, 1, 1, 0, 3, 1, 0, 1, 2, 2, 1, 3, 4, 4, 1, 3, 1, 3,
       4, 0, 1, 2, 4, 3, 0, 4, 2, 1, 3, 1, 4, 3, 2, 3, 1, 0, 0, 1, 4, 2,
       4, 2, 4, 3, 2, 1, 1, 4, 3, 1, 4, 0, 1, 1, 1, 1, 0, 3, 4, 3, 1, 3,
       4, 3, 1, 3, 1, 0, 2, 4, 2, 3, 0, 4, 4, 3, 0, 2, 3, 3, 0, 3, 0, 4,
       0, 4, 3, 0, 2, 2, 0, 0], dtype=int64)

In [None]:
# MAKE SURE THAT YOU HAVE THE RIGHT FORMAT
assert prediction.ndim == 1
assert prediction.shape[0] == 250

# AND SAVE EXACTLY AS SHOWN BELOW
np.save('prediction_m2m.npy', prediction.astype(int))

# MAKE SURE THAT THE FILE HAS THE CORRECT FORMAT
def validate_prediction_format():
    loaded = np.load('prediction_m2m.npy')
    assert loaded.shape == (250, )
    assert loaded.dtype == int
    assert (loaded <= 4).all()
    assert (loaded >= 0).all()
validate_prediction_format()