# Simple Character Level LSTM Text Generator  

Trains character by character on text, and then generates new text character by character.  The training is done with the public domain text of Leo Tolstoi's novel Anna Karenina.

In [2]:
import torch
from torch import nn
import torch.nn.functional as F 
import numpy as np

## Training Data
Load in the training book text and encode it as numbers

In [12]:
# Read file
with open("anna_karenina_text.txt") as bk:
    txt = bk.read()

# Map each character to intergers and provide 2 way mapping dictionaries
chars = tuple(set(txt))
int2char = dict(enumerate(chars))
char2int = {char: i for i, char in int2char.items() }
encoded = np.array([char2int[char] for char in txt])

# Sanity Check:
print("Beginning of text: ", txt[:50], " ...")
print("--------------------")
print("Beginning of encoded: ", encoded[:50], " ...")


Beginning of text:  Chapter 1


Happy families are all alike; every un  ...
--------------------
Beginning of encoded:  [33 73 42  9 32 18 51 29 52 15 15 15 39 42  9  9 36 29 26 42  5 58  6 58 18
 53 29 42 51 18 29 42  6  6 29 42  6 58 23 18 40 29 18 38 18 51 36 29 63 66]  ...


## Preprocess inputs to be one-hot encoded

In [92]:
def one_hot_encode(array, nr_labels):
    '''
    Function to create one-hot encoded array where each characted is one hot-encoded as a column 
    '''

    # Creat array
    one_hot_array = np.zeros((np.multiply(*array.shape), nr_labels), dtype=np.float32)
    #one_hot_array = np.zeros((np.multiply(*array.shape), nr_labels), dtype=np.long)

    # Fill in ones
    one_hot_array[np.arange(one_hot_array.shape[0]), array.flatten()] = 1

    one_hot_array = one_hot_array.reshape((*array.shape, nr_labels))

    return one_hot_array

## Split the training data into mini-batches and sequences

In [93]:
def create_batches(array, nr_sequences, nr_steps):
    ''' 
    Create batches with number of sequences (nr_sequences) & number of steps (nr_steps) from array
    '''
    # Size of one batch
    batch_size = nr_sequences * nr_steps

    # How many batches can be made
    nr_batches = len(array) // batch_size

    # Crop to integer multiple of batches
    array = array[: nr_batches * batch_size]
    
    # Reshape
    array = array.reshape((nr_sequences, -1))

    # Loop to create sequences
    for i in range(0, array.shape[1], nr_steps):
        x = array[:, i:i + nr_steps]
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1: ], array[:, i + nr_steps]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1: ], array[:, 0]
        yield x,y


### Sanity Check on batches and sequences

In [87]:
batches = create_batches(encoded, 10, 50)
x, y = next(batches)
print("x:")
print(x[:10, :10])
print("y:")
print(y[:10, :10])

x:
[[33 73 42  9 32 18 51 29 52 15]
 [29 42  5 29 66 71 32 29 16 71]
 [38 58 66 34 15 15 70 69 18 53]
 [66 29 24 63 51 58 66 16 29 73]
 [29 58 32 29 58 53 27 29 53 58]
 [29 12 32 29 56 42 53 15 71 66]
 [73 18 66 29 43 71  5 18 29 26]
 [40 29 61 63 32 29 66 71 56 29]
 [32 29 58 53 66 65 32 34 29 79]
 [29 53 42 58 24 29 32 71 29 73]]
y:
[[73 42  9 32 18 51 29 52 15 15]
 [42  5 29 66 71 32 29 16 71 58]
 [58 66 34 15 15 70 69 18 53 27]
 [29 24 63 51 58 66 16 29 73 58]
 [58 32 29 58 53 27 29 53 58 51]
 [12 32 29 56 42 53 15 71 66  6]
 [18 66 29 43 71  5 18 29 26 71]
 [29 61 63 32 29 66 71 56 29 53]
 [29 58 53 66 65 32 34 29 79 73]
 [53 42 58 24 29 32 71 29 73 18]]


## Define the LSTM Network

In [95]:
class characterLSTM(nn.Module):
    '''
    Character Level Text Generator LSTM
    '''
    def __init__(self, chars, nr_steps = 100, nr_hidden = 256, nr_layers = 2, dropout_prob = 0.5, lr = 0.001):

        super().__init__()
        self.dropout_prob = dropout_prob
        self.nr_layers = nr_layers
        self.nr_hidden = nr_hidden
        self.lr = lr

        # Make mapping dictionaries
        self.chars = chars
        self.int2char = dict(enumerate(chars))
        self.char2int = {char: i for i, char in self.int2char.items() }

        # LSTM Cell Definition
        self.lstm = nn.LSTM(len(self.chars), self.nr_hidden, self.nr_layers, dropout = self.dropout_prob, batch_first = True)

        # Add dropout layer to reduce likelihood of overfitting
        self.dropout = nn.Dropout(self.dropout_prob)

        # Final linear fully connected layer for output
        self.fc = nn.Linear(self.nr_hidden, len(self.chars))

        # Initialize weights
        self.initialize_weights()

    def forward(self, x, hc):
        '''
        Forward pass thru the network with inputs (x) and hidden cell state (hc)
        '''

        # Get x and hidden state from LSTM
        x, (h, c) = self.lstm(x, hc)

        # Pass thru dropout
        x = self.dropout(x)

        # Stack up LSTM outputs
        print("x.size()[0] before: ", x.size()[0])
        print("x.size()[1] before: ", x.size()[1])
        print("self.nr_hidden: ", self.nr_hidden)
        # x = x.view(x.size()[0] * x.size()[1], self.nr_hidden)
        x = x.reshape(x.size()[0] * x.size()[1], self.nr_hidden)
        print("x.size()[0] after: ", x.size()[0])
        print("x.size()[1] after: ", x.size()[1])

        # Pass thru fully connected layer
        x = self.fc(x)

        # Return x and hidden state
        return x, (h, c)

    def predict(self, char, h=None,  top_k=None):
        '''
        Predict next character
        '''

        if h is None:
            h = self.initialize_hidden_layer(1)

        x = np.array([[self.char2int[char]]])
        x = one_hot_encode(x, len(self.chars))
        
        # Convert to input Tensor
        inputs = torch.from_numpy(x).type(torch.FloatTensor)

        # Run forward pass
        h = tuple([each.data for each in h])
        out, h = self.forward(inputs, h)
    
        # Run thru Softmax
        p = F.softmax(out, dim=1).data

        if top_k is None:
            top_char = np.arange(len(self.chars))
        else:
            p, top_char = p.topk(top_k)
            top_char = top_char.numpy().squeeze()

        p = p.numpy().squeeze()
        char = np.random.choice(top_char, p = p/p.sum())

        return self.int2char[char], h

    def initialize_weights(self):
        ''' 
        Initialize fully connected layer weights 
        '''
        
        # Set bias tensor = 0
        self.fc.bias.data.fill_(0)
        # Random fully connected weights
        self.fc.weight.data.uniform_(-1, 1)
        
    def initialize_hidden_layer(self, nr_sequences):
        ''' 
        Initializes hidden state 
        '''
        # Tensors with sizes nr_layers x nr_sequences x nr_hidden initialzied to zero
        weight = next(self.parameters()).data
        return (weight.new(self.nr_layers, nr_sequences, self.nr_hidden).zero_(),
                weight.new(self.nr_layers, nr_sequences, self.nr_hidden).zero_())


# Train the network

Define training function

In [121]:
def train(net, train_data, nr_epochs=10, nr_sequences=10, nr_steps=50, lr=0.001, gradient_clip=5, val_data_fraction=0.1, print_every=10):
    ''' 
    Train the network     
    '''
    
    net.train()
    # Use Adam and Cross Entropy Loss
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # Separate data and validation data
    val_idx = int(len(train_data) * (1-val_data_fraction) )
    train_data, val_data = train_data[:val_idx], train_data[val_idx:]
        
    counter = 0
    nr_chars = len(net.chars)
    for e in range(nr_epochs):
        h = net.initialize_hidden_layer(nr_sequences)
        for x, y in create_batches(train_data, nr_sequences, nr_steps):
            counter += 1
            #print("type(x): ", type(x))
            #print("x.shape: ", x.shape)

            # One-hot encode train_data
            x = one_hot_encode(x, nr_chars)
            #print("after one_hot_encode:")
            #print("type(x): ", type(x))
            #print("x.shape: ", x.shape)
            #print("type(x[0,0,0]): ", type(x[0,0,0]))
            
            # Convert to Tensor
            inputs, targets = torch.from_numpy(x).type(torch.LongTensor), torch.from_numpy(y).type(torch.LongTensor)
            print("type(inputs): ", type(inputs))
            print("inputs.shape: ", inputs.shape)            

            # New variables for the hidden state
            h = tuple([each.data for each in h])
            print("type(h): ", type(h))
            print("type(h[0]):", type(h[0]))
            print("h: ", h)
            net.zero_grad()
            
            # Forward Pass
            output, h = net.forward(inputs, h)

            # Calculate loss
            loss = criterion(output, targets.view(nr_sequences * nr_steps))

            # Back propagate
            loss.backward()
            
            # Use clip_grad_norm to prevent exploding gradient problem
            nn.utils.clip_grad_norm_(net.parameters(), gradient_clip)

            opt.step()
            
            if counter % print_every == 0:
                
                # Get validation loss
                val_h = net.init_hidden(nr_sequences)
                val_losses = []
                for x, y in create_batches(val_data, nr_sequences, nr_steps):
                    # One-hot encode data
                    x = one_hot_encode(x, nr_chars)
                    # Conver to Tensors
                    x, y = torch.from_numpy(x).type(torch.FloatTensor), torch.from_numpy(y).type(torch.FloatTensor)
                    
                    # New variables for the hidden state
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y

                    output, val_h = net.forward(inputs, val_h)
                    val_loss = criterion(output, targets.view(nr_sequences * nr_steps))                
                    val_losses.append(val_loss.item())
                
                print("Epoch #: {}/{}...".format(e+1, nr_epochs),
                      "Step #: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Validation Loss: {:.4f}".format(np.mean(val_losses)))

Do actual training


In [122]:
# Clean up old stuff
if 'net' in locals():
    del net

# Instantiate new network
net = characterLSTM(chars, nr_hidden=512, nr_layers=2)

# Print the network
print(net)

characterLSTM(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


In [123]:
nr_sequences = 128
nr_steps = 100

# Train
train(net, encoded, nr_epochs=1, nr_sequences = nr_sequences, nr_steps = nr_steps, lr=0.001, print_every=10)

type(inputs):  <class 'torch.Tensor'>
inputs.shape:  torch.Size([128, 100, 83])
type(h):  <class 'tuple'>
type(h[0]): <class 'torch.Tensor'>
h:  (tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]]), tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]],

        [[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ...

RuntimeError: Expected object of scalar type Long but got scalar type Float for argument #3 'mat2' in call to _th_addmm_out

## Save the Model

In [66]:
model_name = "lstm_try_1.net"

checkpoint = {'nr_hidden': net.nr_hidden,
              'nr_layers': net.nr_layers,
              'state_dict': net.state_dict(),
              'chars': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

## Load the model back

In [67]:
model_name = "lstm_try_1.net"
with open(model_name, 'rb') as f:
    checkpoint = torch.load(f)

loaded_net = characterLSTM(checkpoint['chars'], nr_hidden = checkpoint['nr_hidden'], nr_layers = checkpoint['nr_layers'])
loaded_net.load_state_dict(checkpoint['state_dict'])



<All keys matched successfully>

## Generate

In [68]:
def generate(net, size, prime='In the beginning', top_k=None):

    net.eval()

    # Start with prime characters
    chars = [c for c in prime]
    h = net.initialize_hidden_layer(1)
    for c in prime:
        char, h = net.predict(c, h, top_k=top_k)

    chars.append(char)
    
    # Pass in previous character and generate the next
    for i in range(size):
        char, h = net.predict(chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [69]:
print(generate(net, 2000, top_k=5))

nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  1
seld.nr_hidden:  512
x.size()[0]:  1
x.size()[1]:  