In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
device = torch.device("mps")

In [10]:
with open('../Data/shakespeare.txt') as f:
    text = f.read()

In [11]:
all_char = set(text)
n_unique_char = len(all_char)

In [12]:
decoder = dict(enumerate(all_char))
encoder = {char:ind for ind,char in decoder.items()}
encoded_text = np.array([encoder[char]for char in text])

In [13]:
def one_hot_encoder(encoded_text,n_unique_char):
    one_hot = np.zeros((encoded_text.size,n_unique_char)).astype(np.float32)
    one_hot[np.arange(one_hot.shape[0]),encoded_text.flatten()] = 1.0
    one_hot = one_hot.reshape((*encoded_text.shape,n_unique_char))
    return one_hot   

In [14]:
def generate_batches(encoded_text,sample_per_batch=10,seq_len=50):
    char_per_batch = sample_per_batch * seq_len
    avail_batch = int(len(encoded_text)/char_per_batch)
    encoded_text = encoded_text[:char_per_batch*avail_batch]
    encoded_text = encoded_text.reshape((sample_per_batch,-1))
    
    for n in range(0,encoded_text.shape[1],seq_len):
        x = encoded_text[:,n:n+seq_len]
        y = np.zeros_like(x)
        try : 
            y[:,:-1] = x[:,1:]
            y[:,-1] = encoded_text[:,n+seq_len]
        #for the very last case
        except : 
            y[:,:-1] = x[:,1:]
            y[:,-1] = encoded_text[:,0]
        yield x,y

In [15]:
batch_generator = generate_batches(encoded_text,sample_per_batch=10,seq_len=50)

In [16]:
x,y = next(batch_generator)

In [17]:
class LSTM(nn.Module):
    def __init__(self,all_char,num_hidden=256,num_layers=4,drop_prob=0.5):
        super(LSTM,self).__init__()
        self.all_char = all_char
        self.num_hidden = num_hidden
        self.num_layers = num_layers
        self.drop_prob = drop_prob
        
        self.decoder = dict(enumerate(all_char))
        self.encoder = {char:ind for ind,char in decoder.items()}
        
        self.lstm = nn.LSTM(len(self.all_char),num_hidden,num_layers,dropout = drop_prob,batch_first=True)
        self.fc_linear = nn.Linear(num_hidden,len(self.all_char))
        self.dropout = nn.Dropout(drop_prob)
    def forward(self,x,hidden):
        lstm_out, hidden = self.lstm(x,hidden)
        drop_out = self.dropout(lstm_out)
        drop_out = drop_out.contiguous().view(-1,self.num_hidden)
        final_out = self.fc_linear(drop_out)
        return final_out,hidden
    def init_hidden(self,batch_size):
        hidden = (torch.zeros(self.num_layers,batch_size,self.num_hidden).to(device),
                 torch.zeros(self.num_layers,batch_size,self.num_hidden).to(device))
        return hidden
        

In [None]:
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
criterion = nn.CrossEntropyLoss()

In [None]:
train_percent = 0.9
train_ind = int(len(encoded_text) * (train_percent))
train_data = encoded_text[:train_ind]
val_data = encoded_text[train_ind:]

In [None]:
num_epoch = 75
batch_size = 100
seq_len = 100
tracker = 0
num_char = max(encoded_text)+1

In [None]:
model.train()

In [None]:
model.train()
#for every epoch
for epoch in range(num_epoch):
    #reset hidden state
    hidden = model.init_hidden(batch_size)
    #go through every x,y in batch_gen obj
    for x,y in generate_batches(val_data,batch_size,seq_len):
        tracker += 1 
        #zero_grad
        model.zero_grad()
        #create input and target
        x = one_hot_encoder(x,num_char)
        inputs = torch.tensor(x).to(device)
        targets = torch.tensor(y).long().to(device)
        hidden = tuple([state.data for state in hidden])
        #now pass through model
        lstm_out,hidden = model.forward(inputs,hidden)
        #calc loss and backprop
        loss = criterion(lstm_out,targets.view(batch_size * seq_len))
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(),max_norm=5)
        optimizer.step()
        #for every 25 steps do validation
        if tracker % 25 == 0:
            #put model in eval model
            model.eval()
            val_hidden = model.init_hidden(batch_size)
            val_losses = []
            for x,y in generate_batches(train_data,batch_size,seq_len):
                x = one_hot_encoder(x,num_char)
                inputs = torch.tensor(x).to(device)
                targets = torch.tensor(y).long().to(device)
                val_hidden = tuple([state.data for state in val_hidden])
                lstm_out,val_hidden = model.forward(inputs,val_hidden)
                loss = criterion(lstm_out,targets.view(batch_size*seq_len))
                val_losses.append(loss.item())
            print(f"epoch : {epoch+1} tracker : {tracker} loss : {loss.item()}")
            model.train()
                
        

In [19]:
model_2 = LSTM(all_char,num_hidden=1024,num_layers=4,drop_prob=0.4).to(device)
model_2.load_state_dict(torch.load('/Users/aadityajoshi/Downloads/charmodel-2.pth',map_location=torch.device('cpu')))

  model_2.load_state_dict(torch.load('/Users/aadityajoshi/Downloads/charmodel-2.pth',map_location=torch.device('cpu')))


<All keys matched successfully>

In [20]:
model_2 = model_2.to(device)

In [21]:
def pred_next_char(model,char,hidden=None,k=1):
    encoded_text = model.encoder[char]
    encoded_text = np.array([[encoded_text]]) 
    encoded_text = one_hot_encoder(encoded_text,len(model.all_char))
    #create input by encoding and one_hotting the char
    inputs = torch.tensor(encoded_text).to(device)
    #create hidden state
    hidden = tuple([state.data for state in hidden])
    #make prediction
    lstm_out,hidden = model.forward(inputs,hidden)
    #get probabilities
    probs = F.softmax(lstm_out,dim=1).data
    probs = probs.cpu()
    probs,index_position = probs.topk(k)
    index_position = index_position.numpy().squeeze()
    probs = probs.numpy().flatten()
    probs = probs/probs.sum()
    #choose a char from top k
    char = np.random.choice(index_position,p=probs)
    return model.decoder[char],hidden

In [22]:
def generate_text(model,size,seed = 'the',k=1):
    model = model.to(device)
    model.eval()
    output_char = [c for c in seed]
    hidden = model.init_hidden(1)
    for char in seed:
        char,hidden = pred_next_char(model,char,hidden,k=k)
    output_char.append(char)
    for i in range(size):
        char,hidden = pred_next_char(model,output_char[-1],hidden,k=k)
        output_char.append(char)
    return ' '.join(output_char)

In [23]:
print(generate_text(model_2,100,seed='The ',k=2))

T h e   F v ` 5 v F " ` 2 l ` F B F ` 5 9 7 F W 7 7 o ; F F F F v ` 7 F " R O V u L F v ` 5 v F v ` 7 I F 5 O 7 F 5 V V F 5 W F v ` R D F W ` 5 V v F c 7 F v ` 7 F W v 5 v 7 ; F F F F F F F F F F F F F F F F F
