<a href="https://colab.research.google.com/github/ttolofari/Character-Level-LSTM/blob/master/Character_Level_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Character Level LSTM**

In [0]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [0]:
# Opening the text file in a read mode

with open('anna.txt', 'r') as f:
    text = f.read()

In [0]:
# Looking at the first 100 characters in the file

text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

In [0]:
# Tokenization of the characters

# Using dictionaries to convert the characters to and from integers. Encoding the characters as integers makes it easier for the network

# int2char, this maps integers to characters
# char2int, this maps characters to unique integers

chars = tuple(set(text)) # Creating a set of all the characters in the text
int2char = dict(enumerate(chars))

char2int = {ch: ii for ii, ch in int2char.items()}


# Encoding the text

encoded = np.array([char2int[ch] for ch in text])

In [0]:
# Displaying the output of the encoded characters

encoded[:100]

array([32, 33, 34, 29, 21, 26, 67, 78,  9, 71, 71, 71,  8, 34, 29, 29, 73,
       78, 50, 34, 35, 24, 69, 24, 26, 31, 78, 34, 67, 26, 78, 34, 69, 69,
       78, 34, 69, 24, 63, 26, 56, 78, 26, 65, 26, 67, 73, 78, 62, 80, 33,
       34, 29, 29, 73, 78, 50, 34, 35, 24, 69, 73, 78, 24, 31, 78, 62, 80,
       33, 34, 29, 29, 73, 78, 24, 80, 78, 24, 21, 31, 78, 39, 40, 80, 71,
       40, 34, 73, 64, 71, 71, 10, 65, 26, 67, 73, 21, 33, 24, 80])

In [0]:
# Preprocessing the data
# The LSTM expects an input that is one-hot encoded meaning that each character is converted into an integer and then converted into a column vector where only
#its corresponding integer index will have the value of 1 and the rest vector will be filled with 0's. 
def one_hot_encode(arr, n_labels):
    
    # Initialixe the encoded array
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype = np.float32)
    
    
    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    
    # Finally reshape it to get it back to the original array
    
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot

In [0]:
# Check that the function works as expected

test_seq = np.array([[3, 5, 1]])

one_hot = one_hot_encode(test_seq, 8)

print(one_hot)

[[[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]


In [0]:
# Making mini-batches

def get_batches(arr, batch_size, seq_length):
  
    # Get the number of batches we can make
    
    batch_size_total = batch_size * seq_length
    
    n_batches = len(arr)//batch_size_total
    
    # Keep only enough characters to make full batches
    
    arr = arr[:n_batches * batch_size_total]
    
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))
    
    
    # Iterate through the array, one sequence at a time 
    
    for n in range(0, arr.shape[1], seq_length):
      
        # The features
        x = arr[:, n : n+seq_length]
        
        # The targets, shifted by one
        
        y = np.zeros_like(x)
        
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [0]:
# Testing the implementation

batches = get_batches(encoded, 8, 50)
x, y = next(batches)

In [0]:
# Printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[32 33 34 29 21 26 67 78  9 71]
 [31 39 80 78 21 33 34 21 78 34]
 [26 80 16 78 39 67 78 34 78 50]
 [31 78 21 33 26 78 20 33 24 26]
 [78 31 34 40 78 33 26 67 78 21]
 [20 62 31 31 24 39 80 78 34 80]
 [78 61 80 80 34 78 33 34 16 78]
 [57 14 69 39 80 31 63 73 64 78]]

y
 [[33 34 29 21 26 67 78  9 71 71]
 [39 80 78 21 33 34 21 78 34 21]
 [80 16 78 39 67 78 34 78 50 39]
 [78 21 33 26 78 20 33 24 26 50]
 [31 34 40 78 33 26 67 78 21 26]
 [62 31 31 24 39 80 78 34 80 16]
 [61 80 80 34 78 33 34 16 78 31]
 [14 69 39 80 31 63 73 64 78 76]]


In [0]:
# Defining the model

train_on_gpu = torch.cuda.is_available()
if(train_on_gpu):
    print('Training on GPU')
else:
    print('No GPU available. training on CPU, consider making n_epochs very small.')

Training on GPU


In [0]:
class CharRNN(nn.Module):
  
    def __init__(self, tokens, n_hidden = 256, n_layers = 2, drop_prob = 0.5, lr = 0.001):
        
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        
        
        # Creating character dictionaries
        
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        # Defining the LSTM layer
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, dropout = drop_prob, batch_first = True)
        
        # Defining the Dropout
        self.dropout = nn.Dropout(drop_prob)
        
        # Definng the final fully-connected output layer
        self.fc = nn.Linear(n_hidden, len(self.chars))
        
        
    def forward(self, x, hidden):
      
        # Getting the output and new hidden state from the LSTM
        
        r_output, hidden = self.lstm(x, hidden)
        
        # Pass through the dropout layer
        
        out = self.dropout(r_output)
        
        # Stack up LST outputs using view
        
        out = out.contiguous().view(-1, self.n_hidden)
        
        # Put through FC layer
        
        out = self.fc(out)
        
        return out, hidden
      
    
    def init_hidden(self, batch_size):
      
        '''Initializes hidden state'''
        
        # Creates two new tensors with sizes n_layers X batch_size X n_hidden, 
        # initialize to zero, for hidden state and cell state of LSTM
        
        weight = next(self.parameters()).data
        
        if (train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
        return hidden

In [0]:
def train(net, data, epochs = 10, batch_size = 10, seq_length = 50, lr = 0.001, clip=5, val_frac=0.1, print_every=10):
  
    net.train()
    opt = torch.optim.Adam(net.parameters(), lr= lr)
    criterion = nn.CrossEntropyLoss()
    
    # Create training and validation data
    
    val_idx = int(len(data)*(1 - val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    if (train_on_gpu):
        net.cuda()
        
    counter = 0
    n_chars = len(net.chars)
    
    for e in range(epochs):
      
        # Initialize hidden state
        
        h = net.init_hidden(batch_size)
        
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            
            # One_hot encode our data and make them torch tensors
            
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            
            if (train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()
            
            # Creating a new variable for the hidden state to avoid backpropagation through the entire history
            
            h = tuple([each.data for each in h])
            
            # Zero gradient
            
            net.zero_grad()
            
            # Output from the model
            output, h = net(inputs, h)
            
            # Calculate the loss
            loss = criterion(output, targets.view(batch_size*seq_length))
            
            loss.backward()
            
            # This is to help prevent exploding gradient problem
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()
            
            # Loss stat
            
            if counter % print_every == 0:
              
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One_hot encode our data and make them torch tensors
                    
                    x = one_hot_encode(x, n_chars)
                    inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
                    
                    # Creating a new variable for the hidden state to avoid backpropagation through the entire history
                    
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y
                    if (train_on_gpu):
                        inputs, targets = torch.Tensor(inputs).cuda(), torch.Tensor(targets).cuda().long()
                    
                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length))
                    val_losses.append(val_loss.item())
                    
                net.train() # reset to train mode after iterationg through validation data    
                print("Epoch: {}/{}...".format(e+1, epochs), 
                         "Steps: {}...".format(counter), 
                         "Loss: {:.4f}...".format(loss.item()), 
                         "Val Loss {:.4f}".format(np.mean(val_losses)))
              
                    


In [0]:
# define and print the net
n_hidden=512
n_layers=2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

CharRNN(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


In [0]:
batch_size = 128
seq_length = 100
n_epochs =  20 # start small if you are just testing initial behavior

# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

Epoch: 1/20... Steps: 10... Loss: 3.2976... Val Loss 3.2488
Epoch: 1/20... Steps: 20... Loss: 3.1530... Val Loss 3.1438
Epoch: 1/20... Steps: 30... Loss: 3.1392... Val Loss 3.1268
Epoch: 1/20... Steps: 40... Loss: 3.1152... Val Loss 3.1195
Epoch: 1/20... Steps: 50... Loss: 3.1443... Val Loss 3.1175
Epoch: 1/20... Steps: 60... Loss: 3.1208... Val Loss 3.1160
Epoch: 1/20... Steps: 70... Loss: 3.1077... Val Loss 3.1148
Epoch: 1/20... Steps: 80... Loss: 3.1251... Val Loss 3.1126
Epoch: 1/20... Steps: 90... Loss: 3.1232... Val Loss 3.1063
Epoch: 1/20... Steps: 100... Loss: 3.1033... Val Loss 3.0929
Epoch: 1/20... Steps: 110... Loss: 3.0837... Val Loss 3.0710
Epoch: 1/20... Steps: 120... Loss: 3.0123... Val Loss 3.0123
Epoch: 1/20... Steps: 130... Loss: 2.9403... Val Loss 2.9098
Epoch: 2/20... Steps: 140... Loss: 2.8083... Val Loss 2.7605
Epoch: 2/20... Steps: 150... Loss: 2.7335... Val Loss 2.6815
Epoch: 2/20... Steps: 160... Loss: 2.6126... Val Loss 2.5767
Epoch: 2/20... Steps: 170... Loss

In [0]:
model_name = 'rnn_x_epoch.net'

checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

In [0]:
# Making predictions

def predict(net, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        
        # tensor inputs
        x = np.array([[net.char2int[char]]])
        x = one_hot_encode(x, len(net.chars))
        inputs = torch.from_numpy(x)
        
        if(train_on_gpu):
            inputs = inputs.cuda()
        
        # detach hidden state from history
        h = tuple([each.data for each in h])
        # get the output of the model
        out, h = net(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
        if(train_on_gpu):
            p = p.cpu() # move to cpu
        
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(net.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the encoded value of the predicted char and the hidden state
        return net.int2char[char], h

In [0]:
# Priming and generating text

def sample(net, size, prime='The', top_k=None):
        
    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()
    
    net.eval() # eval mode
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [0]:
print(sample(net, 1000, prime='Anna', top_k=5))

Anna, and the letter,
he felt this to the children, and with a strange times had been too talking about
the same to his brother, the study of those coming to her from the serive that was the
strength that his brilliante and an officer who had threw them, and had begun in the same
starching and the memory, they arrived. He stood all the simple of
happiness, and the frown of the same amount of all the sareness of the
conslituse of anything briefly already from him to suppose what she was told any
one of the children. She was sitting over to the stall to this minute.

"What's all strong, as It were to me? You know your second some only three at the
candle of a passion of such cander is sorty to make their position. I'll
speak to you," said Vronsky. "Where you might supper? Thenen your
silence is the conversation was so in the mards, that you, to be for
me to see, I can go in. I've struck you, and so much a staped. I don't say that the
prince he was in her men. I'm a glad with her for answ

In [0]:
# Here we have loaded in a model that trained over 20 epochs `rnn_20_epoch.net`
with open('rnn_x_epoch.net', 'rb') as f:
    checkpoint = torch.load(f)
    
loaded = CharRNN(checkpoint['tokens'], n_hidden=checkpoint['n_hidden'], n_layers=checkpoint['n_layers'])
loaded.load_state_dict(checkpoint['state_dict'])

In [0]:
# Sample using a loaded model
print(sample(loaded, 2000, top_k=5, prime="And Levin said"))

And Levin said:

Her close had been too have a married the strong way. All on the figure of her face
than she was still far importance to herself it seemed so as it was a latter and
wanted.

"I'll come to see you," she said, and stricking all the summer.

"I have taken a little and servow, there was the papirs, the particular as the
servant, and shook home, and then the moning is that it's needed in her his
subject, it arrived with their children.... A man in that mistake of
my worl who has a country, as it was something, I should have asked you
out of a station to time to have been must."

"You see through the simple to me."

"I have not the close to her to be a love of more sid to this."

"Well, were what is he took to the presence, but you were saying. It, as you to
speak and take a mother,"
he said, and hiding her sorts and her husband's
classical peasants, smiling.

"You're such most fining-it, I spoke in her, I, after the same to her. Then, to step her, there was
some since this 