<a href="https://colab.research.google.com/github/khyatidoshi/HumorGeneration/blob/main/Humor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import torch
import torchtext as tt
from torch.utils.data import DataLoader
from torch import nn, optim
from collections import Counter
import torch.nn.functional as F

In [None]:
# The first CSV column is an unnamed row counter; without index_col it is
# loaded as a redundant "Unnamed: 0" data column. Use it as the index instead.
joke_dataset = pd.read_csv('new_joke_data.csv', index_col=0)
joke_dataset.head()

Unnamed: 0,ID,Joke
0,0,"[me narrating a documentary about narrators] ""..."
1,1,Telling my daughter garlic is good for you. Go...
2,2,I've been going through a really rough period ...
3,3,"If I could have dinner with anyone, dead or al..."
4,4,Two guys walk into a bar. The third guy ducks.


In [None]:
# Window length in tokens.
# NOTE(review): no cell visible in this notebook reads this name; Dataset and
# train fall back to their own default of 64 -- confirm which value is intended.
sequence_length = 128

In [None]:
class Dataset:
    """Sliding-window next-word dataset built from the ``'Joke'`` column.

    All jokes are concatenated into one long token stream. Item ``i`` is the
    pair ``(tokens[i : i + L], tokens[i + 1 : i + L + 1])`` of index tensors,
    i.e. an input window and the same window shifted one token to the right.

    Args:
        dataset: Any mapping-like object whose ``'Joke'`` entry is an iterable
            of strings (for example a pandas DataFrame).
        sequence_length: Number of tokens ``L`` in every window.

    Attributes:
        words: The full token stream.
        uniq_words: Vocabulary, most frequent word first.
        index_to_word / word_to_index: Vocabulary lookups in both directions.
        words_indexes: ``words`` mapped to vocabulary indices.
    """

    def __init__(self, dataset, sequence_length=64):
        self.sequence_length = sequence_length
        self.words = self.load_words(dataset)
        self.uniq_words = self.get_uniq_words()
        self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
        self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}
        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self, dataset):
        """Return every joke's tokens as one flat list."""
        text = " ".join(dataset['Joke'])
        # split() with no argument splits on any run of whitespace, so repeated
        # spaces do not yield empty-string "words" and newlines/tabs separate
        # tokens instead of being glued into them.
        return text.split()

    def get_uniq_words(self):
        """Return the distinct words ordered by descending frequency."""
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        # Each item needs sequence_length + 1 tokens (the target is shifted by
        # one). Clamp at zero: a negative value would make len() raise.
        return max(0, len(self.words_indexes) - self.sequence_length)

    def __getitem__(self, index):
        """Return ``(input_window, target_window)`` as two index tensors."""
        return (
            torch.tensor(self.words_indexes[index:index + self.sequence_length]),
            torch.tensor(self.words_indexes[index + 1:index + self.sequence_length + 1]),
        )


In [None]:
class Model(nn.Module):
    """Word-level language model: embedding -> stacked LSTM -> linear decoder.

    Args:
        ntoken: Vocabulary size (number of distinct token indices).
        ninp: Embedding dimension.
        nhid: Hidden units per LSTM layer.
        nlayers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the embeddings, between LSTM
            layers, and to the LSTM output.
        tie_weights: If True, the decoder shares its weight matrix with the
            embedding (requires ``nhid == ninp``).

    Raises:
        ValueError: If ``tie_weights`` is set and ``nhid != ninp``.
    """

    def __init__(self, ntoken, ninp, nhid, nlayers, dropout=0.5, tie_weights=False):
        super(Model, self).__init__()
        self.ntoken = ntoken
        self.nhid = nhid
        self.nlayers = nlayers

        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp)
        # batch_first=True: inputs are (batch, seq, feature), matching what a
        # DataLoader yields. nn.LSTM only applies dropout *between* layers, so
        # pass 0 for a single layer to avoid its warning.
        self.rnn = nn.LSTM(
            ninp,
            nhid,
            nlayers,
            dropout=dropout if nlayers > 1 else 0.0,
            batch_first=True,
        )
        self.decoder = nn.Linear(nhid, ntoken)

        if tie_weights:
            if nhid != ninp:
                raise ValueError('When using the tied flag, nhid must be equal to emsize')
            self.decoder.weight = self.encoder.weight

        self.init_weights()

    def init_weights(self):
        """Uniformly initialise embedding/decoder weights; zero the decoder bias."""
        initrange = 0.1
        nn.init.uniform_(self.encoder.weight, -initrange, initrange)
        nn.init.zeros_(self.decoder.bias)
        nn.init.uniform_(self.decoder.weight, -initrange, initrange)

    def forward(self, input, hidden=None):
        """Predict a next-token distribution for every position.

        Args:
            input: Long tensor of token indices, shape ``(batch, seq_len)``.
                Any batch size and sequence length are accepted.
            hidden: ``(h, c)`` LSTM state, each ``(nlayers, batch, nhid)``, or
                None to start from zeros.

        Returns:
            ``(probs, hidden)`` where ``probs`` has shape
            ``(batch, seq_len, ntoken)`` and sums to 1 over the last axis, and
            ``hidden`` is the LSTM state after the final position.
        """
        batch_size = input.size(0)
        # A state built for another batch size (the final short batch of an
        # epoch, or a caller that sized it by sequence length) cannot be
        # carried over, so fall back to a fresh zero state.
        if hidden is None or hidden[0].size(1) != batch_size:
            hidden = self.init_state(batch_size)

        emb = self.drop(self.encoder(input))           # (batch, seq, ninp)
        rnn_output, hidden = self.rnn(emb, hidden)     # (batch, seq, nhid)
        scores = self.decoder(self.drop(rnn_output))   # (batch, seq, ntoken)

        # Normalise over the vocabulary axis (the last one), so every position
        # gets its own distribution over tokens.
        # NOTE(review): probabilities are returned to keep the existing
        # contract; returning log-probabilities would be numerically safer.
        probs = F.softmax(scores, dim=-1)
        return probs, hidden

    def init_state(self, bsz):
        """Return a zero ``(h, c)`` state for ``bsz`` sequences.

        The tensors are created from a model parameter so they share its
        device and dtype.
        """
        weight = next(self.parameters())
        return (weight.new_zeros(self.nlayers, bsz, self.nhid),
                weight.new_zeros(self.nlayers, bsz, self.nhid))

In [None]:

def train(dataset, model, no_epoc=5, bs=50, sequence_length=64):
    """Train ``model`` to predict the next token at every position.

    Args:
        dataset: Map-style dataset yielding ``(input, target)`` index tensors
            of equal length.
        model: Module called as ``model(x, hidden)`` that returns
            ``(probs, hidden)`` with ``probs`` of shape
            ``(batch, seq_len, vocab)`` holding probabilities.
        no_epoc: Number of passes over ``dataset``.
        bs: Batch size.
        sequence_length: Kept for backward compatibility and not used; batch
            and sequence sizes are read from the tensors themselves.

    Prints one ``{'epoch', 'batch', 'loss'}`` dict per batch, where ``loss``
    is the mean negative log-likelihood per token.
    """
    model.train()

    dataloader = DataLoader(dataset, batch_size=bs)
    # The model outputs probabilities, so take their log and use NLLLoss.
    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.005)
    device = next(model.parameters()).device
    tiny = 1e-12  # floor that keeps log() finite if a probability underflows

    for epoch in range(no_epoc):
        for batch, (x, y) in enumerate(dataloader):
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()

            # Consecutive batches are overlapping windows, not continuations
            # of each other, so every batch starts from a fresh state. Passing
            # None lets the LSTM start from zeros for any batch size.
            y_pred, _ = model(x, None)

            # NLLLoss wants the class axis second: (batch, vocab, seq). It
            # scores the probability given to the *target* token y.
            log_probs = torch.log(y_pred.clamp_min(tiny))
            loss = criterion(log_probs.transpose(1, 2), y)

            loss.backward()
            optimizer.step()

            print({ 'epoch': epoch, 'batch': batch, 'loss': loss.item() })

In [None]:
def predict(dataset, model, text, next_words=100):
    """Continue ``text`` by sampling ``next_words`` words from ``model``.

    Args:
        dataset: Object exposing ``word_to_index`` and ``index_to_word``.
        model: Module called as ``model(x, hidden)`` with ``x`` of shape
            ``(1, seq_len)`` that returns ``(probs, hidden)``; ``probs`` has
            shape ``(1, seq_len, vocab)`` and holds non-negative weights.
        text: Whitespace-separated prompt; every word must be in the vocabulary.
        next_words: Number of words to generate.

    Returns:
        The prompt words followed by the generated words, as a list of str.

    Raises:
        ValueError: If ``text`` contains no words.
        KeyError: If a prompt word is not in ``dataset.word_to_index``.
    """
    model.eval()

    words = text.split()
    if not words:
        raise ValueError("text must contain at least one word")
    unknown = [w for w in words if w not in dataset.word_to_index]
    if unknown:
        raise KeyError(f"words not in vocabulary: {unknown}")

    device = next(model.parameters()).device
    x = torch.tensor([[dataset.word_to_index[w] for w in words]], device=device)
    hidden = None  # the model/LSTM starts from a zero state

    with torch.no_grad():  # inference only: no autograd graph needed
        for _ in range(next_words):
            probs, hidden = model(x, hidden)
            # The model already returns probabilities, so sample from the last
            # position directly. multinomial renormalises its weights, which
            # avoids the strict sum-to-1 check of np.random.choice.
            word_index = torch.multinomial(probs[0, -1], 1).item()
            words.append(dataset.index_to_word[word_index])
            # The carried state already summarises everything fed so far, so
            # only the newly sampled word is fed next.
            x = torch.tensor([[word_index]], device=device)

    return words

In [None]:
# Seed the RNGs so weight initialisation and sampling are repeatable across
# Restart & Run All.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Build the vocabulary and training windows from the jokes.
df = Dataset(joke_dataset)
ntoken = len(df.uniq_words)  # the decoder needs one output per distinct word

model = Model(ntoken, ninp=64, nhid=1024, nlayers=3, dropout=0.5, tie_weights=False)

train(df, model)

 In forward


In [None]:
# Sample a continuation of the prompt from the trained model and show the
# resulting word list.
generated_words = predict(df, model, text='Knock knock. Whos there?')
print(generated_words)