<a href="https://colab.research.google.com/github/kimtiraj/deep-learning-pytorch/blob/main/LSTM_text_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install torch



In [2]:
import torch
import pandas as pd
import argparse
import numpy as np
from torch import nn, optim
from collections import Counter
from torch.utils.data import DataLoader

In [3]:
class Model(nn.Module):
    def __init__(self, dataset):
        super(Model, self).__init__()
        self.lstm_size = 128
        self.embedding_dim = 128
        self.num_layers = 3

        n_vocab = len(dataset.uniq_words)
        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=self.embedding_dim,
        )
        self.lstm = nn.LSTM(
            input_size=self.lstm_size,
            hidden_size=self.lstm_size,
            num_layers=self.num_layers,
            dropout=0.2,
        )
        self.fc = nn.Linear(self.lstm_size, n_vocab)

    def forward(self, x, prev_state):
        embed = self.embedding(x)
        output, state = self.lstm(embed, prev_state)
        logits = self.fc(output)
        return logits, state

    def init_state(self, sequence_length):
        return (torch.zeros(self.num_layers, sequence_length, self.lstm_size),
                torch.zeros(self.num_layers, sequence_length, self.lstm_size))

In [4]:
class Dataset(torch.utils.data.Dataset):
    def __init__(
        self,
        args,
    ):
        self.args = args
        self.words = self.load_words()
        self.uniq_words = self.get_uniq_words()

        self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
        self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self):
        train_df = pd.read_csv('/content/reddit-cleanjokes.csv')
        text = train_df['Joke'].str.cat(sep=' ')
        return text.split(' ')

    def get_uniq_words(self):
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        return len(self.words_indexes) - self.args.sequence_length

    def __getitem__(self, index):
        return (
            torch.tensor(self.words_indexes[index:index+self.args.sequence_length]),
            torch.tensor(self.words_indexes[index+1:index+self.args.sequence_length+1]),
        )

In [5]:
def train(dataset, model, args):
    model.train()

    dataloader = DataLoader(dataset, batch_size=args.batch_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(args.max_epochs):
        state_h, state_c = model.init_state(args.sequence_length)

        for batch, (x, y) in enumerate(dataloader):
            optimizer.zero_grad()

            y_pred, (state_h, state_c) = model(x, (state_h, state_c))
            loss = criterion(y_pred.transpose(1, 2), y)

            state_h = state_h.detach()
            state_c = state_c.detach()

            loss.backward()
            optimizer.step()

            print({ 'epoch': epoch, 'batch': batch, 'loss': loss.item() })

In [6]:
def predict(dataset, model, text, next_words=100):
    model.eval()

    words = text.split(' ')
    state_h, state_c = model.init_state(len(words))

    for i in range(0, next_words):
        x = torch.tensor([[dataset.word_to_index[w] for w in words[i:]]])
        y_pred, (state_h, state_c) = model(x, (state_h, state_c))

        last_word_logits = y_pred[0][-1]
        p = torch.nn.functional.softmax(last_word_logits, dim=0).detach().numpy()
        word_index = np.random.choice(len(last_word_logits), p=p)
        words.append(dataset.index_to_word[word_index])

    return words

In [11]:
parser = argparse.ArgumentParser()
parser.add_argument('--max-epochs', type=int, default=10)
parser.add_argument('--batch-size', type=int, default=256)
parser.add_argument('--sequence-length', type=int, default=4)
args, unknown = parser.parse_known_args()

dataset = Dataset(args)
model = Model(dataset)

train(dataset, model, args)
print(predict(dataset, model, text='Knock knock. Whos there?'))

{'epoch': 0, 'batch': 0, 'loss': 8.835580825805664}
{'epoch': 0, 'batch': 1, 'loss': 8.842241287231445}
{'epoch': 0, 'batch': 2, 'loss': 8.834406852722168}
{'epoch': 0, 'batch': 3, 'loss': 8.818205833435059}
{'epoch': 0, 'batch': 4, 'loss': 8.808088302612305}
{'epoch': 0, 'batch': 5, 'loss': 8.794114112854004}
{'epoch': 0, 'batch': 6, 'loss': 8.794868469238281}
{'epoch': 0, 'batch': 7, 'loss': 8.768697738647461}
{'epoch': 0, 'batch': 8, 'loss': 8.754875183105469}
{'epoch': 0, 'batch': 9, 'loss': 8.698174476623535}
{'epoch': 0, 'batch': 10, 'loss': 8.62510871887207}
{'epoch': 0, 'batch': 11, 'loss': 8.517964363098145}
{'epoch': 0, 'batch': 12, 'loss': 8.363943099975586}
{'epoch': 0, 'batch': 13, 'loss': 8.288195610046387}
{'epoch': 0, 'batch': 14, 'loss': 8.013922691345215}
{'epoch': 0, 'batch': 15, 'loss': 7.950191497802734}
{'epoch': 0, 'batch': 16, 'loss': 7.803077220916748}
{'epoch': 0, 'batch': 17, 'loss': 7.756598472595215}
{'epoch': 0, 'batch': 18, 'loss': 7.628942966461182}
{'ep