In [12]:
import pickle

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from torch.nn import Embedding, RNN
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

# Set any global values

In [13]:
torch.manual_seed(1)

<torch._C.Generator at 0x7f5d3259fa10>

In [14]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cuda device


# Load the training data

In [15]:
with open('training_data.pickle','rb') as f:
    training_data = pickle.load(f)

In [16]:
class_to_ix = {"Found":0, "Unfound":1}
all_letters = "()ab~&|>"

vocab = ['<pad>'] + sorted(set([char for seq in all_letters for char in seq]))


n_classes = len(class_to_ix)
n_letters = len(all_letters)

In [17]:
X = []
Y = []
embed_dim = len(vocab)
embed = Embedding(len(vocab), embed_dim) # embedding_dim = len(vocab)
for element in training_data:
    input = [vocab.index(token) for token in element[0]]
    input_tensor = torch.tensor(input, dtype=torch.int)
    classification = class_to_ix[element[1]]
    X.append(input_tensor)
    output_tensor = torch.tensor(classification, dtype=torch.int)
    Y.append(output_tensor)

In [18]:
class TTPStyleDataset(Dataset):
    def __init__(self, X,Y):
        self.X = X
        self.Y = Y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        sample = [self.X[idx], self.Y[idx]]
        return sample

In [26]:
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)


input_dataset = TTPStyleDataset(X_train,y_train)

test_dataset = TTPStyleDataset(X_test,y_test)

def my_collate_fn(data):
    (xx,yy) = zip(*data)
    x_lens = [len(x) for x in xx]
    y_lens = [1 for y in yy]

    xx_pad = pad_sequence(xx, batch_first=False, padding_value=0)
    yy_pad = torch.tensor(yy)

    return xx_pad, yy_pad, x_lens, y_lens
    

dataloader = DataLoader(input_dataset, shuffle=True, batch_size=16, num_workers=0, collate_fn=my_collate_fn)
test_dataloader = DataLoader(test_dataset, shuffle=True, batch_size=16, num_workers=0, collate_fn=my_collate_fn)

print(len(dataloader))
print(len(test_dataloader))

8820
2205


# Neural Network

In [20]:
class SoftmaxRNN(nn.RNN):
    def __init_subclass__(cls):
        return super().__init_subclass__()

In [21]:
#input_size - The number of expected features in the input x
rnn = SoftmaxRNN(input_size=embed_dim, hidden_size=2, num_layers=1, nonlinearity='relu', bias=True, batch_first=False, dropout=0.0, bidirectional=False)

In [25]:
num_epochs = 5
learning_rate = 0.005 # If you set this too high, it might explode. If too low, it might not learn
optimizer = torch.optim.SGD(rnn.parameters(), lr=learning_rate)

criterion = nn.CrossEntropyLoss() 
size = len(dataloader.dataset)
for epoch in range(num_epochs):
    for batch,(x_padded, y_padded, x_lens, y_lens) in enumerate(dataloader):
        
        rnn.zero_grad()
        x_embed = embed(x_padded)
        x_packed = pack_padded_sequence(x_embed, x_lens, batch_first=False, enforce_sorted=False)
        output_packed, hidden = rnn(x_packed)
        output, seq_len = pad_packed_sequence(output_packed, batch_first=False)
        hidden = torch.reshape(hidden, (16,2))
        y_padded = torch.reshape(y_padded, (16,))
        y_padded = y_padded.long()
        loss = criterion(hidden, y_padded)
        
        loss.backward()
        optimizer.step()
        

        if batch % 10000 == 0:
            loss, current = loss.item(), (batch + 1) * len(x_padded)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    rnn.eval()
    size = len(test_dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for batch,(x_padded, y_padded, x_lens, y_lens) in enumerate(test_dataloader):
            rnn.zero_grad()
            x_embed = embed(x_padded)
            x_packed = pack_padded_sequence(x_embed, x_lens, batch_first=False, enforce_sorted=False)
            output_packed, hidden = rnn(x_packed)
            output, seq_len = pad_packed_sequence(output_packed, batch_first=False)
            hidden = torch.reshape(hidden, (16,2))
            y_padded = torch.reshape(y_padded, (16,))
            y_padded = y_padded.long()
            test_loss += criterion(hidden, y_padded)
            #correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        
        loss, current = loss.item(), (batch + 1) * len(x_padded)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

loss: 0.401496  [   29/141120]
loss: 0.683526  [63945/35280]
loss: 0.392026  [   28/35280]
loss: 0.771078  [66150/35280]
loss: 0.692226  [   29/35280]
loss: 0.620746  [59535/35280]
loss: 0.398090  [   29/35280]
loss: 0.576344  [61740/35280]
loss: 0.733978  [   29/35280]
loss: 0.756767  [59535/35280]
