In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Load the whole training corpus into memory as one string.
# The path is relative, so shakespeare.txt has to be in the notebook's
# working directory; the file is decoded as UTF-8.
with open('shakespeare.txt','r',encoding='utf8') as f:
    text=f.read()

In [None]:
len(text)

In [None]:
print(text[:1000])

In [None]:
# Vocabulary: every distinct character that occurs in the corpus.
# Sorted into a list on purpose: a bare set of strings iterates in an order
# that can differ between interpreter runs (string hashing is randomised),
# which would change the char<->index mapping after a kernel restart and make
# previously saved weights meaningless. len() and enumerate() work the same
# on the list as they did on the set.
unique_char=sorted(set(text))

In [None]:
unique_char

In [None]:
# index -> character lookup; indices follow the iteration order of unique_char.
decoder={index:char for index,char in enumerate(unique_char)}

In [None]:
decoder

In [None]:
# character -> index lookup, the exact inverse of decoder.
encoder=dict(zip(decoder.values(),decoder.keys()))

In [None]:
encoder

In [None]:
# Translate the corpus, character by character, into its integer codes.
# A character missing from encoder raises KeyError.
encoded_text=np.array(list(map(encoder.__getitem__,text)))

In [None]:
len(encoded_text)

In [None]:
len(encoded_text)

In [None]:
def one_hot_encoder(encoded_text, num_uni_chars):
    """One-hot encode an array of integer character codes.

    Parameters
    ----------
    encoded_text : array-like of int
        Integer codes of any shape; each value must be a valid column
        index, i.e. smaller than ``num_uni_chars``.
    num_uni_chars : int
        Size of the vocabulary (length of the one-hot axis).

    Returns
    -------
    numpy.ndarray
        float32 array of shape ``encoded_text.shape + (num_uni_chars,)``
        with a single 1.0 per position along the last axis.

    Raises
    ------
    IndexError
        If a code is out of range for ``num_uni_chars``.
    """
    encoded_text = np.asarray(encoded_text)

    # Allocate as float32 straight away; creating float64 zeros and then
    # casting would briefly hold two full copies of the matrix.
    one_hot = np.zeros((encoded_text.size, num_uni_chars), dtype=np.float32)

    # Row i gets its 1.0 in the column named by the i-th (flattened) code.
    one_hot[np.arange(encoded_text.size), encoded_text.ravel()] = 1.0

    # Restore the caller's leading dimensions.
    return one_hot.reshape((*encoded_text.shape, num_uni_chars))

In [None]:
op=np.array([1,2,0])

In [None]:
op.flatten()

In [None]:
# Sanity-check the encoder on a short slice only: encoding the entire corpus
# would build a len(encoded_text) x vocabulary float matrix just to display it.
one_hot_encoder(encoded_text[:10],len(unique_char))


In [None]:
def generate_batches(encoded_text, samp_per_batch=10, seq_len=50):
    """Yield ``(x, y)`` training batches for next-character prediction.

    The input is trimmed to a whole number of batches and reshaped to
    ``samp_per_batch`` rows; each yielded window is ``seq_len`` columns wide.
    ``y`` is ``x`` shifted one position to the left. For the final window,
    where no next column exists, the last target column wraps around to the
    first column of the reshaped data.

    Parameters
    ----------
    encoded_text : array-like of int
        1-D sequence of integer character codes.
    samp_per_batch : int
        Number of rows (sequences) in every batch.
    seq_len : int
        Number of characters in every sequence.

    Yields
    ------
    tuple of numpy.ndarray
        ``x`` and ``y``, each of shape ``(samp_per_batch, seq_len)``.
        ``x`` is a view into the reshaped input; ``y`` is a fresh array.
        Nothing is yielded when the input is shorter than one full batch.
    """
    encoded_text = np.asarray(encoded_text)

    char_per_batch = samp_per_batch * seq_len
    num_batches_avail = len(encoded_text) // char_per_batch
    if num_batches_avail == 0:
        # Not enough characters for even one batch.
        return

    # Drop the tail that does not fill a complete batch, then lay the text
    # out as samp_per_batch contiguous rows.
    encoded_text = encoded_text[:num_batches_avail * char_per_batch]
    encoded_text = encoded_text.reshape((samp_per_batch, -1))
    row_len = encoded_text.shape[1]

    for n in range(0, row_len, seq_len):
        x = encoded_text[:, n:n+seq_len]
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]

        # Explicit boundary test instead of catching an exception, so that
        # unrelated errors are never silently swallowed.
        if n + seq_len < row_len:
            y[:, -1] = encoded_text[:, n+seq_len]
        else:
            y[:, -1] = encoded_text[:, 0]

        yield x, y
    

In [None]:
pp=encoded_text[:1000]

In [None]:
generate_batches(pp,samp_per_batch=5,seq_len=10)

In [None]:
print(next(generate_batches(pp,samp_per_batch=5,seq_len=10)))

In [None]:
batch_generator=generate_batches(pp,samp_per_batch=5,seq_len=10)

In [None]:
x,y=next(batch_generator)

In [None]:
print(x)

In [None]:
print(y)

In [None]:
class LSTMPYTorchModel(nn.Module):
    """Character-level language model: stacked LSTM -> dropout -> linear.

    Parameters
    ----------
    all_chars : collection of str
        The vocabulary. Indices are assigned in the iteration order of this
        collection, so pass an ordered collection if the char<->index
        mapping must be reproducible across runs.
    hidden_size : int
        Width of every LSTM layer.
    num_layers : int
        Number of stacked LSTM layers.
    drop_prob : float
        Dropout probability, used both between LSTM layers and on the
        LSTM output.
    use_gpu : bool
        When true, ``hidden_state`` creates its tensors on the CUDA device.
        The module itself is not moved; the caller does that.
    """

    def __init__(self,all_chars,hidden_size=256,num_layers=3,drop_prob=0.5,use_gpu=False):
        super().__init__()
        self.num_layers=num_layers
        self.hidden_size=hidden_size
        # Kept under its own name: self.dropout is the nn.Dropout module below.
        self.drop_prob=drop_prob
        self.all_chars=all_chars
        self.use_gpu=use_gpu

        # index -> char and char -> index lookups for the vocabulary.
        self.decoder=dict(enumerate(all_chars))
        self.encoder={char:index for index,char in self.decoder.items()}

        vocab_size=len(all_chars)
        # batch_first=True: inputs are (batch, seq_len, vocab_size).
        self.lstm=nn.LSTM(vocab_size,hidden_size,num_layers,dropout=drop_prob,batch_first=True)
        self.dropout=nn.Dropout(drop_prob)
        self.linear=nn.Linear(hidden_size,vocab_size)

    def forward(self,X,hidden):
        """Run one batch through the network.

        X is a float tensor of shape (batch, seq_len, len(all_chars));
        hidden is the (h, c) pair as produced by ``hidden_state`` or by a
        previous call.

        Returns ``(scores, hidden)`` where scores has shape
        (batch * seq_len, len(all_chars)) and hidden is the updated
        (h, c) pair.
        """
        lstm_output,hidden=self.lstm(X,hidden)
        drop_output=self.dropout(lstm_output)
        # Merge the batch and time axes so the linear layer scores every step.
        drop_output=drop_output.contiguous().view(-1,self.hidden_size)
        final_out=self.linear(drop_output)
        return final_out,hidden

    def hidden_state(self,batch):
        """Return a zeroed (h, c) pair, each (num_layers, batch, hidden_size).

        The tensors are allocated directly on the CUDA device when
        ``use_gpu`` is set, otherwise on the default device.
        """
        device='cuda' if self.use_gpu else None
        shape=(self.num_layers,batch,self.hidden_size)
        hidden=(torch.zeros(shape,device=device),torch.zeros(shape,device=device))
        return hidden
                

In [None]:
# Build the network. Use the GPU only when one is actually present, so the
# notebook also runs (slowly) on CPU-only machines instead of failing on .cuda().
model=LSTMPYTorchModel(all_chars=unique_char,hidden_size=512,num_layers=3,use_gpu=torch.cuda.is_available())



In [None]:
# Training objective and optimiser.
# The loss is applied to the model's linear-layer output against integer
# class targets; Adam updates every parameter of `model` at lr=0.001.
criterion=nn.CrossEntropyLoss()
optimizer=torch.optim.Adam(model.parameters(),lr=0.001)

In [None]:
train_ind=int(0.9*len(encoded_text))

In [None]:
train_ind

In [None]:
# Split the encoded corpus at train_ind without shuffling: everything before
# the index is for training, the remainder is held out.
# NOTE(review): val_data is not consumed by any cell visible here — confirm
# it is used for validation further down.
train_data=encoded_text[:train_ind]
val_data=encoded_text[train_ind:]

In [None]:
# Training hyperparameters.
epochs=100       # full passes over train_data
batch_size=100   # sequences per batch (rows of each x/y array)
seq_len=100      # characters per sequence (columns of each x/y array)

In [None]:
num_char=len(unique_char)

In [None]:
# Move the weights to the GPU once, before any batch is built.
if model.use_gpu:
    model.cuda()

# Make sure dropout is active even if an earlier cell left the model in eval mode.
model.train()

for i in range(epochs):

    # Every epoch starts from a zeroed (h, c) state.
    hidden = model.hidden_state(batch_size)

    for x,y in generate_batches(train_data,batch_size,seq_len):
        # Batches arrive as integer codes; the network takes one-hot float input.
        x = one_hot_encoder(x,num_char)
        inputs = torch.from_numpy(x)
        targets = torch.from_numpy(y)

        if model.use_gpu:
            inputs = inputs.cuda()
            targets = targets.cuda()

        # Carry the state values into the next batch but cut the autograd
        # graph, so backward() never reaches into earlier batches.
        hidden = tuple(state.detach() for state in hidden)

        optimizer.zero_grad()

        # Call the module itself (not .forward) so registered hooks still run.
        lstm_output, hidden = model(inputs,hidden)
        # Flatten targets to line up with the (batch*seq_len, vocab) scores.
        loss = criterion(lstm_output,targets.reshape(-1).long())

        loss.backward()
        # Clip the global gradient norm to keep LSTM updates from exploding.
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)

        optimizer.step()
        