## Character-Level LSTM in PyTorch

This network is based on Andrej Karpathy's post on RNNs and his implementation in Torch. The network trains character by character on some text, then generates new text character by character. Here I have trained the model on Shakespeare's poems.

<img src="shakespeare.jpg" width="500">

In [1]:
import torch
import numpy as np
from torch import nn
import torch.nn.functional as F
from collections import Counter

In [2]:
# Read the whole training corpus into memory as one string.
# Lower-casing is intentionally not done here; it happens in a later cell.
with open('shakespeare.txt') as corpus_file:
    text = corpus_file.read()

In [19]:
## After lower-casing, this text contains only 38 unique characters
## (see the cell output below).

# Fold the corpus to lower case so upper- and lower-case letters share one
# vocabulary entry.
text = text.lower()
# Bare expression: the notebook displays the number of distinct characters.
len(set(text))

38

In [20]:
# `words` holds every distinct character of the corpus in sorted order
# (despite the name, these are characters, not words).
words = sorted(set(text))
# Each character occurs exactly once in `words`, so the Counter only serves
# as an insertion-ordered container here; no frequency sorting is needed for
# a character-level RNN.
words_count = Counter(words)

# Lookup tables: integer index -> character and character -> integer index.
int2char = {index: symbol for index, symbol in enumerate(words_count)}
char2int = {symbol: index for index, symbol in int2char.items()}

In [21]:
## encode the text

# Map every character of the corpus to its integer index in one pass.
encoded = np.array([char2int[symbol] for symbol in text])

In [22]:
## Convert each character into a one-hot vector

In [23]:
def one_hot(arr, n_labels):
    """One-hot encode an integer array along a new trailing axis.

    Arguments
    ---------
    arr: Integer array (or array-like) of label indices, each in
         ``[0, n_labels)``. Any number of dimensions is accepted.
    n_labels: Size of the one-hot axis (number of distinct labels).

    Returns
    -------
    An ``np.int32`` array of shape ``arr.shape + (n_labels,)`` holding a
    single 1 per label position and 0 elsewhere.
    """
    arr = np.asarray(arr)
    # Work on a flat (n_items, n_labels) table so one fancy-indexing
    # assignment sets every "hot" position regardless of arr's rank.
    flat = np.zeros((arr.size, n_labels), dtype=np.int32)
    flat[np.arange(arr.size), arr.reshape(-1)] = 1
    # Restore the caller's layout and append the label axis.
    return flat.reshape(*arr.shape, n_labels)

In [24]:
# Sanity check: encode the indices 1, 2, 3 using 5 possible labels.
test_seq = np.array([[1,2,3]])
# Bare expression: the notebook displays the resulting one-hot array.
one_hot(test_seq, 5)

array([[[0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 1, 0]]])

###### We keep only enough characters to fill complete batches, so the total length used is N * M * K
###### where N = batch size, M = sequence length, K = number of batches


In [25]:
def get_batches(arr, batch_size, seq_len):
    """Yield (x, y) training batches of shape (batch_size, seq_len).

    Note: a later cell defines ``get_batches`` again with the same
    behaviour; that later definition is the one in effect afterwards.

    Arguments
    ---------
    arr: 1-D NumPy array of encoded characters.
    batch_size: Number of sequences per batch (N).
    seq_len: Number of characters per sequence (M).

    Yields
    ------
    x: View into the data, shape (batch_size, seq_len).
    y: New array of the same shape holding x shifted left by one step;
       the final column of the last batch wraps around to column 0.
    """
    total_batches = len(arr) // (batch_size * seq_len)
    total_arr_length = batch_size * seq_len * total_batches  # N * M * K

    # Drop the tail that cannot fill a complete batch.
    arr = arr[:total_arr_length]

    # Batch-first layout: each row is one long stream of M * K characters,
    # and every iteration below yields the next M columns (K times in all).
    arr = arr.reshape(batch_size, -1)
    n_cols = arr.shape[1]

    for n in range(0, n_cols, seq_len):
        x = arr[:, n:n + seq_len]
        y = np.zeros_like(x)
        y[:, :-1] = x[:, 1:]
        # The last target is the character right after this window; for the
        # final window there is none, so wrap around to the first column.
        following = n + seq_len
        y[:, -1] = arr[:, following] if following < n_cols else arr[:, 0]
        yield x, y

In [26]:
def get_batches(arr, batch_size, seq_length):
    '''Generate (inputs, targets) mini-batches from an encoded sequence.

       Each yielded array has shape batch_size x seq_length. Targets are
       the inputs shifted one step to the left; the very last target of
       each row wraps around to that row's first character.

       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''
    chars_per_batch = batch_size * seq_length
    # Largest prefix length that splits evenly into full batches.
    usable = (len(arr) // chars_per_batch) * chars_per_batch

    # One row per sequence stream in the batch.
    grid = arr[:usable].reshape((batch_size, -1))
    # Rolling each row left by one gives the "next character" for every
    # position, including the wrap-around at the end of the row.
    shifted = np.roll(grid, -1, axis=1)

    for start in range(0, grid.shape[1], seq_length):
        stop = start + seq_length
        # Inputs stay a view of the data; targets are an independent copy.
        yield grid[:, start:stop], shifted[:, start:stop].copy()

In [27]:
## Smoke-test get_batches on the encoded corpus

# Batch size 8, sequence length 100.
batches = get_batches(encoded, 8,100)
x,y = next(batches)

# Print the top-left corner (at most 10 rows x 10 columns) of the inputs and
# targets; each row of y should be the matching row of x shifted left by one.
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[31 19 16  1 30 26 25 25 16 31]
 [15 20 25 18  1 24 16 25  8  1]
 [19 16 30 16  1 14 32 29 20 26]
 [26 23  1 20 30  1 23 26 33 16]
 [26 26 22  6  1 31 19 20 30  1]
 [19 16 16  1 29 16 30 26 29 31]
 [16 12 29 16 29  6  0 13 32 31]
 [27 32 31  3 30 31  1 17 26 29]]

y
 [[19 16  1 30 26 25 25 16 31 30]
 [20 25 18  1 24 16 25  8  1  1]
 [16 30 16  1 14 32 29 20 26 32]
 [23  1 20 30  1 23 26 33 16  6]
 [26 22  6  1 31 19 20 30  1 23]
 [16 16  1 29 16 30 26 29 31  9]
 [12 29 16 29  6  0 13 32 31  1]
 [32 31  3 30 31  1 17 26 29 31]]


In [28]:
# Module-wide flag consulted by the model and training code below.
gpu = torch.cuda.is_available()
print('Training on GPU' if gpu else 'Not Training on GPU')

Training on GPU


In [29]:
#### Defining the network with PyTorch

In [30]:
class CharRNN(nn.Module):
    """Character-level LSTM that maps one-hot inputs to next-character scores.

    No embedding layer is used: the LSTM consumes one-hot vectors whose width
    equals the vocabulary size, and a linear layer projects every LSTM output
    back to one score per character.

    Arguments
    ---------
    tokens: Sequence of the distinct characters in the vocabulary; position
            in the sequence defines each character's integer index.
    n_hidden: Size of the LSTM hidden state.
    n_layers: Number of stacked LSTM layers.
    drop_prob: Dropout probability, used both between LSTM layers and on the
               LSTM output.
    lr: Learning rate; only stored on the instance by this class.
    """

    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.lr = lr
        self.n_layers = n_layers
        self.n_hidden = n_hidden

        # Vocabulary and the two lookup tables derived from it.
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # Input and output widths both equal the vocabulary size.
        vocab_size = len(self.chars)
        self.lstm = nn.LSTM(vocab_size, self.n_hidden, self.n_layers,
                            dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.FC = nn.Linear(self.n_hidden, vocab_size)

    def forward(self, x, hidden):
        """Run one forward pass.

        Arguments
        ---------
        x: Float tensor of one-hot inputs, batch first
           (batch, seq_len, vocab_size).
        hidden: Tuple (h, c) of LSTM states, as returned by init_hidden or
                by a previous call.

        Returns
        -------
        (out, hidden): ``out`` has one row of character scores per input
        position, shape (batch * seq_len, vocab_size); ``hidden`` is the
        updated LSTM state tuple.
        """
        r_out, hidden = self.lstm(x, hidden)
        r_out = self.dropout(r_out)

        # Stack all time steps of all sequences so the linear layer scores
        # every position in one call.
        r_out = r_out.reshape(-1, self.n_hidden)

        out = self.FC(r_out)
        return out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed (hidden state, cell state) tensors for the LSTM.

        Both tensors have shape (n_layers, batch_size, n_hidden) and are
        created with the dtype and device of the model's parameters, so the
        result is usable whether the model lives on the CPU or on a GPU.
        """
        weights = next(self.parameters())
        shape = (self.n_layers, batch_size, self.n_hidden)
        return (weights.new_zeros(shape), weights.new_zeros(shape))

In [31]:
## Build the train function

In [33]:
def train(model, data, epochs=1, batch_size=10, seq_len=10, lr=0.001, clip=5,
          val_frac=0.1, print_freq=10):
    """Train ``model`` on ``data`` and checkpoint the best weights seen.

    The last ``val_frac`` of ``data`` is held out for validation. Every
    ``print_freq`` training steps the model is evaluated on the held-out
    part, progress is printed, and, if the mean validation loss improved,
    a checkpoint is written to 'char_rnn.net'.

    Arguments
    ---------
    model: Network exposing ``chars``, ``n_hidden``, ``n_layers`` and
           ``init_hidden(batch_size)``; called as ``model(inputs, hidden)``.
    data: 1-D array of encoded characters.
    epochs: Number of passes over the training part of the data.
    batch_size: Number of sequences per batch.
    seq_len: Number of characters per sequence.
    lr: Learning rate for the Adam optimizer.
    clip: Maximum gradient norm used for gradient clipping.
    val_frac: Fraction of ``data`` reserved for validation.
    print_freq: Evaluate and report every this many training steps.
    """
    # Honour the module-level `gpu` flag everywhere, so the same code path
    # works with and without CUDA.
    device = torch.device('cuda' if gpu else 'cpu')
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    model_name = 'char_rnn.net'

    # Split into training and validation parts.
    val_idx = int(len(data) * (1 - val_frac))
    train_data, valid_data = data[:val_idx], data[val_idx:]

    vocab_size = len(model.chars)
    minimum_val_loss = np.inf

    def to_tensors(x, y):
        """One-hot encode x and move both arrays to the training device."""
        inputs = torch.from_numpy(one_hot(x, vocab_size)).float().to(device)
        # CrossEntropyLoss expects one integer class index per output row.
        labels = torch.from_numpy(y).long().to(device).reshape(-1)
        return inputs, labels

    for epoch in range(epochs):
        counter = 0
        h = model.init_hidden(batch_size)

        for x, y in get_batches(train_data, batch_size, seq_len):
            model.train()
            counter += 1
            inputs, labels = to_tensors(x, y)

            # Detach the carried-over state so gradients do not flow back
            # through every earlier batch.
            h = tuple(each.detach() for each in h)

            model.zero_grad()
            output, h = model(inputs, h)
            loss = criterion(output, labels)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            optimizer.step()

            if counter % print_freq != 0:
                continue

            # Periodic validation; results are kept in separate variables so
            # the training loss reported below is not overwritten.
            model.eval()
            val_losses = []
            val_h = model.init_hidden(batch_size)
            with torch.no_grad():
                for val_x, val_y in get_batches(valid_data, batch_size, seq_len):
                    val_inputs, val_labels = to_tensors(val_x, val_y)
                    val_h = tuple(each.detach() for each in val_h)
                    val_output, val_h = model(val_inputs, val_h)
                    val_loss = criterion(val_output, val_labels)
                    val_losses.append(val_loss.item())

            # With too little validation data there may be no batch at all.
            mean_val_loss = np.mean(val_losses) if val_losses else float('nan')

            print("Epoch: {}/{}...".format(epoch+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.4f}...".format(loss.item()),
                  "Val Loss: {:.4f}".format(mean_val_loss))

            # Save only when validation improved. state_dict() references the
            # live parameters, so writing at any other time would overwrite
            # the file with weights that are not the best ones.
            if mean_val_loss < minimum_val_loss:
                minimum_val_loss = mean_val_loss
                checkpoint = {'n_hidden': model.n_hidden,
                              'n_layers': model.n_layers,
                              'state_dict': model.state_dict(),
                              'tokens': model.chars}
                print('Saving Model...')
                with open(model_name, 'wb') as f:
                    torch.save(checkpoint, f)


In [86]:
## Hyperparameters for the network

n_hidden = 512
n_layers = 3
drop_prob = 0.1
lr = 0.001

# `words` is the sorted list of distinct characters, i.e. the vocabulary.
model = CharRNN(
    words,
    n_hidden=n_hidden,
    n_layers=n_layers,
    drop_prob=drop_prob,
    lr=lr,
)
print(model)

CharRNN(
  (lstm): LSTM(38, 512, num_layers=3, batch_first=True, dropout=0.1)
  (dropout): Dropout(p=0.1)
  (FC): Linear(in_features=512, out_features=38, bias=True)
)


In [87]:
## Training configuration
data = encoded
epochs = 22
batch_size = 100
seq_len = 30
lr = 0.001
clip = 5
val_frac = 0.1

## Train the model; print_freq is left at train()'s default.
train(
    model,
    data,
    epochs=epochs,
    batch_size=batch_size,
    seq_len=seq_len,
    lr=lr,
    clip=clip,
    val_frac=val_frac,
)


Epoch: 1/22... Step: 10... Loss: 3.0081... Val Loss: 3.0300
Saving Model...
Epoch: 1/22... Step: 20... Loss: 2.9948... Val Loss: 3.0066
Saving Model...
Epoch: 2/22... Step: 10... Loss: 2.9874... Val Loss: 2.9965
Saving Model...
Epoch: 2/22... Step: 20... Loss: 2.9881... Val Loss: 2.9948
Saving Model...
Epoch: 3/22... Step: 10... Loss: 2.9860... Val Loss: 2.9926
Saving Model...
Epoch: 3/22... Step: 20... Loss: 2.9853... Val Loss: 2.9920
Saving Model...
Epoch: 4/22... Step: 10... Loss: 2.9493... Val Loss: 2.9551
Saving Model...
Epoch: 4/22... Step: 20... Loss: 2.9540... Val Loss: 2.9584
Epoch: 5/22... Step: 10... Loss: 2.7863... Val Loss: 2.7893
Saving Model...
Epoch: 5/22... Step: 20... Loss: 2.7270... Val Loss: 2.7274
Saving Model...
Epoch: 6/22... Step: 10... Loss: 2.6257... Val Loss: 2.6219
Saving Model...
Epoch: 6/22... Step: 20... Loss: 2.5582... Val Loss: 2.5491
Saving Model...
Epoch: 7/22... Step: 10... Loss: 2.4208... Val Loss: 2.4132
Saving Model...
Epoch: 7/22... Step: 20... L

##### Loading the saved model


In [88]:
 
# Restore the checkpoint written by train(). map_location='cpu' lets a
# checkpoint that was saved from a GPU be loaded on a machine without CUDA;
# the model is moved to the GPU later, where one is available.
with open('char_rnn.net', 'rb') as f:
    checkpoint = torch.load(f, map_location='cpu')

# Rebuild the network with the saved architecture, then load its weights.
model = CharRNN(checkpoint['tokens'],
                n_hidden=checkpoint['n_hidden'],
                n_layers=checkpoint['n_layers'])
model.load_state_dict(checkpoint['state_dict'])

In [89]:
## Prediction using this model

In [90]:
def predict(model, char, h=None, top_k=None):
    """Sample the character that follows ``char``.

    The caller is expected to have put the model in eval mode (as
    generate_text does before priming).

    Arguments
    ---------
    model: Trained network exposing ``chars``, ``char2int``, ``int2char``
           and ``init_hidden``.
    char: The single input character; must be part of the model vocabulary.
    h: Hidden state tuple from a previous step, or None to start from a
       fresh zero state.
    top_k: If given, sample only among the ``top_k`` most likely characters;
           otherwise sample from the full distribution.

    Returns
    -------
    (character, h): the sampled next character and the updated hidden state.

    Raises
    ------
    KeyError: if ``char`` is not in the model's vocabulary.
    """
    vocab_size = len(model.chars)
    # Run on whatever device the model currently lives on.
    device = next(model.parameters()).device

    if h is None:
        h = model.init_hidden(1)

    # Shape (1, 1): a batch of one sequence of length one. The model's own
    # lookup tables are used so a restored checkpoint stays self-consistent.
    x = np.array([[model.char2int[char]]])
    x = torch.from_numpy(one_hot(x, vocab_size)).float().to(device)

    h = tuple(each.detach() for each in h)
    with torch.no_grad():
        output, h = model(x, h)
    p = F.softmax(output, dim=1).cpu()

    if top_k is None:
        top_ch = np.arange(vocab_size)
    else:
        p, top_ch = p.topk(top_k)
        # reshape(-1) keeps a 1-D array even when top_k == 1.
        top_ch = top_ch.numpy().reshape(-1)

    # float64 avoids spurious "probabilities do not sum to 1" errors that
    # float32 rounding can trigger in np.random.choice.
    p = p.numpy().reshape(-1).astype(np.float64)
    ch = np.random.choice(top_ch, p=p / p.sum())
    return model.int2char[int(ch)], h

In [91]:
## Priming and generating Text

In [92]:
def generate_text(model, size, prime='The', top_k=None):
    """Generate text by priming the model and then sampling characters.

    Arguments
    ---------
    model: Trained network, passed through to predict().
    size: Number of sampling steps to run after priming.
    prime: Non-empty seed text fed through the model first; every character
           must be part of the model vocabulary.
    top_k: Passed to predict(); restricts sampling to the most likely
           characters when given.

    Returns
    -------
    A string made of ``prime``, the one character predicted right after it,
    and ``size`` further sampled characters
    (``len(prime) + size + 1`` characters in total).

    Raises
    ------
    ValueError: if ``prime`` is empty, since there is nothing to predict from.
    """
    if not prime:
        raise ValueError('prime must contain at least one character')

    if gpu:
        model.cuda()
    model.eval()

    chars = list(prime)
    h = model.init_hidden(1)  # batch size is 1 during generation

    # Priming: feed the seed text to build up the hidden state. Only the
    # prediction made after the final seed character is kept.
    for ch in prime:
        char, h = predict(model, ch, h, top_k=top_k)
    chars.append(char)

    # Generation: feed each sampled character back in as the next input.
    for _ in range(size):
        char, h = predict(model, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)
    

In [136]:
# Prime the model with 'thee' and sample text, restricting every draw to the
# 6 most likely characters.
generated_text =generate_text(model, size=200, prime='thee', top_k=6)

In [137]:
# Show the sampled text.
print(generated_text)

thee all wors in my self this show,
time's thou ast to me.

fear should a palse, all his prowounged worst breath, and sturn to be tee,
that sime as time's feeds, thou art so tell,
thou art when, when i mak


##### HAHA!! It was fun :)