In [1]:
from random import shuffle
import time
import torch
from sklearn.model_selection import train_test_split

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Number of sequences per mini-batch handed to the DataLoaders.
BATCH_SIZE = 10
# Number of characters read from the text file per chunk (one training sequence).
STRING_SIZE = 20
# Number of passes over the training loader.
NUM_EPOCHS = 30
# Step size passed to the SGD optimizer.
LEARNING_RATE = 0.01
# Text file used both to build the alphabet and to cut the training sequences.
FILE_NAME = "data/cook_book.txt"
# First CUDA GPU when one is available, otherwise the CPU.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Shift added to every character index to produce the "encrypted" model input.
CAESAR_OFFSET = 3

Alphabet class

In [3]:
class Alphabet(object):
    """Distinct characters of a text file, with character <-> index lookups.

    Attributes:
        letters: str of the distinct characters, in order of first appearance.
        idx: list of the same characters in sorted order; ``idx[i]`` is the
            character that has index ``i``.
        char_dict: dict mapping each character to its position in ``idx``.
    """

    # Number of characters requested from the file per ``read`` call.
    _READ_CHUNK = 8192

    def __init__(self):
        self.letters = ""
        self.idx = []
        self.char_dict = {}

    def __len__(self):
        """Return the number of distinct characters."""
        return len(self.letters)

    def __contains__(self, item):
        """Return True when ``item`` occurs in ``letters``."""
        return item in self.letters

    def __getitem__(self, item):
        """Translate between characters and indices.

        Args:
            item: a ``str`` character (returns its index) or an ``int`` index
                (returns the character; the index wraps modulo the alphabet
                size, so out-of-range values are folded back).

        Raises:
            KeyError: ``item`` is a string that is not in the alphabet.
            IndexError: ``item`` is an int and the alphabet is empty.
            TypeError: ``item`` is neither ``str`` nor ``int`` (previously
                this case silently returned ``None``).
        """
        if isinstance(item, str):
            return self.char_dict[item]
        if isinstance(item, int):
            if not self.letters:
                raise IndexError("cannot index an empty Alphabet")
            return self.idx[item % len(self.letters)]
        raise TypeError(
            f"Alphabet keys must be str or int, not {type(item).__name__}"
        )

    def __str__(self):
        letters = "".join(self.letters)
        return f"Alphabet is:\n {letters}\n {len(letters)} chars"

    def load_from_file(self, file_path, encoding=None):
        """Add every character of a text file to the alphabet.

        Characters already present (e.g. from an earlier call) are kept, new
        ones are appended in order of first appearance, and the index tables
        are rebuilt from the sorted character set.

        Args:
            file_path: path of the text file to scan.
            encoding: text encoding passed to ``open``. ``None`` (default)
                keeps the platform default, as before.
                NOTE(review): the recorded output of this notebook starts with
                "п»ї", which looks like a UTF-8 BOM decoded with a legacy code
                page - consider ``encoding="utf-8-sig"`` here and in every
                other reader of the same file.
        """
        # dict keys give O(1) membership tests while preserving insertion
        # order, replacing the repeated substring scans / string rebuilds.
        seen = dict.fromkeys(self.letters)
        with open(file_path, encoding=encoding) as file:
            for chunk in iter(lambda: file.read(self._READ_CHUNK), ""):
                seen.update(dict.fromkeys(chunk))
        self.letters = "".join(seen)
        self.idx = sorted(self.letters)
        self.char_dict = {c: i for i, c in enumerate(self.idx)}


# Notebook-wide alphabet: built once from the corpus file and then used by the
# dataset, the model sizing and the decoding of predictions.
ALPHABET = Alphabet()
ALPHABET.load_from_file(FILE_NAME)
# Show the collected characters (first-appearance order) and their count.
print(ALPHABET)

Alphabet is:
 п»їThe ProjctGunbgBkfCsdip,yIa
LwUSmlv.Y-:ARD1320[#689]EO/()*FHJNK_VMx4WвЃ·„‚€ВІѓіq№z;5Ђ™7Qњќ!"%X'$
 99 chars


Sentence dataset class

In [4]:
class SentenceDataset(torch.utils.data.Dataset):
    """Pairs of (shifted, original) character-index sequences.

    ``y`` holds the alphabet index of every character of every line and ``x``
    holds the same indices shifted by a constant offset; the model is trained
    to map ``x`` back to ``y``.
    """

    def __init__(self, raw_data, alphabet, offset=None, device=None):
        """Encode ``raw_data`` and place both tensors on ``device``.

        Args:
            raw_data: sequence of equal-length strings (``torch.tensor``
                raises ``ValueError`` for ragged input).
            alphabet: object whose ``alphabet[ch]`` returns the integer index
                of character ``ch``.
            offset: value added to every index to build ``x``; defaults to the
                notebook-level ``CAESAR_OFFSET``.
            device: target device; defaults to the notebook-level ``DEVICE``.
        """
        super().__init__()
        if offset is None:
            offset = CAESAR_OFFSET
        if device is None:
            device = DEVICE
        self._len = len(raw_data)
        self.y = torch.tensor(
            [[alphabet[ch] for ch in line] for line in raw_data]
        ).to(device)
        # One vectorised add instead of a Python loop over every tensor
        # element; the result has the same values, dtype and device.
        self.x = self.y + offset

    def __len__(self):
        """Return the number of sequences."""
        return self._len

    def __getitem__(self, idx):
        """Return the ``(shifted, original)`` index tensors of sequence ``idx``."""
        return self.x[idx], self.y[idx]

In [5]:
def get_text_array(file_path, step, encoding=None):
    """Cut a text file into consecutive chunks of exactly ``step`` characters.

    Args:
        file_path: path of the text file to read.
        step: number of characters per chunk; must be positive. (The previous
            implementation ignored this argument and used the global
            ``STRING_SIZE`` instead.)
        encoding: text encoding passed to ``open``; ``None`` (default) keeps
            the platform default. It should match the encoding used to build
            the alphabet the chunks are later encoded with.

    Returns:
        list[str]: every full-length chunk, in file order. A trailing chunk
        shorter than ``step`` is dropped so all entries have equal length; a
        full-length final chunk is kept and an empty file yields ``[]``.

    Raises:
        ValueError: ``step`` is not positive.
    """
    if step <= 0:
        raise ValueError(f"step must be positive, got {step}")
    with open(file_path, encoding=encoding) as file:
        text_array = list(iter(lambda: file.read(step), ""))
    # Only the last chunk can be short; drop it solely in that case.
    if text_array and len(text_array[-1]) < step:
        text_array.pop()
    return text_array

Prepare data & dataloaders

In [6]:
# Cut the corpus into fixed-length character sequences.
raw_data = get_text_array(FILE_NAME, STRING_SIZE)

# 80 % train; the remaining 20 % is halved into a per-epoch test set and a
# held-out validation set.
train_data, test_data = train_test_split(raw_data, test_size=0.2, shuffle=True)
test_data, val_data = train_test_split(test_data, test_size=0.5, shuffle=True)

# Validation targets are the alphabet indices; inputs are the same indices
# shifted by the Caesar offset (one vectorised add instead of a Python loop
# over every tensor element).
Y_val = torch.tensor([[ALPHABET[ch] for ch in line] for line in val_data])
X_val = Y_val + CAESAR_OFFSET

train_dl = torch.utils.data.DataLoader(
    SentenceDataset(train_data, ALPHABET),
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True
)
# Evaluation does not benefit from shuffling, so keep a fixed order.
# drop_last stays True so every batch has the same size, which the per-batch
# accuracy averaging in the training cell relies on.
test_dl = torch.utils.data.DataLoader(
    SentenceDataset(test_data, ALPHABET),
    batch_size=BATCH_SIZE,
    shuffle=False,
    drop_last=True
)

RNN architecture

In [7]:
class RNNModel(torch.nn.Module):
    """Embedding -> single-layer RNN -> linear classifier over characters.

    For an integer input of shape (batch, seq) the output is a tensor of
    per-position logits of shape (batch, seq, vocab_size).
    """

    def __init__(self, vocab_size=None, embedding_dim=32, hidden_size=128):
        """Build the layers.

        Args:
            vocab_size: number of distinct input indices and of output
                classes. Defaults to ``len(ALPHABET) + CAESAR_OFFSET`` so the
                shifted indices fit into the embedding table.
            embedding_dim: size of each character embedding.
            hidden_size: size of the RNN hidden state.
        """
        super().__init__()
        if vocab_size is None:
            vocab_size = len(ALPHABET) + CAESAR_OFFSET
        self.embedding = torch.nn.Embedding(vocab_size, embedding_dim)
        self.rnn = torch.nn.RNN(embedding_dim, hidden_size, batch_first=True)
        self.linear = torch.nn.Linear(hidden_size, vocab_size)

    def forward(self, sentence, state=None):
        """Return logits for every position of ``sentence``.

        Args:
            sentence: integer tensor of character indices, (batch, seq).
            state: optional initial hidden state forwarded to the RNN
                (shape (1, batch, hidden_size)); ``None`` starts from zeros.
                This argument used to be accepted but ignored.
        """
        embedded = self.embedding(sentence)
        outputs, _ = self.rnn(embedded, state)
        return self.linear(outputs)

In [8]:
# Network with default sizes, moved to the selected device.
model = RNNModel().to(DEVICE)
# Cross-entropy criterion; the training cell applies it to flattened
# per-character logits and target indices.
loss = torch.nn.CrossEntropyLoss().to(DEVICE)
# Plain stochastic gradient descent over all model parameters.
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)

Train and test

In [9]:
def _run_epoch(batches, train):
    """Run one pass over ``batches`` and return ``(summed loss, mean accuracy)``.

    Args:
        batches: iterable of ``(x, y)`` index tensors shaped (batch, seq).
        train: when True the model is put in training mode and the optimizer
            is stepped after every batch; when False the model is put in eval
            mode and autograd is disabled, so no graph is built.

    Returns:
        tuple[float, float]: loss summed over batches and per-character
        accuracy averaged over batches (0.0 for an empty iterable).
    """
    total_loss, acc_sum, n_batches = 0.0, 0.0, 0
    model.train(train)
    with torch.set_grad_enabled(train):
        for x_in, y_in in batches:
            # Flatten (batch, seq) targets to match the flattened logits.
            targets = y_in.reshape(-1)
            if train:
                optimizer.zero_grad()
            # Call the module itself (not .forward) so hooks are honoured.
            logits = model(x_in)
            logits = logits.reshape(-1, logits.shape[-1])
            batch_loss = loss(logits, targets)
            if train:
                batch_loss.backward()
                optimizer.step()
            total_loss += batch_loss.item()
            hits = logits.argmax(dim=1) == targets
            acc_sum += hits.sum().item() / hits.shape[0]
            n_batches += 1
    return total_loss, acc_sum / max(n_batches, 1)


for epoch in range(NUM_EPOCHS):
    start_epoch_time = time.time()
    train_loss, train_acc = _run_epoch(train_dl, train=True)
    print(
        f"Epoch: {epoch}, loss: {train_loss:.4f}, acc: "
        f"{train_acc:.4f}",
        end=" | "
    )
    test_loss, test_acc = _run_epoch(test_dl, train=False)
    print(
        f"test loss: {test_loss:.4f}, test acc: {test_acc:.4f} | "
        f"{time.time() - start_epoch_time:.2f} sec."
    )

Epoch: 0, loss: 1136.5024, acc: 0.6970 | test loss: 69.7348, test acc: 0.8471 | 2.95 sec.
Epoch: 1, loss: 388.4121, acc: 0.8839 | test loss: 36.7283, test acc: 0.9007 | 2.27 sec.
Epoch: 2, loss: 238.1180, acc: 0.9284 | test loss: 25.9778, test acc: 0.9423 | 2.24 sec.
Epoch: 3, loss: 177.3970, acc: 0.9471 | test loss: 20.5806, test acc: 0.9468 | 2.33 sec.
Epoch: 4, loss: 145.2379, acc: 0.9515 | test loss: 17.4306, test acc: 0.9504 | 2.32 sec.
Epoch: 5, loss: 125.1562, acc: 0.9532 | test loss: 15.2395, test acc: 0.9529 | 2.37 sec.
Epoch: 6, loss: 110.8735, acc: 0.9581 | test loss: 13.5989, test acc: 0.9585 | 2.29 sec.
Epoch: 7, loss: 99.7608, acc: 0.9653 | test loss: 12.3847, test acc: 0.9654 | 2.27 sec.
Epoch: 8, loss: 90.8163, acc: 0.9705 | test loss: 11.3588, test acc: 0.9708 | 2.26 sec.
Epoch: 9, loss: 83.2151, acc: 0.9729 | test loss: 10.4238, test acc: 0.9723 | 2.34 sec.
Epoch: 10, loss: 76.7401, acc: 0.9742 | test loss: 9.6609, test acc: 0.9733 | 3.54 sec.
Epoch: 11, loss: 71.0801

Example from the validation set

In [10]:
# Index of the validation sequence printed as an example; it must be smaller
# than the number of validation sequences.
idx = 44

# Inference only: make the mode explicit and skip building an autograd graph.
model.eval()
with torch.no_grad():
    val_results = model(X_val.to(DEVICE)).argmax(dim=2)

# Per-character accuracy over the whole validation set.
val_hits = (val_results == Y_val.to(DEVICE)).flatten()
val_acc = (val_hits.sum() / val_hits.shape[0]).item()

# Decode predicted and true indices back to characters.
out_sentence = "".join([ALPHABET[i.item()] for i in val_results[idx]])
true_sentence = "".join([ALPHABET[i.item()] for i in Y_val[idx]])
print(f"Validation accuracy is : {val_acc:.4f}")
print("-" * 20)
print(f"Validation sentence is: \"{out_sentence}\"")
print("-" * 20)
print(f"True sentence is:       \"{true_sentence}\"")

Validation accuracy is : 0.9912
--------------------
Validation sentence is: "over the top.
Cover "
--------------------
True sentence is:       "over the top.
Cover "
