# Dependencies

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random

# Create Dataset

In [2]:
def generate_sentence(min_len=0, max_len=20, pos=True):
    if pos:
        n = np.random.randint(np.ceil(min_len / 3), np.ceil((max_len) / 3))
        sentence = n*"a" + n*"b" + n*"c"
        return sentence, len(sentence)
    else:
        n_0 = np.random.randint(0, max_len)
        n_1 = np.random.randint(0, max_len - n_0 + 1)
        n_2 = np.random.randint(min_len - n_0 - n_1, max_len - n_0 - n_1 + 1)
        sentence = n_0 * "a" + n_1 * "b" + n_2 * "c"
        return sentence, len(sentence)

def create_data(size=10000, balance=0.1, min_len=0, max_len=20, train=True):
    data = []
    sentence_lengths = []

    for i in range(int(size*balance)):
        sentence, sentence_length = generate_sentence(min_len=min_len, max_len=max_len, pos=True)
        while sentence == "aabbcc" and train:
            sentence, sentence_length = generate_sentence(min_len=min_len, max_len=max_len, pos=True)
        if not train:
            sentence, sentence_length = "aabbcc", 6
        data.append((sentence, 1))
        sentence_lengths.append(sentence_length)
    for i in range(int((size - (size*balance)))):
        sentence, sentence_length = generate_sentence(min_len=min_len, max_len=max_len, pos=False)
        data.append((sentence, 0))
        sentence_lengths.append(sentence_length)
    
    random.shuffle(data)
    average_length = sum(sentence_lengths) / len(sentence_lengths)
    return data, average_length

train_data, avg_sent_length_train = create_data(size=1000, balance=0.5, min_len=0, max_len=20, train=True)
val_data, avg_sent_length_val = create_data(size=1000, balance=0.5, min_len=0, max_len=20, train=False)
test_data, avg_sent_length_test = create_data(size=1000, balance=0.5, min_len=21, max_len=40, train=True)

# print(f"Train Data Sample:\n{train_data}")
# print(f"Average Sentence Length:\n{avg_sent_length_train}")
# print(f"Val Data Sample:\n{val_data}")
# print(f"Average Sentence Length:\n{avg_sent_length_val}")
# print(f"Test Data Sample:\n{test_data}")
# print(f"Average Sentence Length:\n{avg_sent_length_test}")


# Encoding

In [3]:
# Dependencies
import torch
import torch.nn as nn
import torch.optim as optim

In [4]:
# Encoding data
char_to_index = {'a':[1,0,0], 'b':[0,1,0], 'c':[0,0,1]}
#index_to_char = {v: k for k, v in char_to_index.items()}

def creat_tensors(data, max_l):
    X = []
    y = []
    ml = 0

    for sent, label in data:
        X.append([char_to_index[char] for char in sent])
        y.append(label)

    # Padding to be able to convert to tensor
    X = [sent + [[0,0,0]] * (max_l - len(sent)) for sent in X]

    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32)

    return X, y

X_train, y_train = creat_tensors(train_data, max_l=40)
X_test, y_test = creat_tensors(test_data, max_l=40)
X_val, y_val = creat_tensors(val_data, max_l=40)

# Torch datasets and loaders

In [5]:
from torch.utils.data import TensorDataset, DataLoader

BATCH_SIZE = 64
train_set = TensorDataset(X_train, y_train)
val_set = TensorDataset(X_val, y_val)
testset = TensorDataset(X_test, y_test)

train_set, val_same_set = torch.utils.data.random_split(train_set, [900, 100])


train_loader = DataLoader(train_set, BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_set, BATCH_SIZE, shuffle=True)
val_same_loader = DataLoader(val_same_set, BATCH_SIZE, shuffle=True)
test_loader = DataLoader(testset, BATCH_SIZE, shuffle=True)

# Setup device

In [6]:
# Set device
if torch.cuda.is_available():
    device = 'cuda:0'
elif torch.backends.mps.is_available():
    device = 'mps:0'
else:
    device = 'cpu'
print('GPU State:', device)

GPU State: mps:0


# Create RNN class with forward layer

In [7]:
class ElmanRNN(nn.Module):
    def __init__(self, INPUT_SIZE, HIDDEN_SIZE, NUM_LAYERS, OUTPUT_SIZE) -> None:
        super(ElmanRNN, self).__init__()
        self.elmanRNN = nn.RNN(INPUT_SIZE, HIDDEN_SIZE, NUM_LAYERS, batch_first=True)
        self.classification = nn.Sequential(
            nn.Linear(HIDDEN_SIZE, OUTPUT_SIZE),
            nn.Sigmoid()
        )

    def forward(self, x):
        out, _ = self.elmanRNN(x)
        logits = self.classification(out[:, -1, :])
        return torch.squeeze(logits)

# Training loop

In [8]:
INPUT_SIZE = len(char_to_index)
HIDDEN_SIZE = 5
NUM_LAYERS = 1
OUTPUT_SIZE = 1
NUM_EPOCHS = 200
lr = 0.003

model = ElmanRNN(INPUT_SIZE, HIDDEN_SIZE, NUM_LAYERS, OUTPUT_SIZE)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

for EPOCH in range(NUM_EPOCHS):
    model.train()
    epoch_loss = 0

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        
    if (EPOCH + 1) % 10 == 0:
        print(f'Epoch [{EPOCH + 1}/{NUM_EPOCHS}], Loss: {epoch_loss / len(train_loader):.4f}')

Epoch [10/200], Loss: 0.6938
Epoch [20/200], Loss: 0.6942
Epoch [30/200], Loss: 0.6930
Epoch [40/200], Loss: 0.6932
Epoch [50/200], Loss: 0.6934
Epoch [60/200], Loss: 0.6932
Epoch [70/200], Loss: 0.6937
Epoch [80/200], Loss: 0.6931
Epoch [90/200], Loss: 0.6942
Epoch [100/200], Loss: 0.6938
Epoch [110/200], Loss: 0.6932
Epoch [120/200], Loss: 0.6939
Epoch [130/200], Loss: 0.6929
Epoch [140/200], Loss: 0.6948
Epoch [150/200], Loss: 0.6932
Epoch [160/200], Loss: 0.6928
Epoch [170/200], Loss: 0.6930
Epoch [180/200], Loss: 0.6933
Epoch [190/200], Loss: 0.6932
Epoch [200/200], Loss: 0.6935


# Evaluation loop

In [9]:
from torcheval.metrics import BinaryF1Score

def test_loop(data_loader, model, loss_fn):
    model.eval()
    SIZE = len(data_loader.dataset)
    NUM_BATCHES = len(data_loader)
    test_loss, correct = 0, 0
    metric = BinaryF1Score()

    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)
            metric.update(outputs, targets)
            test_loss += loss_fn(outputs, targets).item()
            correct += (torch.round(outputs) == targets).type(torch.float).sum().item()
    
    test_loss /= NUM_BATCHES
    correct /= SIZE

    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, F1-score: {metric.compute()}, Avg loss: {test_loss:>8f} \n")

loss_fn = nn.BCELoss()
test_loop(val_loader, model, loss_fn)

Test Error: 
 Accuracy: 50.0%, F1-score: 0.6666666865348816, Avg loss: 0.693145 



In [10]:
test_loop(test_loader, model, loss_fn)

Test Error: 
 Accuracy: 61.1%, F1-score: 0.7199423909187317, Avg loss: 0.692008 



In [11]:
test_loop(val_same_loader, model, loss_fn)

Test Error: 
 Accuracy: 49.0%, F1-score: 0.6577181220054626, Avg loss: 0.693138 

