In [1]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn

In [2]:
# open text
with open('../../deep-learning-v2-pytorch/recurrent-neural-networks/char-rnn/data/anna.txt', 'r') as f:
    text = f.read()

In [3]:
# first 100
text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

In [4]:
# Tokenization
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# encode
encoded = np.array([char2int[ch] for ch in text])

In [5]:
encoded[:100]

array([77, 68, 70, 25,  5, 55, 81, 60, 39, 82, 82, 82, 53, 70, 25, 25, 26,
       60, 79, 70, 23, 27,  8, 27, 55,  7, 60, 70, 81, 55, 60, 70,  8,  8,
       60, 70,  8, 27, 64, 55,  6, 60, 55,  9, 55, 81, 26, 60, 40, 76, 68,
       70, 25, 25, 26, 60, 79, 70, 23, 27,  8, 26, 60, 27,  7, 60, 40, 76,
       68, 70, 25, 25, 26, 60, 27, 76, 60, 27,  5,  7, 60, 42, 24, 76, 82,
       24, 70, 26,  2, 82, 82, 71,  9, 55, 81, 26,  5, 68, 27, 76])

In [6]:
# one hot encode
def one_hot_encode(arr, labels):
    one_hot = np.zeros((np.multiply(*arr.shape),labels), dtype=np.float32 )
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1
    one_hot = one_hot.reshape((*arr.shape, labels))
    return one_hot

In [8]:
test_seq = np.array([[1,2,5]])
one_hot = one_hot_encode(test_seq, 8)
one_hot

array([[[0., 1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0.]]], dtype=float32)

In [9]:
def get_batches(arr, batch_size, seq_length):
    # get the number of full batches
    batch_size_total = batch_size * seq_length
    n_batches = len(arr)//batch_size_total
    # keep only enough chars to make full batches
    arr = arr[:n_batches * batch_size_total]
    # reshape into {batch_size} rows
    arr = arr.reshape((batch_size, -1))
    # loop through the batches using a seq length of 3
    for n in range(0, arr.shape[1], seq_length):
        # features
        x = arr[:, n:n+seq_length]
        # targets
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [10]:
batches = get_batches(encoded, 8, 50)
x, y = next(batches)

In [11]:
# printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[77 68 70 25  5 55 81 60 39 82]
 [ 7 42 76 60  5 68 70  5 60 70]
 [55 76 11 60 42 81 60 70 60 79]
 [ 7 60  5 68 55 60 57 68 27 55]
 [60  7 70 24 60 68 55 81 60  5]
 [57 40  7  7 27 42 76 60 70 76]
 [60 78 76 76 70 60 68 70 11 60]
 [63 32  8 42 76  7 64 26  2 60]]

y
 [[68 70 25  5 55 81 60 39 82 82]
 [42 76 60  5 68 70  5 60 70  5]
 [76 11 60 42 81 60 70 60 79 42]
 [60  5 68 55 60 57 68 27 55 79]
 [ 7 70 24 60 68 55 81 60  5 55]
 [40  7  7 27 42 76 60 70 76 11]
 [78 76 76 70 60 68 70 11 60  7]
 [32  8 42 76  7 64 26  2 60 28]]


## Implementation

In [12]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [13]:
class CharRNN(nn.Module):
    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.3, lr=0.001):
            super().__init__()
            self.drop_prob = drop_prob
            self.n_layers = n_layers
            self.n_hidden = n_hidden
            self.lr = lr
            
            #create char dictionaries
            self.chars = tokens
            self.int2char = dict(enumerate(self.chars))
            self.char2int = {ch: ii for ii, ch in self.int2char.items()}
            
            # model layers
            self.lstm = nn.LSTM(len(self.chars),n_hidden, n_layers, dropout=drop_prob, batch_first=True)
            self.dropout = nn.Dropout(drop_prob)
            self.fc = nn.Linear(n_hidden, len(self.chars))
            
    def forward(self, x, hidden):
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)
        out = out.reshape(-1, self.n_hidden)
        out = self.fc(out)
        return out, hidden
    
    def init_hidden(self, batch_size):
        '''initializes hidden state'''
        weight = next(self.parameters()).data
        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(device))

        return hidden
                

In [14]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network 
    
        Arguments
        ---------
        
        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    
    '''
    net.train()
    
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    net.to(device)
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            
            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            inputs, targets = inputs.to(device), targets.to(device)
            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            # zero accumulated gradients
            net.zero_grad()
            
            # get the output from the model
            output, h = net(inputs, h)
            
            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()
            
            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y
                    inputs, targets = inputs.to(device), targets.to(device)

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length).long())
                
                    val_losses.append(val_loss.item())
                
                net.train() # reset to train mode after iterationg through validation data
                
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

## Model Instantiation

In [15]:
n_hidden=512
n_layers=2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

CharRNN(
  (lstm): LSTM(83, 512, num_layers=2, batch_first=True, dropout=0.3)
  (dropout): Dropout(p=0.3, inplace=False)
  (fc): Linear(in_features=512, out_features=83, bias=True)
)


## Training

In [None]:
batch_size = 128
seq_length = 100
n_epochs = 20
# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

Epoch: 1/2... Step: 10... Loss: 3.2191... Val Loss: 3.1930
Epoch: 1/2... Step: 20... Loss: 3.1264... Val Loss: 3.1351
Epoch: 1/2... Step: 30... Loss: 3.1263... Val Loss: 3.1233
Epoch: 1/2... Step: 40... Loss: 3.1017... Val Loss: 3.1184
Epoch: 1/2... Step: 50... Loss: 3.1350... Val Loss: 3.1159
Epoch: 1/2... Step: 60... Loss: 3.1070... Val Loss: 3.1115
Epoch: 1/2... Step: 70... Loss: 3.0893... Val Loss: 3.1025
Epoch: 1/2... Step: 80... Loss: 3.0938... Val Loss: 3.0796
Epoch: 1/2... Step: 90... Loss: 3.0468... Val Loss: 3.0273
Epoch: 1/2... Step: 100... Loss: 2.9543... Val Loss: 2.9409
Epoch: 1/2... Step: 110... Loss: 2.8763... Val Loss: 2.8498
Epoch: 1/2... Step: 120... Loss: 2.7378... Val Loss: 2.7862
Epoch: 1/2... Step: 130... Loss: 2.6755... Val Loss: 2.6383
Epoch: 2/2... Step: 140... Loss: 2.5827... Val Loss: 2.5478
Epoch: 2/2... Step: 150... Loss: 2.5160... Val Loss: 2.4915
Epoch: 2/2... Step: 160... Loss: 2.4570... Val Loss: 2.4395
Epoch: 2/2... Step: 170... Loss: 2.4142... Val Lo