In [1]:
%pip install torch
%pip install PathLib
%pip install nltk
%pip install wakepy

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
import numpy as np
import torch
from torch import nn, Tensor, FloatTensor
import torch.nn.functional as F

In [3]:
import os
import pickle
import re
from collections import Counter

import nltk
import numpy as np
import torch
import pathlib

In [4]:
DATADIR = 'data'
MOD_VAR_DIR = 'models_and_variables'
TRAINED_MODELS_DIR = 'trained_models'
HP_TEXT_DIR = os.path.join(DATADIR, 'harry_potter_text')
dirs=[MOD_VAR_DIR, TRAINED_MODELS_DIR]
for dir in dirs:
    pathlib.Path(dir).mkdir(parents=True, exist_ok=True)

In [5]:
def generate_vocabulary(individual_words, include_special_tokens=False):
    condition_keys = sorted(individual_words)
    result = dict(zip(condition_keys, range(len(condition_keys))))
    return result
   

In [6]:
v=generate_vocabulary(["test","ik"], 1)
print(v)


{'ik': 0, 'test': 1}


In [7]:
def get_hp_text(num_of_books=1):
    text_files = os.listdir(HP_TEXT_DIR)
    path_to_hp_text = os.path.join(MOD_VAR_DIR, "harry_potter_text.pkl")

    filepath = pathlib.PurePath(MOD_VAR_DIR, "harry_potter_text.pkl")

    counter=0
    if not os.path.exists(path_to_hp_text):
        all_text = ""
        for book in text_files:
             path_to_book = os.path.join(HP_TEXT_DIR, book)

             with open(path_to_book, "r", encoding="utf8") as f:
                text = f.readlines()

             text = [line for line in text if "Page" not in line]
             text = " ".join(text).replace("\n", "")
             text = [word for word in text.split(" ") if len(word) > 0]

             text = " ".join(text)
             text = re.sub("[^a-zA-Z0-9-_*.!,? \"\']", "", text)
             all_text+=text
             counter+=1
             if counter==num_of_books:
                 break

        with open(path_to_hp_text, 'wb') as handle:
            pickle.dump(all_text, handle, protocol=pickle.HIGHEST_PROTOCOL)
    else:
        with open(path_to_hp_text, 'rb') as handle:
            all_text = pickle.load(handle)

    return all_text

In [8]:
def get_tokens():
    path_to_tokens = os.path.join(MOD_VAR_DIR, "harry_potter_tokens.pkl")
    if not os.path.exists(path_to_tokens):
        tokens=nltk.word_tokenize(get_hp_text())
        with open(path_to_tokens, 'wb') as handle:
            pickle.dump(tokens, handle, protocol=pickle.HIGHEST_PROTOCOL)
    else:
        with open(path_to_tokens, 'rb') as handle:
            tokens = pickle.load(handle)
    return tokens

In [9]:
def get_2_vocabs():
    path_to_vocab= os.path.join(MOD_VAR_DIR, "harry_potter_vocab.pkl")
    path_to_inv_vocab = os.path.join(MOD_VAR_DIR, "harry_potter_inv_vocab.pkl")
    tokens= get_tokens()
    if not os.path.exists(path_to_vocab):
        vocab=generate_vocabulary(tokens, 1)
        inv_vocab = {v: k for k, v in vocab.items()}

        with open(path_to_vocab, 'wb') as handle:
            pickle.dump(vocab, handle, protocol=pickle.HIGHEST_PROTOCOL)
        with open(path_to_inv_vocab, 'wb') as handle:
            pickle.dump(inv_vocab, handle, protocol=pickle.HIGHEST_PROTOCOL)
    else:
        with open(path_to_vocab, 'rb') as handle:
             vocab= pickle.load(handle)
        with open(path_to_inv_vocab, 'rb') as handle:
             inv_vocab= pickle.load(handle)

    return vocab, inv_vocab

In [10]:
class DataBuilder:
    def __init__(self, seq_len, tokens, word2idx={}, idx2word={}):
        self.seq_len = seq_len
        self.tokens = tokens
        self.number_of_tokens = len(tokens)
        self.char2idx=word2idx
        self.idx2char=idx2word

    def grab_random_sample(self):
        start = np.random.randint(0, self.number_of_tokens - self.seq_len)
        end = start + self.seq_len
        text_slice = self.tokens[start:end]

        input_text = text_slice[:-1]
        label = text_slice[1:]
        input_text = torch.tensor([self.char2idx[c] for c in input_text], dtype=torch.int32)
        label = torch.tensor([self.char2idx[c] for c in label], dtype=torch.int32)

        return input_text, label

    def grab_random_batch(self, batch_size):
        input_texts, labels = [], []

        for _ in range(batch_size):
            input_text, label = self.grab_random_sample()

            input_texts.append(input_text)
            labels.append(label)

        input_texts = torch.stack(input_texts)
        labels = torch.stack(labels)

        return input_texts, labels

In [11]:
import numpy as np
import torch
from torch import nn, Tensor, FloatTensor
import torch.nn.functional as F

In [12]:
class WordLSTMForTextGeneration(nn.Module):
    def __init__(self, word2idx, idx2word,embedding_dim=128, hidden_size=256, n_layers=3):
        super().__init__()
        self.word2idx = word2idx
        self.idx2word = idx2word
        self.embedding_dim = embedding_dim
        self.num_characters = len(word2idx)
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        self.embedding = nn.Embedding(self.num_characters, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim,
                            hidden_size=hidden_size,
                            num_layers=n_layers,
                            batch_first=True)

        self.fc = nn.Linear(hidden_size, self.num_characters)

        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.embedding(x)
        output, (h, c) = self.lstm(x)
        logits = self.fc(output)
        return logits

    def write(self, text, max_words, greedy=False):

        idx = torch.tensor([self.word2idx[w] for w in text])
        hidden = torch.zeros(self.n_layers, self.hidden_size)
        cell = torch.zeros(self.n_layers, self.hidden_size)

        for i in range(max_words):

            if i == 0:
                selected_idx = idx
            else:
                selected_idx = idx[-1].unsqueeze(0)

            x = self.embedding(selected_idx)
            out, (hidden, cell) = self.lstm(x, (hidden, cell))

            out = self.fc(out)

            if len(out) > 1:
                out = out[-1, :].unsqueeze(0)

            probs = self.softmax(out)

            if greedy:
                idx_next = torch.argmax(probs).squeeze(0)
            else:
                idx_next = torch.multinomial(probs, num_samples=1).squeeze(0)

            idx = torch.cat([idx, idx_next])

        gen_string = [self.idx2word[int(w)] for w in idx]
        gen_string = " ".join(gen_string)

        return gen_string

In [None]:
from pathlib import Path
from torch import optim, nn

def train_(model, word2idx, idx2word, tokens, config):

    name=model.__class__.__name__
    new_dir =Path(os.path.join(TRAINED_MODELS_DIR, name))
    new_dir.mkdir(parents=True, exist_ok=True)

    iterations = config["iterations"]
    max_len = config["max_len"]
    evaluate_interval = config["evaluate_interval"]
    embedding_dim = config["embedding_dim"]
    hidden_size = config["hidden_size"]
    n_layers = config["n_layers"]
    lr = config["lr"]
    batch_size = config["batch_size"]


    optimizer = optim.AdamW(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    dataset = DataBuilder(config["max_len"], tokens, word2idx, idx2word)
    model.train()
    for iteration in range(iterations):

        input_texts, labels = dataset.grab_random_batch(batch_size=batch_size)
        input_texts, labels = input_texts, labels
        print("input_texts")
        print(input_texts.shape)
        optimizer.zero_grad()
        output = model(input_texts)
        output = output.transpose(1,2)

        loss = loss_fn(output, labels.long())

        loss.backward()
        optimizer.step()

        if iteration % evaluate_interval == 0:
            model.eval()
            torch.no_grad()
            print("--------------------------------------")
            print(f"length {config["current_len"]}")
            print(f"Iteration {iteration}")
            print(f"Loss {loss.item()}")
            generated_text = model.write(["Spells"], max_words=50)
            print("--------------------------------------")
            torch.enable_grad()
            model.train()

    torch.save(model.stat_dict(), os.path.join(TRAINED_MODELS_DIR,name+"/model_state.pth"))


In [14]:
def get_config():
    config={}
    config["iterations"]=300
    config["max_len"]=20
    config["evaluate_interval"]=30
    config["embedding_dim"]=128
    config["hidden_size"]=256
    config["n_layers"]=4
    config["lr"]=0.003
    config["batch_size"]=64
    config["bidirectional"]=False
    return config

In [15]:
text = get_hp_text()
tokens = get_tokens()
word2idx, idx2word = get_2_vocabs()
config=get_config()

In [16]:
from wakepy import keep 
model = WordLSTMForTextGeneration(word2idx, idx2word, embedding_dim=config["embedding_dim"],
                                      hidden_size=config["hidden_size"], n_layers=config["n_layers"])
with keep.running():
    train_(model, word2idx, idx2word, tokens, config)


IndexError: index out of range in self