In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Read the whole Shakespeare corpus into one in-memory string.
with open('../data/nlp_files/shakespeare.txt', encoding='utf-8') as corpus_file:
    text = corpus_file.read()

In [None]:
# Sanity check: show the Python type of the loaded corpus.
type(text)

In [None]:
# Total number of characters in the corpus.
len(text)

In [None]:
# Peek at the first 1000 characters of the corpus.
text[:1000]

In [None]:
# Vocabulary of distinct characters in the corpus.
# It is sorted on purpose: iterating a bare set of strings can yield a different
# order in every kernel session (string hashing is randomised), which would
# silently change the char <-> index mapping and make previously saved model
# weights meaningless after a restart.
all_characters = sorted(set(text))
len(all_characters)

In [None]:
# Build two lookup tables over the vocabulary, one the inverse of the other.

# index --> character
decoder = {index: char for index, char in enumerate(all_characters)}

# character --> index
encoder = {char: index for index, char in enumerate(all_characters)}

In [None]:
# Display the character --> index table.
encoder

In [None]:
# Display the index --> character table.
decoder

In [None]:
# Translate every character of the corpus into its integer index.
char_indices = [encoder[char] for char in text]
encoded_text = np.array(char_indices)
encoded_text[:200]

In [None]:
def one_hot_encoder(encoded_text, num_unique_chars):
    """One-hot encode an integer array along a new trailing axis.

    Args:
        encoded_text: numpy array of integer class indices (any shape).
        num_unique_chars: number of classes, i.e. the size of the new last axis.

    Returns:
        float32 array of shape ``encoded_text.shape + (num_unique_chars,)``
        holding 1.0 at each element's index and 0.0 everywhere else.
    """
    flat_codes = encoded_text.flatten()
    row_ids = np.arange(encoded_text.size)

    # One row per input element; switch on the column named by that element.
    encoded = np.zeros((encoded_text.size, num_unique_chars), dtype=np.float32)
    encoded[row_ids, flat_codes] = 1.0

    # Restore the input's shape, keeping the one-hot axis last.
    return encoded.reshape(encoded_text.shape + (num_unique_chars,))

In [None]:
# Smoke test for one_hot_encoder: three indices over three classes.
# Each output row should hold a single 1.0 in the column named by the input.
arr = np.array([1,2,0])
one_hot_encoder(arr, 3)

In [None]:
def generate_batches(encoded_text, sample_per_batch=10, seq_len=50):
    """Yield ``(x, y)`` batches for next-character prediction.

    The input is truncated to a whole number of batches and reshaped into
    ``sample_per_batch`` rows; each batch is a ``seq_len``-wide window across
    all rows.

    Args:
        encoded_text: 1-D sequence (numpy array or plain sequence) of integer
            character indices.
        sample_per_batch: number of rows (sequences) in each batch.
        seq_len: number of characters in each sequence.

    Yields:
        Tuples ``(x, y)``, both of shape ``(sample_per_batch, seq_len)``.
        ``y`` is ``x`` shifted left by one position. The last column of ``y``
        is the character following the window, or, for the final window of a
        row, that row's first character (wrap-around).
    """
    encoded_text = np.asarray(encoded_text)

    # How many characters does one batch consume?
    char_per_batch = sample_per_batch * seq_len

    # How many full batches fit into the available text?
    number_batches_avail = len(encoded_text) // char_per_batch

    # Drop the tail that would not fill a complete batch.
    encoded_text = encoded_text[:number_batches_avail * char_per_batch]
    encoded_text = encoded_text.reshape((sample_per_batch, -1))
    row_len = encoded_text.shape[1]

    for n in range(0, row_len, seq_len):
        x = encoded_text[:, n:n + seq_len]
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]

        # Explicit bounds check instead of a bare `except:` so that unrelated
        # errors are never swallowed.
        if n + seq_len < row_len:
            y[:, -1] = encoded_text[:, n + seq_len]
        else:
            # Final window of the row: wrap around to the row's first char.
            y[:, -1] = encoded_text[:, 0]

        yield x, y

In [None]:
# Smoke test for generate_batches on a tiny range so the shift is easy to read:
# 30 values, 2 rows per batch, 5 steps per row -> y should be x shifted by one.
sample_text = np.arange(30)
print(sample_text)
print('----------------')
batch_generator = generate_batches(sample_text, sample_per_batch=2, seq_len=5)
# first batch
x,y = next(batch_generator)
print('x1:',x)
print('y1:',y)
print('----------------')
# second batch
x,y = next(batch_generator)
print('x2:',x)
print('y2:',y)

In [None]:
class Model(nn.Module):
    """Character-level LSTM language model.

    A stacked LSTM reads one-hot encoded characters; its outputs go through
    dropout and a linear layer that produces one score per vocabulary entry.

    Args:
        all_chars: iterable of the distinct characters in the vocabulary. Its
            iteration order defines the index of each character.
        num_hidden: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        drop_prob: dropout probability (between LSTM layers and before the
            linear layer).
        use_gpu: when True, ``hidden_state`` allocates its tensors on CUDA.
            The caller is still responsible for moving the model itself.
    """

    def __init__(self, all_chars, num_hidden=256, num_layers=4, drop_prob=0.5, use_gpu=False):
        super().__init__()
        self.drop_prob = drop_prob
        self.num_layers = num_layers
        self.num_hidden = num_hidden
        self.use_gpu = use_gpu

        self.all_chars = all_chars
        # index --> char and char --> index, both derived from *this* model's
        # vocabulary (not from a notebook-level global) so they always agree.
        self.decoder = dict(enumerate(all_chars))
        self.encoder = {char: ind for ind, char in self.decoder.items()}

        self.lstm = nn.LSTM(
            input_size=len(all_chars),
            hidden_size=num_hidden,
            num_layers=num_layers,
            # Inter-layer dropout has no effect with a single layer and only
            # triggers a warning, so it is disabled in that case.
            dropout=drop_prob if num_layers > 1 else 0.0,
            batch_first=True,
        )

        self.dropout = nn.Dropout(drop_prob)

        self.fc_linear = nn.Linear(in_features=num_hidden, out_features=len(self.all_chars))

    def forward(self, x, hidden):
        """Run the network on a batch.

        Args:
            x: float tensor of shape ``(batch, seq_len, len(all_chars))``
                (the LSTM is built with ``batch_first=True``).
            hidden: ``(h, c)`` tuple as returned by ``hidden_state``.

        Returns:
            ``(final_out, hidden)`` where ``final_out`` has shape
            ``(batch * seq_len, len(all_chars))`` and ``hidden`` is the
            updated LSTM state.
        """
        lstm_out, hidden = self.lstm(x, hidden)
        drop_output = self.dropout(lstm_out)
        # Merge the batch and time axes so the linear layer scores every step.
        drop_output = drop_output.contiguous().view(-1, self.num_hidden)
        final_out = self.fc_linear(drop_output)

        return final_out, hidden

    def hidden_state(self, batch_size):
        """Return a zero-initialised ``(h, c)`` state for ``batch_size`` sequences.

        Each tensor has shape ``(num_layers, batch_size, num_hidden)`` and is
        allocated on CUDA when ``use_gpu`` is set, otherwise on the CPU.
        """
        device = 'cuda' if self.use_gpu else 'cpu'
        shape = (self.num_layers, batch_size, self.num_hidden)
        hidden = (torch.zeros(shape, device=device),
                  torch.zeros(shape, device=device))
        return hidden

In [None]:
# Build the network. Use the GPU only when one is actually available so the
# notebook also runs (slowly) on CPU-only machines instead of crashing.
model = Model(all_chars=all_characters,
             num_hidden=512,
             num_layers=3,
             drop_prob=0.5,
             use_gpu=torch.cuda.is_available())

# Number of elements in each parameter tensor, in registration order.
total_params = [int(p.numel()) for p in model.parameters()]
total_params

In [None]:
# Total number of parameters in the model.
sum(total_params)

In [None]:
# Adam over all model parameters; cross-entropy over the per-character scores
# returned by the model.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [None]:
# Chronological split: the first 90% of the corpus trains the model, the rest
# is held out for validation.
train_percent = 0.9
train_ind = int(len(encoded_text)*train_percent)

train_data, val_data = encoded_text[:train_ind], encoded_text[train_ind:]

In [None]:
# Number of characters available for training.
train_data.shape

In [None]:
# Training hyperparameters.
epochs = 50
batch_size = 128      # sequences per batch
seq_len = 100         # characters per sequence
tracker = 0           # global step counter, incremented once per training batch
# Size of the one-hot axis. The vectorised ndarray.max() replaces the Python
# builtin max(), which would iterate the whole corpus element by element.
num_char = int(encoded_text.max()) + 1

In [None]:
# Training loop: truncated back-propagation through time over consecutive
# batches, with a validation pass every 25 steps.
model.train()

if model.use_gpu:
    model.cuda()

for i in range(epochs):
    # Start every epoch from a zero LSTM state.
    hidden = model.hidden_state(batch_size)

    for x, y in generate_batches(train_data, batch_size, seq_len):
        tracker += 1
        x = one_hot_encoder(x, num_char)

        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)

        if model.use_gpu:
            inputs = inputs.cuda()
            targets = targets.cuda()

        # Carry the state values over to the next batch, but cut the autograd
        # graph so gradients do not flow back through earlier batches.
        hidden = tuple(state.detach() for state in hidden)

        model.zero_grad()
        lstm_output, hidden = model(inputs, hidden)
        # reshape (not view): the target array may not be contiguous.
        loss = criterion(lstm_output, targets.reshape(batch_size*seq_len).long())
        loss.backward()

        # In-place gradient clipping (the un-suffixed clip_grad_norm is deprecated).
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        optimizer.step()

        if tracker % 25 == 0:
            val_hidden = model.hidden_state(batch_size)
            val_losses = []
            model.eval()

            # No gradients are needed for evaluation; this saves memory and time.
            with torch.no_grad():
                for val_x, val_y in generate_batches(val_data, batch_size, seq_len):
                    val_x = one_hot_encoder(val_x, num_char)

                    val_inputs = torch.from_numpy(val_x)
                    val_targets = torch.from_numpy(val_y)

                    if model.use_gpu:
                        val_inputs = val_inputs.cuda()
                        val_targets = val_targets.cuda()

                    val_output, val_hidden = model(val_inputs, val_hidden)
                    val_loss = criterion(val_output, val_targets.reshape(batch_size*seq_len).long())

                    val_losses.append(val_loss.item())

            model.train()

            # Report the average over the whole validation set rather than only
            # the loss of the last validation batch.
            mean_val_loss = float(np.mean(val_losses)) if val_losses else float('nan')
            print(f'Epoch: {i}, Step:{tracker}, Val Loss: {mean_val_loss}')

In [None]:
# Persist only the learned weights (state_dict), not the whole module object.
torch.save(model.state_dict(), '../models/my_nlp_rnn_shaks.pt')

In [None]:
# Load weights into the existing model and switch to evaluation mode.
# NOTE(review): this path differs from the file written by the torch.save cell
# ('../models/my_nlp_rnn_shaks.pt'). Presumably a separately provided checkpoint;
# confirm it was trained with the same architecture and the same
# character-to-index mapping, otherwise generated text will be garbage.
model.load_state_dict(torch.load('../models/Shakespeare_Model.net'))
model.eval()

In [None]:
def predict_next_char(model, char, hidden=None, k=1):
    """Sample the next character given the current one and the LSTM state.

    Args:
        model: trained model exposing ``encoder``, ``decoder``, ``all_chars``,
            ``use_gpu`` and ``hidden_state``.
        char: the current character; must be a key of ``model.encoder``.
        hidden: ``(h, c)`` LSTM state for batch size 1. When None, a fresh
            zero state is created.
        k: sample among the ``k`` most probable characters, proportionally to
            their renormalised probabilities.

    Returns:
        ``(next_char, hidden)``: the sampled character and the updated state.

    Raises:
        KeyError: if ``char`` is not in the model's vocabulary.
    """
    if hidden is None:
        hidden = model.hidden_state(1)

    # Shape (1, 1) -> one-hot (1, 1, vocab): a batch of one single-step sequence.
    encoded_text = np.array([[model.encoder[char]]])
    encoded_text = one_hot_encoder(encoded_text, len(model.all_chars))

    inputs = torch.from_numpy(encoded_text)
    if model.use_gpu:
        inputs = inputs.cuda()

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        hidden = tuple(state.detach() for state in hidden)
        lstm_out, hidden = model(inputs, hidden)
        probs = F.softmax(lstm_out, dim=1)

    probs, index_positions = probs.cpu().topk(k)

    # flatten() rather than squeeze(): with k == 1, squeeze() yields a 0-d array,
    # which np.random.choice would interpret as a population *size*.
    index_positions = index_positions.numpy().flatten()
    probs = probs.numpy().flatten()

    # Renormalise the top-k probabilities so they sum to 1.
    probs = probs / probs.sum()

    char_index = np.random.choice(index_positions, p=probs)

    return model.decoder[int(char_index)], hidden



In [None]:
def generate_text(model, size, seed='The', k=1):
    """Generate text by repeatedly sampling the next character.

    The seed is first fed through the model to warm up the LSTM state; the
    prediction after the last seed character is appended, and then ``size``
    further characters are sampled, each from the previously emitted one.

    Args:
        model: trained model (moved to GPU or CPU according to ``model.use_gpu``
            and put into eval mode as a side effect).
        size: number of sampling steps after the seed has been consumed.
        seed: non-empty priming string; every character must be in the
            model's vocabulary.
        k: forwarded to ``predict_next_char`` (top-k sampling).

    Returns:
        The seed followed by ``size + 1`` generated characters.

    Raises:
        ValueError: if ``seed`` is empty.
    """
    if not seed:
        # Without a seed there is no first prediction to continue from.
        raise ValueError('seed must contain at least one character')

    if model.use_gpu:
        model.cuda()
    else:
        model.cpu()

    model.eval()
    output_chars = list(seed)

    hidden = model.hidden_state(1)

    with torch.no_grad():
        # Warm up the state on the seed; only the last prediction is kept.
        for char in seed:
            char, hidden = predict_next_char(model, char, hidden, k=k)

        output_chars.append(char)

        # Feed each generated character back in to obtain the next one.
        for _ in range(size):
            char, hidden = predict_next_char(model, output_chars[-1], hidden, k=k)
            output_chars.append(char)

    return ''.join(output_chars)

In [None]:
# Generate a sample primed with 'The', sampling among the 3 most likely
# characters at every step.
print(generate_text(model, 1000, seed='The', k=3))