In [1]:
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [2]:
# open text file and read in data as `text`
with open('data/anna.txt', 'r') as f:
    text = f.read()

In [3]:
text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

In [4]:
# encode the text and map each character to an integer and vice versa

# we create two dictionaries:
# 1. int2char, which maps integers to characters
# 2. char2int, which maps characters to unique integers
chars = tuple(set(text))
int2char = dict(enumerate(chars))
print(int2char)
char2int = {ch: ii for ii, ch in int2char.items()}
print(char2int)
# encode the text
encoded = np.array([char2int[ch] for ch in text])

{0: 'P', 1: ' ', 2: 'f', 3: 'Y', 4: 'X', 5: 'F', 6: '_', 7: 'H', 8: '!', 9: '6', 10: ')', 11: 'y', 12: '5', 13: '&', 14: '\n', 15: 'v', 16: 'Z', 17: 'a', 18: 'u', 19: "'", 20: 'M', 21: 'I', 22: '%', 23: '1', 24: 't', 25: 'V', 26: 'i', 27: 'G', 28: '$', 29: 'h', 30: 's', 31: 'd', 32: 'U', 33: 'l', 34: 'N', 35: '8', 36: '`', 37: '/', 38: '3', 39: 'J', 40: 'r', 41: 'O', 42: 'c', 43: 'E', 44: '9', 45: '0', 46: 'b', 47: '-', 48: '@', 49: 'p', 50: '"', 51: 'n', 52: '(', 53: 'o', 54: 'w', 55: ',', 56: ';', 57: 'm', 58: '*', 59: 'A', 60: 'g', 61: '.', 62: 'k', 63: 'C', 64: 'R', 65: '7', 66: ':', 67: '4', 68: '?', 69: 'L', 70: 'Q', 71: 'j', 72: 'T', 73: 'x', 74: 'e', 75: 'S', 76: 'D', 77: 'B', 78: 'K', 79: 'q', 80: '2', 81: 'z', 82: 'W'}
{'P': 0, ' ': 1, 'f': 2, 'Y': 3, 'X': 4, 'F': 5, '_': 6, 'H': 7, '!': 8, '6': 9, ')': 10, 'y': 11, '5': 12, '&': 13, '\n': 14, 'v': 15, 'Z': 16, 'a': 17, 'u': 18, "'": 19, 'M': 20, 'I': 21, '%': 22, '1': 23, 't': 24, 'V': 25, 'i': 26, 'G': 27, '$': 28, 'h': 29,

In [33]:
encoded[:100]

array([19, 78, 61, 27, 73, 74, 38, 37,  5, 29, 29, 29,  9, 61, 27, 27, 32,
       37, 58, 61, 17, 31, 64, 31, 74, 16, 37, 61, 38, 74, 37, 61, 64, 64,
       37, 61, 64, 31,  6, 74, 21, 37, 74, 72, 74, 38, 32, 37, 56, 23, 78,
       61, 27, 27, 32, 37, 58, 61, 17, 31, 64, 32, 37, 31, 16, 37, 56, 23,
       78, 61, 27, 27, 32, 37, 31, 23, 37, 31, 73, 16, 37, 53, 34, 23, 29,
       34, 61, 32, 81, 29, 29, 40, 72, 74, 38, 32, 73, 78, 31, 23])

In [34]:
def one_hot_encode(arr, n_labels):
    
    # Initialize the the encoded array
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)
    

#crea un array: 3 con aggiuna 8 (3,8)
    
    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.

    
    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot


In [38]:
# check that the function works as expected
test_seq = np.array([[3, 5, 1]])
one_hot = one_hot_encode(test_seq, 8)

print(one_hot)
#print(test_seq)   #[[3 5 1]]
#print(test_seq.shape)#(1, 3)
#print(test_seq.size)   #3
#print(test_seq.flatten())  #[3 5 1]
#print(test_seq) #[[3 5 1]]
#arange che si trova nel codice sopra è come for, ma parte da dove si vuole, primo argomento, passo nel secondo arg.
#l'array 3,5,1 viene posto come primo arg. 

[[[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0. 0. 0. 0.]]]


In [None]:
#starting sequence: [1 2 3 4 5 6 7 8 9 10 11 12]
#Batch size: 2   [1 2 3 4 5 6 ]
#               [7 8 9 10 11 12]
#sequence lenght: 3  [1 2 3 ......
#                    [7 8 9 ......


In [46]:
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.
       
       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''
    
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr)//batch_size_total
    
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))
    
    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n+seq_length]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [48]:
batches = get_batches(encoded, 8, 50)
x, y = next(batches)
print(batches)

<generator object get_batches at 0x000000FB93C19E48>


In [43]:
# printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])

x
 [[19 78 61 27 73 74 38 37  5 29]
 [16 53 23 37 73 78 61 73 37 61]
 [74 23 46 37 53 38 37 61 37 58]
 [16 37 73 78 74 37  3 78 31 74]
 [37 16 61 34 37 78 74 38 37 73]
 [ 3 56 16 16 31 53 23 37 61 23]
 [37 52 23 23 61 37 78 61 46 37]
 [48 25 64 53 23 16  6 32 81 37]]

y
 [[78 61 27 73 74 38 37  5 29 29]
 [53 23 37 73 78 61 73 37 61 73]
 [23 46 37 53 38 37 61 37 58 53]
 [37 73 78 74 37  3 78 31 74 58]
 [16 61 34 37 78 74 38 37 73 74]
 [56 16 16 31 53 23 37 61 23 46]
 [52 23 23 61 37 78 61 46 37 16]
 [25 64 53 23 16  6 32 81 37  7]]


In [None]:
# check if GPU is available
train_on_gpu = torch.cuda.is_available()
if(train_on_gpu):
    print('Training on GPU!')
else: 
    print('No GPU available, training on CPU; consider making n_epochs very small.')

In [None]:
class CharRNN(nn.Module):
    
    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        
        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        ## TODO: define the LSTM
        self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, 
                            dropout=drop_prob, batch_first=True)
        
        ## TODO: define a dropout layer
        self.dropout = nn.Dropout(drop_prob)
        
        ## TODO: define the final, fully-connected output layer
        self.fc = nn.Linear(n_hidden, len(self.chars))
      
    
    def forward(self, x, hidden):
        ''' Forward pass through the network. 
            These inputs are x, and the hidden/cell state `hidden`. '''
                
        ## TODO: Get the outputs and the new hidden state from the lstm
        r_output, hidden = self.lstm(x, hidden)
        
        ## TODO: pass through a dropout layer
        out = self.dropout(r_output)
        
        # Stack up LSTM outputs using view
        # you may need to use contiguous to reshape the output
        out = out.contiguous().view(-1, self.n_hidden)
        
        ## TODO: put x through the fully-connected layer
        out = self.fc(out)
        
        # return the final output and the hidden state
        return out, hidden
    
    
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        
        if (train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
        
        return hidden
        

In [None]:
def train(net, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network 
    
        Arguments
        ---------
        
        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    
    '''
    net.train()
    
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    
    if(train_on_gpu):
        net.cuda()
    
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            
            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            
            if(train_on_gpu):
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            # zero accumulated gradients
            net.zero_grad()
            
            # get the output from the model
            output, h = net(inputs, h)
            
            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            loss.backward()
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()
            
            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                net.eval()
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    
                    inputs, targets = x, y
                    if(train_on_gpu):
                        inputs, targets = inputs.cuda(), targets.cuda()

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length).long())
                
                    val_losses.append(val_loss.item())
                
                net.train() # reset to train mode after iterationg through validation data
                
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

In [None]:
# define and print the net
n_hidden=512
n_layers=2

net = CharRNN(chars, n_hidden, n_layers)
print(net)

In [None]:
batch_size = 128
seq_length = 100
n_epochs = 20 # start smaller if you are just testing initial behavior

# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

In [None]:
# change the name, for saving multiple files
model_name = 'rnn_20_epoch.net'

checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

In [None]:
def predict(net, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        
        # tensor inputs
        x = np.array([[net.char2int[char]]])
        x = one_hot_encode(x, len(net.chars))
        inputs = torch.from_numpy(x)
        
        if(train_on_gpu):
            inputs = inputs.cuda()
        
        # detach hidden state from history
        h = tuple([each.data for each in h])
        # get the output of the model
        out, h = net(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
        if(train_on_gpu):
            p = p.cpu() # move to cpu
        
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(net.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the encoded value of the predicted char and the hidden state
        return net.int2char[char], h

In [None]:
def sample(net, size, prime='The', top_k=None):
        
    if(train_on_gpu):
        net.cuda()
    else:
        net.cpu()
    
    net.eval() # eval mode
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [None]:
print(sample(net, 1000, prime='Anna', top_k=5))