In [None]:
import pandas as pd 
import torch
import nltk
import einops
import numpy as np
import keras.utils as ku 
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

from datasets import load_dataset
import subprocess

# Make the WordNet corpus available to NLTK, downloading and unpacking it into
# the writable Kaggle working directory when the lookup fails.
try:
    nltk.data.find('wordnet.zip')
except LookupError:
    # nltk.data.find signals a missing resource with LookupError; catching only
    # that keeps unrelated failures (e.g. KeyboardInterrupt) visible.
    nltk.download('wordnet', download_dir='/kaggle/working/')
    # "-o" overwrites without prompting, so re-running this cell cannot block
    # on unzip's interactive "replace?" question.
    command = [
        "unzip", "-o", "/kaggle/working/corpora/wordnet.zip",
        "-d", "/kaggle/working/corpora",
    ]
    subprocess.run(command)
    # Register the download location once, even if the cell is executed again.
    if '/kaggle/working/' not in nltk.data.path:
        nltk.data.path.append('/kaggle/working/')

from nltk.corpus import wordnet

In [None]:
# Shared NLTK helpers used by tokenize_pipeline: one splits raw text into
# tokens, the other reduces each token to its WordNet lemma.
tokenizer = nltk.WordPunctTokenizer()
lemmatizer = nltk.WordNetLemmatizer()

In [None]:
# Load the corpus from the attached Kaggle dataset; the cells below read its 'text' column.
df = pd.read_csv('/kaggle/input/shakespeare-nlp-analysis-data-engineering/shakespeare_plays.csv')

In [None]:
# Lower-case every entry of the 'text' column so the vocabulary is case-insensitive.
all_text = [sentence.lower() for sentence in df['text']]

In [None]:
def tokenize_pipeline(sentence):
    """Tokenize a sentence and return the lemmas of its alphabetic tokens.

    Args:
        sentence: Text to split with the module-level ``tokenizer``.

    Returns:
        A list with one lemma (from the module-level ``lemmatizer``) for every
        token that consists only of letters; other tokens are dropped.
    """
    lemmas = []
    for token in tokenizer.tokenize(sentence):
        # Punctuation, digits and mixed tokens are not part of the vocabulary.
        if token.isalpha():
            lemmas.append(lemmatizer.lemmatize(token))
    return lemmas

In [None]:
# One list of lemmas per input sentence, in the same order as all_text.
all_tokenized_text = list(map(tokenize_pipeline, all_text))
# The vocabulary: every distinct lemma that occurs anywhere in the corpus.
all_tokenized_words = {word for sentence in all_tokenized_text for word in sentence}

In [None]:
# Number of distinct lemmas in the corpus.
len(all_tokenized_words)

In [None]:
# Word -> integer id. Ids start at 4 because the lower ids are reserved: the
# rest of the notebook uses 0 for padding, 1 to open a sentence and 2 to close
# it (3 is not used by the code visible here).
# The vocabulary is sorted before numbering: iteration order of a set of
# strings changes between interpreter sessions, which would silently change
# the ids and make a checkpoint saved in one session unusable in another.
words_to_ids = {word: idx + 4 for idx, word in enumerate(sorted(all_tokenized_words))}

In [None]:
# Sanity check: the mapping has one entry per vocabulary word.
len(words_to_ids) 

In [None]:
# Length, in tokens, of the longest tokenized sentence in the corpus.
max_sequence_len = max(map(len, all_tokenized_text))
max_sequence_len

In [None]:
from datasets import Dataset

# Wrap the tokenized sentences in a Hugging Face Dataset and hold out 5% of
# them for validation. The seed pins the shuffle performed by the split so the
# same sentences end up in each part on every run.
dataset = Dataset.from_dict({'Text': all_tokenized_text}).train_test_split(
    test_size=0.05, seed=42
)
train_dataset, validation_dataset = dataset['train'], dataset['test']


In [None]:
class ShakespeareDataset(torch.utils.data.Dataset):
    """Map-style dataset that yields fixed-length tensors of token ids.

    Every item has the layout ``[1] + ids + [2]`` followed by ``0`` padding up
    to ``max_len`` entries, where ``ids`` are the vocabulary ids of one
    tokenized sentence.

    Args:
        words_to_ids: Mapping from word to integer id. Looking up a word that
            is missing from it raises ``KeyError`` during construction.
        dataset: Object exposing ``map``, ``__len__`` and ``__getitem__`` whose
            rows carry a ``'Text'`` list of words (as built by
            ``datasets.Dataset``).
        max_len: Length of every returned tensor; should be at least 2 so the
            opening and closing markers fit.
    """

    def __init__(self, words_to_ids, dataset, max_len=19):
        self.words_to_ids = words_to_ids

        def convert_words_to_ids(example):
            return {'ids': [self.words_to_ids[token] for token in example['Text']]}

        # Adds an 'ids' column next to the original 'Text' column.
        self.dataset = dataset.map(convert_words_to_ids)
        self.max_len = max_len

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        ids = self.dataset[index]['ids']
        # Sentences that do not fit are truncated (keeping the closing marker);
        # otherwise items would have different lengths and could not be
        # stacked into a batch.
        body = ids[:max(self.max_len - 2, 0)]
        sentence = [1] + body + [2]
        # Right-pad with 0; multiplying by a non-positive count adds nothing.
        sentence += [0] * (self.max_len - len(sentence))
        return torch.tensor(sentence)

In [None]:
# Replace the raw splits with tensor-yielding wrappers (default max_len).
# NOTE: the names are rebound, so this cell cannot be run twice in a row:
# the second run would pass a ShakespeareDataset (which has no .map) back in.
train_dataset = ShakespeareDataset(words_to_ids=words_to_ids, dataset=train_dataset)
validation_dataset = ShakespeareDataset(words_to_ids=words_to_ids, dataset=validation_dataset)


In [None]:
# Spot-check the shape of one encoded validation example.
validation_dataset[15].shape

In [None]:
def collate_fn(item):
    """Stack the per-example tensors of one batch along a new leading axis.

    Args:
        item: Iterable of equally-shaped tensors, one per example.

    Returns:
        A single tensor whose first dimension indexes the examples.
    """
    return torch.stack(list(item))

In [None]:
# Batches of 1024 sentences; collate_fn stacks them into one tensor per batch.
# Training batches are reshuffled every epoch so the model does not see the
# sentences in the same order each time; validation keeps a fixed order.
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=1024, shuffle=True, collate_fn=collate_fn
)
valid_dataloader = torch.utils.data.DataLoader(
    validation_dataset, batch_size=1024, collate_fn=collate_fn
)

In [None]:
# Peek at the first training batch to confirm the loader produces stacked id tensors.
next(iter(train_dataloader))

In [None]:
class LSTMModel(torch.nn.Module):
    """Embedding -> multi-layer LSTM -> two linear layers producing vocabulary logits.

    Args:
        dictionary_size: Number of distinct token ids (rows of the embedding
            table and width of the output logits).
        hidden_dim: Size of the embeddings and of the LSTM hidden state.
        layer_dim: Number of stacked LSTM layers.
        max_seq_len: Accepted for interface compatibility; not used by the model.
    """

    def __init__(self, dictionary_size, hidden_dim, layer_dim,max_seq_len):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.dropout = torch.nn.Dropout(0.3)
        self.embedding = torch.nn.Embedding(dictionary_size, hidden_dim)
        self.lstm = torch.nn.LSTM(
            input_size=hidden_dim,
            hidden_size=hidden_dim,
            num_layers=layer_dim,
            batch_first=True,
        )
        self.lin = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc = torch.nn.Linear(hidden_dim, dictionary_size)

    def forward(self, x):
        """Return logits with one vocabulary-sized vector per input position."""
        embedded = self.embedding(x)
        # Only the per-step outputs are needed; the final (h, c) state is discarded.
        lstm_out, _ = self.lstm(embedded)
        hidden = self.lin(self.dropout(lstm_out))
        return self.fc(self.dropout(hidden))

In [None]:
# The 4 extra slots cover ids 0-3, which words_to_ids leaves unassigned.
dictionary_size = len(words_to_ids) + 4
hidden_dim = 200
layer_dim = 3
max_seq_len = 19
model = LSTMModel(
    dictionary_size=dictionary_size,
    hidden_dim=hidden_dim,
    layer_dim=layer_dim,
    max_seq_len=max_seq_len,
)
from functools import reduce

def get_num_of_params(
    model : torch.nn.Module
) -> int:
    """Count the scalar values held in all parameters of ``model``.

    Args:
        model: Module whose ``parameters()`` are counted.

    Returns:
        Total number of elements across every parameter tensor; 0 for a module
        without parameters. Zero-dimensional (scalar) parameters count as 1.
    """
    # numel() is defined for every tensor, including 0-dim ones, whereas
    # folding an empty shape with reduce() and no initial value raises.
    return sum(param.numel() for param in model.parameters())

# Show how many parameter values the freshly built model has.
get_num_of_params(model)

In [None]:
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(),lr=1e-3)
# Id 0 is the value ShakespeareDataset pads sentences with. Excluding it from
# the loss keeps padding positions from dominating the average and from
# teaching the model to emit padding.
criterion = torch.nn.CrossEntropyLoss(ignore_index=0)

In [None]:
# Release cached, currently unused GPU memory held by PyTorch's allocator before training.
torch.cuda.empty_cache()

In [None]:
from tqdm.auto import tqdm


def _next_token_loss(batch):
    """Return the criterion value for predicting each token from the ones before it.

    The model reads every position except the last; the targets are the same
    sequence shifted left by one position.
    """
    batch = batch.to(device)
    logits = model(batch[:, :-1])
    # Flatten (batch, position) so the criterion sees one row per prediction.
    return criterion(logits.reshape(-1, logits.size(-1)), batch[:, 1:].reshape(-1))


# Mean loss per epoch, appended in epoch order.
train_loss = []
valid_loss = []
epochs = 30
for epoch in tqdm(range(epochs)):
    # Training pass: dropout active, weights updated after every batch.
    train_loss_current = []
    model.train()
    # Wrap the loader itself (not enumerate(...)) so tqdm knows the batch count.
    for X in tqdm(train_dataloader):
        loss = _next_token_loss(X)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss_current.append(loss.item())

    train_loss.append(np.mean(train_loss_current))

    # Validation pass: dropout disabled, no gradients tracked.
    valid_loss_current = []
    model.eval()
    with torch.inference_mode():
        for X in valid_dataloader:
            valid_loss_current.append(_next_token_loss(X).item())
    valid_loss.append(np.mean(valid_loss_current))

    print(f'Эпоха - {epoch+1}, train_loss - {train_loss[-1]}, valid_loss - {valid_loss[-1]}')

In [None]:
# Persist only the learned weights (state dict) to the Kaggle working directory.
f= '/kaggle/working/Shakespear.model'
torch.save(model.state_dict(), f)

In [None]:
# Restore weights from a checkpoint attached as a Kaggle input. The tensors are
# first mapped to the CPU and then copied into the existing model's parameters.
# NOTE(review): this path differs from the one written by the save cell —
# presumably a checkpoint uploaded from an earlier session; confirm it was
# trained with the same vocabulary ids.
model.load_state_dict(torch.load('/kaggle/input/shakeword/pytorch/default/1/Shakespear.model', weights_only=True,map_location=torch.device('cpu')))
# Switch to evaluation mode so dropout is disabled for generation.
model.eval()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

import warnings
# Hide FutureWarning messages for the rest of the session.
warnings.simplefilter(action='ignore', category=FutureWarning)


# Key each mean loss by its 1-based epoch number so the x-axis shows epochs.
tr = dict(enumerate(train_loss, start=1))
val = dict(enumerate(valid_loss, start=1))

sns.lineplot(data=tr, label='Train Loss')
sns.lineplot(data=val, label='Validation Loss')
plt.legend()
plt.show()

In [None]:
# Make sure the model is in evaluation mode (dropout off) before generating text.
model.eval()

In [None]:
def dec(tokens:list,dictionary:dict):
    """Translate token ids back into words using a word -> id mapping.

    Args:
        tokens: Iterable of ids. Plain ints are expected; objects exposing
            ``.item()`` (such as 0-dim tensors) are unwrapped first.
        dictionary: Mapping from word to id, as used for encoding.

    Returns:
        A list with one entry per token: the matching word, or ``None`` when
        no word has that id. If several words share an id, the one that comes
        first in ``dictionary`` is returned.
    """
    # Invert the mapping once instead of scanning it for every token.
    # setdefault keeps the first word seen for a given id.
    id_to_word = {}
    for word, idx in dictionary.items():
        id_to_word.setdefault(idx, word)

    decoded = []
    for token in tokens:
        # Tensors hash by identity, so unwrap them to a plain number first.
        key = token.item() if hasattr(token, "item") else token
        decoded.append(id_to_word.get(key))
    return decoded

In [None]:
# Upper bound on the number of ids (prompt included) in a generated sequence.
MAX_SEQ_LEN = 18
# Softmax temperature. With the greedy argmax used below, any positive value
# selects the same token, so it currently has no effect on the output.
temp=1
# Called form of the decorator: works on every PyTorch release that ships
# inference_mode, whereas the bare form relies on newer versions.
@torch.inference_mode()
def generate(tokens) -> list:
    """Greedily extend a sequence of token ids with the model's most likely next id.

    Args:
        tokens: Non-empty sequence of int ids to start from. It is copied, so
            the caller's list is left untouched.

    Returns:
        A new list of ids that starts with ``tokens`` and grows until it
        either ends with id 2 or holds ``MAX_SEQ_LEN`` ids.
    """
    tokens = list(tokens)
    while len(tokens) < MAX_SEQ_LEN and tokens[-1] != 2:
        logits = model(torch.tensor(tokens).unsqueeze(0).to(device))
        probs = torch.softmax(logits / temp, -1)
        # First (only) batch row, last position: distribution for the next id.
        tokens.append(probs[0][-1].argmax().item())

    return tokens

In [None]:
def predi(sentence):
    """Continue ``sentence`` with greedy generation and return the text.

    Args:
        sentence: Prompt text; it is lower-cased and passed through
            ``tokenize_pipeline`` before being encoded.

    Returns:
        The prompt and its continuation as space-separated words, or ``None``
        (after printing a notice) when a prompt word is not in the vocabulary.
    """
    words = tokenize_pipeline(sentence.lower())
    try:
        # Id 1 opens every sentence, mirroring how training examples are built.
        final = [1] + [words_to_ids[word] for word in words]
    except KeyError:
        # Only an out-of-vocabulary word is reported this way; any other
        # failure now surfaces instead of being mislabelled.
        print('Одного из слов в предложении Шекспир не знал...')
        return None
    example = generate(final)
    # Ids without a vocabulary entry (the reserved ids below 4) decode to None
    # and are skipped. This removes the opening/closing markers without also
    # discarding a real last word when generation stopped on the length limit.
    decoded = dec(example, words_to_ids)
    return ' '.join(word for word in decoded if word is not None)
    

In [None]:
# Demo: greedy continuation of a short prompt.
print(predi('Young and'))


In [None]:
# Upper bound on the number of ids (prompt included) in a generated sequence.
MAX_SEQ_LEN = 18
# Called form of the decorator: works on every PyTorch release that ships
# inference_mode, whereas the bare form relies on newer versions.
@torch.inference_mode()
def generation(tokens) -> list:
    """Extend a sequence of token ids, picking each next id via ``beam_search``.

    Args:
        tokens: Non-empty sequence of int ids to start from. It is copied, so
            the caller's list is left untouched.

    Returns:
        A new list of plain int ids that starts with ``tokens`` and grows
        until it either ends with id 2 or holds ``MAX_SEQ_LEN`` ids.
    """
    tokens = list(tokens)
    while len(tokens) < MAX_SEQ_LEN and tokens[-1] != 2:
        preds = model(torch.tensor(tokens).unsqueeze(0).to(device))
        # First batch row, beam at rank index 2 of the 6 kept beams.
        # NOTE(review): index 2 is the third-ranked beam, not the best one
        # (index 0) — confirm this choice is intentional.
        preds_beam = beam_search(preds.to(device), 6)[0][2]
        # Store a plain int so the list stays homogeneous for torch.tensor().
        tokens.append(preds_beam[-1].item())
    return tokens

In [None]:
def beam_search(prediction, k=6):
    """Return the ``k`` highest-scoring id sequences for each batch row.

    A sequence's score is the sum of its per-position entries in
    ``prediction``. At every position only the ``k`` best partial sequences
    are kept before moving on.

    Args:
        prediction: Tensor of shape ``(batch, seq_length, vocab_size)`` with
            one score per candidate id and position. ``k`` must not exceed
            ``vocab_size``.
        k: Number of sequences (beams) to keep.

    Returns:
        An int64 tensor of shape ``(batch, k, seq_length)`` on the same device
        as ``prediction``; beams are ordered from highest to lowest score.
    """
    batch_size, seq_length, vocab_size = prediction.shape
    # Seed the beams with the k best ids at the first position.
    log_prob, indices = prediction[:, 0, :].topk(k, sorted=True)
    indices = indices.unsqueeze(-1)  # (batch, k, 1)
    for n1 in range(1, seq_length):
        # Score of every (existing beam, candidate id) pair; broadcasting
        # yields (batch, k, vocab_size).
        candidates = log_prob.unsqueeze(-1) + prediction[:, n1, :].unsqueeze(1)
        log_prob, index_temp = candidates.reshape(batch_size, -1).topk(k, sorted=True)
        # A flat index encodes beam * vocab_size + id.
        idx_begin = index_temp // vocab_size
        idx_concat = index_temp % vocab_size
        # Pick the surviving beams' histories for all batch rows at once,
        # then append the newly chosen id to each of them.
        history = indices.gather(
            1, idx_begin.unsqueeze(-1).expand(-1, -1, indices.size(-1))
        )
        indices = torch.cat([history, idx_concat.unsqueeze(-1)], dim=-1)
    return indices

In [None]:
def predictwithbeamsearch(sentence):
    """Continue ``sentence`` using the beam-search based generator.

    Args:
        sentence: Prompt text; it is lower-cased and passed through
            ``tokenize_pipeline`` before being encoded.

    Returns:
        The prompt and its continuation as space-separated words, or ``None``
        (after printing a notice) when a prompt word is not in the vocabulary.
    """
    words = tokenize_pipeline(sentence.lower())
    try:
        # Id 1 opens every sentence, mirroring how training examples are built.
        final = [1] + [words_to_ids[word] for word in words]
    except KeyError:
        # Only an out-of-vocabulary word is reported this way; any other
        # failure now surfaces instead of being mislabelled.
        print('Такого слова Шекспир не знал...')
        return None
    example = generation(final)
    # Ids without a vocabulary entry (the reserved ids below 4) decode to None
    # and are skipped. This removes the opening/closing markers without also
    # discarding a real last word when generation stopped on the length limit.
    decoded = dec(example, words_to_ids)
    return ' '.join(word for word in decoded if word is not None)


In [None]:
# Demo: beam-search continuations for a handful of prompts, printed in order.
for prompt in ('forever', 'love', 'to be or not', 'hi', 'leave the', 'poor child'):
    print(predictwithbeamsearch(prompt))