In [18]:
import copy

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from torch import nn

In [23]:
# Read the 'Joke' column of the Reddit clean-jokes CSV straight into a numpy array
joke_data = pd.read_csv("https://raw.githubusercontent.com/amoudgl/short-jokes-dataset/master/data/reddit-cleanjokes.csv")['Joke'].to_numpy()

# Flatten the jokes into one list of single characters, closing every joke with
# a '\n' so the newline acts as the separator between consecutive jokes
jokes = [ch for joke in joke_data for ch in (*joke, '\n')]

print(jokes[:100])

['W', 'h', 'a', 't', ' ', 'd', 'i', 'd', ' ', 't', 'h', 'e', ' ', 'b', 'a', 'r', 't', 'e', 'n', 'd', 'e', 'r', ' ', 's', 'a', 'y', ' ', 't', 'o', ' ', 't', 'h', 'e', ' ', 'j', 'u', 'm', 'p', 'e', 'r', ' ', 'c', 'a', 'b', 'l', 'e', 's', '?', ' ', 'Y', 'o', 'u', ' ', 'b', 'e', 't', 't', 'e', 'r', ' ', 'n', 'o', 't', ' ', 't', 'r', 'y', ' ', 't', 'o', ' ', 's', 't', 'a', 'r', 't', ' ', 'a', 'n', 'y', 't', 'h', 'i', 'n', 'g', '.', '\n', 'D', 'o', 'n', "'", 't', ' ', 'y', 'o', 'u', ' ', 'h', 'a', 't']


In [20]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

cuda


In [12]:
def get_batch(arr, batch_size, seq_length):
    ''' Generator yielding (features, targets) mini-batches from an encoded array.

        arr: numpy array of integer-encoded characters
        batch_size: number of rows (parallel sequences) in every mini-batch
        seq_length: number of columns (time steps) in every mini-batch

        Trailing elements that do not fill a whole mini-batch are dropped.
        Targets are the features shifted one step to the left; the last target
        column of the final window wraps around to the first column of the grid.
    '''
    window = batch_size * seq_length
    usable = (arr.size // window) * window

    # Keep only whole mini-batches and lay them out as batch_size long rows
    grid = arr[:usable].reshape((batch_size, -1))
    row_len = grid.shape[1]

    for start in range(0, row_len, seq_length):
        stop = start + seq_length

        features = grid[:, start:stop]
        targets = np.zeros_like(features)

        # Every target is the character that follows its feature
        targets[:, :-1] = features[:, 1:]

        # The column after the window supplies the last target; past the end
        # of the grid there is none, so wrap around to column 0
        following = stop if stop < row_len else 0
        targets[:, -1] = grid[:, following]

        yield features, targets

In [21]:
# Function for one_hot encoding numpy arrays using integer-encoded characters
def one_hot_encode(arr, n_labels):
    ''' One-hot encode an array of integer labels.

        arr: numpy array of integer-encoded characters (any shape)
        n_labels: size of the one-hot axis (number of distinct labels)

        Returns a float32 array of shape (*arr.shape, n_labels).
    '''
    labels = arr.astype('int')

    # Allocate the result in its final shape and fill it through a 2-D view
    # that has one row per label
    encoded = np.zeros((*labels.shape, n_labels), dtype=np.float32)
    rows = encoded.reshape((labels.size, n_labels))
    rows[np.arange(labels.size), labels.flatten()] = 1.

    return encoded

In [25]:
# Define model architecture
class lstm(nn.Module):
    ''' Character-level language model.

        Architecture: LSTM layers -> dropout layer -> fully connected layer

        chars: sequence of the distinct characters of the corpus; its order
            defines the integer encoding (int2char / char2int)
        batch_size, seq_length: accepted for backward compatibility, not used
            by the model itself
        hidden_size: the number of features in the hidden state of the LSTMs
        n_layers: the number of stacked LSTM layers
        batch_first: if True, inputs and outputs of the LSTM are ordered
            (batch, seq, feature)
        dropout_p: dropout probability, used between LSTM layers and before
            the fully connected layer
        lr: stored on the instance only; the model does not use it
    '''

    def __init__(self, chars, batch_size=10, seq_length=10, hidden_size=256, n_layers=2, batch_first=True, dropout_p=0.3, lr=0.001):
        super().__init__()

        self.chars = chars
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.batch_first = batch_first
        self.dropout_p = dropout_p
        self.lr = lr
        self.n_chars = len(chars)

        # Dictionaries for character conversion - we need to load these in our checkpoint in order to convert the output.
        self.int2char = dict(enumerate(chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # Layers
        # nn.LSTM only applies dropout between stacked layers; with a single
        # layer a non-zero value has no effect and just triggers a warning
        lstm_dropout = dropout_p if n_layers > 1 else 0
        self.lstm = nn.LSTM(self.n_chars, hidden_size, n_layers, dropout=lstm_dropout, batch_first=batch_first)
        self.dropout = nn.Dropout(dropout_p)
        self.fc = nn.Linear(hidden_size, self.n_chars)

    def forward(self, x, hidden):
        ''' Run the network on a batch of one-hot encoded characters.

            x: one-hot encoded input passed to the LSTM
            hidden: (h, c) tuple as produced by init_hidden_state or by a
                previous call

            Returns (scores, hidden), where scores has one row per input
            character and n_chars columns.
        '''
        x, hidden = self.lstm(x, hidden)
        x = self.dropout(x)
        # Stack all time steps of all sequences into rows for the linear layer
        x = x.contiguous().reshape(-1, self.hidden_size)
        x = self.fc(x)

        return x, hidden

    def init_hidden_state(self, batch_size):
        ''' Return a zeroed (h, c) tuple, each of shape
            (n_layers, batch_size, hidden_size).

            The tensors are created with the dtype and on the device of the
            model's own parameters, so they always match the weights.
        '''
        weight = next(self.parameters()).detach()
        shape = (self.n_layers, batch_size, self.hidden_size)

        return (weight.new_zeros(shape), weight.new_zeros(shape))

In [6]:
# Train model
def train_model(data, chars, net, batch_size=10, seq_length=10,
                epochs=20, val_frac=0.9, lr=0.001, clip=5, val_every=200):

    ''' Function for training your model.

        data: the jokes as a sequence of characters; encoded to integers here
            with net.char2int
        chars: unused (the encoding comes from net); kept for compatibility
        net: the network that will be trained
        batch_size: the number of sequences per batch
        seq_length: number of characters per sequence (should be shorter than the average joke)
        epochs: the number of epochs
        val_frac: the fraction of data that will be used in the training set
            (the remaining data is used for validation)
        lr: learning_rate
        clip: maximum gradient norm used for gradient clipping
        val_every: run a validation pass every this many training steps
            (must be a positive integer)

        Returns a copy of the state dict with the lowest mean validation loss
        seen during the whole run, or the final weights if no validation pass
        produced a loss.
    '''

    encoded = np.array([net.char2int[ch] for ch in data])

    # Split data into training and validation sets
    val_split = int(len(encoded) * val_frac)
    train_data, val_data = encoded[:val_split], encoded[val_split:]

    # Defining criterion and optimizer
    criterion = nn.CrossEntropyLoss()
    opt = torch.optim.Adam(net.parameters(), lr)

    # Set up the network
    net.train()
    net = net.to(device)

    # Best result over the WHOLE run (not per epoch)
    best_state_dict = None
    best_val_loss = float('inf')
    counter = 0 # Counts steps

    print("Time for training!")

    for epoch in range(epochs):
        hidden = net.init_hidden_state(batch_size)
        training_losses = [] # training losses for the current epoch

        for x, y in get_batch(train_data, batch_size, seq_length):
            counter += 1
            # Encode and prepare data
            x = torch.from_numpy(one_hot_encode(x, net.n_chars)).to(device)
            y = torch.from_numpy(y).to(device)

            # Detach the hidden state so gradients do not flow back through
            # every previous batch
            hidden = tuple(each.detach() for each in hidden)

            # Forward Pass
            net.zero_grad()
            predictions, hidden = net(x, hidden)

            loss = criterion(predictions, y.reshape(batch_size * seq_length).long())
            loss.backward()

            nn.utils.clip_grad_norm_(net.parameters(), clip)
            opt.step()

            training_losses.append(loss.item())

            # Validation Pass
            if counter % val_every == 0:
                net.eval()
                val_h = net.init_hidden_state(batch_size)
                val_losses = []

                # No gradients are needed for validation
                with torch.no_grad():
                    for val_x, val_y in get_batch(val_data, batch_size, seq_length):
                        val_x = torch.from_numpy(one_hot_encode(val_x, net.n_chars)).to(device)
                        val_y = torch.from_numpy(val_y).to(device)

                        # Carry the validation state itself from batch to
                        # batch, independent of the training hidden state
                        val_pred, val_h = net(val_x, val_h)
                        val_loss = criterion(val_pred, val_y.reshape(batch_size * seq_length).long())
                        val_losses.append(val_loss.item())

                net.train()

                # An empty validation set yields no batches; report nan then
                val_loss_mean = np.mean(val_losses) if val_losses else float('nan')

                # If the validation loss is the smallest so far, snapshot the
                # weights. state_dict() returns references to the live
                # parameters, so a deep copy is required to freeze them.
                if val_loss_mean < best_val_loss:
                    best_val_loss = val_loss_mean
                    best_state_dict = copy.deepcopy(net.state_dict())

                print(f"Epoch: {epoch+1}/{epochs}...",
                            f"Step: {counter}...",
                            f"Training Loss: {np.mean(training_losses):.4f}...",
                            f"Validation Loss: {val_loss_mean:.4f}")

    # Without any usable validation result fall back to the final weights
    if best_state_dict is None:
        best_state_dict = copy.deepcopy(net.state_dict())

    print(f"Done! Lowest val loss: {best_val_loss}")

    return best_state_dict

In [29]:
# Hyperparameters
batch_size, seq_length = 30, 10
hidden_size, n_layers = 512, 3
dropout_p, epochs, val_frac = 0.5, 10, 0.9

# Vocabulary: every distinct character that occurs in the corpus
chars = tuple({char for joke in jokes for char in joke})

# Build the network and train it; train_model hands back a state dict
net = lstm(
    chars,
    batch_size=batch_size,
    seq_length=seq_length,
    hidden_size=hidden_size,
    dropout_p=dropout_p,
    n_layers=n_layers,
)
state_dict = train_model(
    jokes,
    chars,
    net,
    epochs=epochs,
    batch_size=batch_size,
    seq_length=seq_length,
    val_frac=val_frac,
)

# Checkpoint: what is needed to rebuild the network and decode its output
checkpoint = dict(
    hidden_size=net.hidden_size,
    n_layers=net.n_layers,
    tokens=net.chars,
    state_dict=state_dict,
)

Time for training!
Epoch: 1/10... Step: 50... Training Loss: 3.4361... Validation Loss: 3.2140
Epoch: 1/10... Step: 100... Training Loss: 3.3379... Validation Loss: 3.2063
Epoch: 1/10... Step: 150... Training Loss: 3.3143... Validation Loss: 3.2105
Epoch: 1/10... Step: 200... Training Loss: 3.2921... Validation Loss: 3.1842
Epoch: 1/10... Step: 250... Training Loss: 3.2550... Validation Loss: 2.9667
Epoch: 1/10... Step: 300... Training Loss: 3.1885... Validation Loss: 2.7903
Epoch: 1/10... Step: 350... Training Loss: 3.1163... Validation Loss: 2.7145
Epoch: 2/10... Step: 400... Training Loss: 2.5621... Validation Loss: 2.6349
Epoch: 2/10... Step: 450... Training Loss: 2.5580... Validation Loss: 2.5561
Epoch: 2/10... Step: 500... Training Loss: 2.5178... Validation Loss: 2.5216
Epoch: 2/10... Step: 550... Training Loss: 2.5047... Validation Loss: 2.5039
Epoch: 2/10... Step: 600... Training Loss: 2.4781... Validation Loss: 2.4599
Epoch: 2/10... Step: 650... Training Loss: 2.4531... Valid

In [69]:
# File the checkpoint is written to; pick another name to keep several saved
# models side by side
model_name = 'lstm_100_epoch.net'

# torch.save opens the path for binary writing itself
torch.save(checkpoint, model_name)

In [13]:
def predict(net, char, h=None, top_k=None):
    ''' Sample the character that follows `char`.

        net: network providing char2int, int2char, chars and init_hidden_state
        char: the input character; must be a key of net.char2int
        h: hidden state returned by the previous call; if None, a zeroed state
            for a batch of one sequence is created
        top_k: if given, sample only among the top_k most likely characters
            (clamped to the vocabulary size); if None, sample among all

        Returns (next character, new hidden state).
    '''
    if h is None:
        h = net.init_hidden_state(1)

    # One-hot encode the single input character as a (1, 1, n_chars) batch
    x = np.array([[net.char2int[char]]])
    x = one_hot_encode(x, len(net.chars))
    inputs = torch.from_numpy(x).to(device)

    # Cut the hidden state loose from any earlier graph
    h = tuple(each.data for each in h)

    out, h = net(inputs, h)

    # Character probabilities, moved to the CPU for numpy
    p = F.softmax(out, dim=1).data.cpu()

    if top_k is None:
        top_ch = np.arange(len(net.chars))
    else:
        # topk raises if asked for more entries than exist
        top_k = min(top_k, len(net.chars))
        p, top_ch = p.topk(top_k)
        # reshape (not squeeze) keeps a 1-d array even when top_k == 1;
        # np.random.choice would read a 0-d array as "arange(value)"
        top_ch = top_ch.numpy().reshape(-1)

    p = p.numpy().reshape(-1)
    char = np.random.choice(top_ch, p=p/p.sum())

    return net.int2char[int(char)], h

In [14]:
def finish_joke(net, size, prime='The', top_k=20):
    ''' Continue the text `prime` with characters sampled from the network.

        net: the trained network (moved to `device` and put in eval mode)
        size: number of sampling steps after the first generated character
        prime: the non-empty text the joke starts with; every character must
            be a key of net.char2int
        top_k: sample each character among the top_k most likely ones

        Generation stops early as soon as a '\\n' is sampled.
        Returns prime followed by the generated characters.
        Raises ValueError if prime is empty.
    '''
    if not prime:
        raise ValueError("prime must contain at least one character")

    net = net.to(device)
    net.eval()

    chars = [ch for ch in prime]
    h = net.init_hidden_state(1)

    # Pure inference: no gradients are needed
    with torch.no_grad():
        # Feed the prime through the network to build up the hidden state;
        # the prediction after its last character is the first new character
        for ch in prime:
            char, h = predict(net, ch, h, top_k=top_k)

        # Same stop rule for the first generated character as for the rest
        if char == '\n':
            return ''.join(chars)

        chars.append(char)

        for ii in range(size):
            char, h = predict(net, chars[-1], h, top_k=top_k)

            if (char == '\n'):
                break

            chars.append(char)

    return ''.join(chars)

In [26]:
# Load model and predict
# NOTE(security): torch.load unpickles the file - only open checkpoints that
# come from a trusted source.
# map_location remaps the stored tensors onto the device available now, so a
# checkpoint written on a GPU can also be restored on a CPU-only machine.
with open('lstm_100_epoch.net', 'rb') as f:
    checkpoint = torch.load(f, map_location=device)

# Rebuild the network from the saved vocabulary and sizes, then restore weights
loaded = lstm(checkpoint['tokens'], hidden_size=checkpoint['hidden_size'], n_layers=checkpoint['n_layers'])
loaded.load_state_dict(checkpoint['state_dict'])

prime = "Three men walked into a bar."
print(finish_joke(loaded, 500, prime=prime, top_k=5))

Three men walked into a bar. As the first day this is a single thing a barber, the bar thinks he has to came home the bar. "What is the munches that?' The bartender said, "Well how do you can't be changed that sir?" "Well, you chouse."
