<a href="https://colab.research.google.com/github/monilchheda/manning-live-project-building-domain-specific-language-models/blob/master/Character_LM_with_AllenNLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import pandas as pd
import numpy as np

In [0]:
# Read the gzipped CSV of Stack Exchange posts, then shuffle all rows
# (frac=1) with a fixed seed and renumber the index from 0.
DATASET_URL = 'https://alexip-ml.s3.amazonaws.com/stackexchange_812k.tokenized.csv.gz'
df_raw = (
    pd.read_csv(DATASET_URL, compression='gzip')
    .sample(frac=1, random_state=8)
    .reset_index(drop=True)
)

In [0]:
# Glue every post together into one lowercase string.
text = ''.join(df_raw['text'].values).lower()

# A str is already an iterable of its characters, so list() splits it.
all_characters = list(text)
unique_characters = np.unique(all_characters)
print(unique_characters)


In [0]:
from collections import Counter

# Count how often each character occurs and show them, most frequent first.
char_count = Counter(all_characters)
print(char_count.most_common())

# Keep only the MAX_VOCAB_SIZE most frequent characters, in sorted order.
MAX_VOCAB_SIZE = 40
valid_characters = sorted(
    character for character, _ in char_count.most_common(MAX_VOCAB_SIZE)
)


In [0]:
# Value of the `category` column selecting which posts are kept.
POSTS_TYPE = 'title'
# Number of rows randomly drawn from that category.
DF_SAMPLE_COUNT = 10_000

In [0]:
# Subsample the original dataset: keep only the posts of the chosen category
# and draw DF_SAMPLE_COUNT of them at random. The sample is seeded (same seed
# as the shuffle of df_raw) so the training subset is identical on every
# "Restart & Run All".
df = (
    df_raw[df_raw.category == POSTS_TYPE]
    .sample(DF_SAMPLE_COUNT, random_state=8)
    .reset_index(drop=True)
)

print("df.shape: ", df.shape)

# Peek at two random rows as a quick sanity check (intentionally unseeded).
print(df.text.sample(2).values)


Install AllenNLP if it is not already installed (comment out the cell below to skip the installation).

In [0]:
!pip install allennlp

In [0]:
import re
from typing import Dict, List, Tuple, Set

import torch
import torch.optim as optim
from allennlp.common.file_utils import cached_path
from allennlp.common.util import START_SYMBOL, END_SYMBOL
from allennlp.data.fields import TextField
from allennlp.data.instance import Instance
from allennlp.data.iterators import BasicIterator
from allennlp.data.token_indexers import TokenIndexer, SingleIdTokenIndexer
from allennlp.data.tokenizers import Token, CharacterTokenizer
from allennlp.data.vocabulary import Vocabulary, DEFAULT_PADDING_TOKEN
from allennlp.models import Model
from allennlp.modules.seq2seq_encoders import PytorchSeq2SeqWrapper
from allennlp.modules.text_field_embedders import TextFieldEmbedder, BasicTextFieldEmbedder
from allennlp.modules.token_embedders import Embedding
from allennlp.nn.util import get_text_field_mask, sequence_cross_entropy_with_logits
from allennlp.training.trainer import Trainer


In [0]:
# Tokenizer instance used to turn each post into a list of AllenNLP Tokens
# when the training set is built.
tokenizer = CharacterTokenizer()

In [0]:
def _tokenize_lowercased(txt):
    """Lowercase one post and tokenize it with the shared tokenizer."""
    return tokenizer.tokenize(txt.lower())


# One list of Tokens per sampled post, as an array.
train_set = df.text.apply(_tokenize_lowercased).values

In [0]:

def tokens_to_instance(tokens: List[Token], token_indexers: Dict[str, TokenIndexer]):
    """Build a language-modelling Instance from one tokenized text.

    The sequence is wrapped in START_SYMBOL / END_SYMBOL and then exposed as
    two views shifted by one position, so the target at step ``t`` is the
    input token at step ``t + 1``.

    Args:
        tokens: Tokens of a single text; the caller's sequence is not modified.
        token_indexers: Indexers passed to both ``TextField`` objects.

    Returns:
        An ``Instance`` with the fields ``input_tokens`` (everything except
        the trailing END_SYMBOL) and ``output_tokens`` (everything except the
        leading START_SYMBOL).
    """
    wrapped = [Token(START_SYMBOL), *tokens, Token(END_SYMBOL)]
    inputs, targets = wrapped[:-1], wrapped[1:]
    return Instance({
        'input_tokens': TextField(inputs, token_indexers),
        'output_tokens': TextField(targets, token_indexers),
    })


In [0]:
# A single SingleIdTokenIndexer registered under the 'tokens' key.
token_indexers = {'tokens': SingleIdTokenIndexer()}
# Turn every tokenized post of the training set into an Instance.
instances = list(map(lambda toks: tokens_to_instance(toks, token_indexers), train_set))

In [0]:
# Build the vocabulary from the allowed characters (each with a count of 1).
# START_SYMBOL and END_SYMBOL must be registered explicitly: tokens_to_instance()
# wraps every sequence in them, and a token that is missing from the vocabulary
# is indexed as the shared out-of-vocabulary entry. Without these two entries
# "start" and "end" would share one index, and RNNLanguageModel.generate()
# (which refuses to sample the start index) could never emit the end symbol.
token_counts = {char: 1 for char in valid_characters}
token_counts.update({START_SYMBOL: 1, END_SYMBOL: 1})
vocab = Vocabulary({'tokens': token_counts})


In [0]:
# Dimension of each token embedding vector (embedding_dim of the Embedding).
EMBEDDING_SIZE = 32
# Hidden-state size of the LSTM.
HIDDEN_SIZE = 256
# Batch size handed to the BasicIterator.
BATCH_SIZE = 128

In [0]:
class RNNLanguageModel(Model):
    """LSTM language model over the 'tokens' vocabulary namespace.

    ``forward`` computes the masked cross-entropy of predicting
    ``output_tokens`` from ``input_tokens``; ``generate`` samples a new
    sequence one token at a time.

    Args:
        embedder: Maps the indexed input tokens to embedding vectors.
        hidden_size: Hidden-state size of the LSTM.
        max_len: Maximum number of sampling steps performed by ``generate``.
        vocab: Vocabulary whose 'tokens' namespace defines the output classes.
    """

    def __init__(self,
                 embedder: TextFieldEmbedder,
                 hidden_size: int,
                 max_len: int,
                 vocab: Vocabulary) -> None:
        super().__init__(vocab)

        self.embedder = embedder

        # Size the LSTM from the constructor arguments instead of notebook-level
        # constants, so it always matches the embedder's output width and the
        # initial state that generate() builds from self.hidden_size.
        self.rnn = PytorchSeq2SeqWrapper(
            torch.nn.LSTM(embedder.get_output_dim(), hidden_size, batch_first=True))

        # Projects every LSTM output vector to one logit per vocabulary entry.
        self.hidden2out = torch.nn.Linear(in_features=self.rnn.get_output_dim(),
                                          out_features=vocab.get_vocab_size('tokens'))
        self.hidden_size = hidden_size
        self.max_len = max_len

    def forward(self, input_tokens, output_tokens):
        """Compute the training loss for a batch.

        Args:
            input_tokens: Indexed text field fed to the embedder; also used to
                derive the padding mask.
            output_tokens: Indexed text field whose ``'tokens'`` entry holds
                the target indices.

        Returns:
            A dict with the single key ``'loss'``.
        """
        mask = get_text_field_mask(input_tokens)
        embeddings = self.embedder(input_tokens)
        rnn_hidden = self.rnn(embeddings, mask)
        out_logits = self.hidden2out(rnn_hidden)
        loss = sequence_cross_entropy_with_logits(out_logits, output_tokens['tokens'], mask)

        return {'loss': loss}

    def generate(self) -> Tuple[List[Token], torch.Tensor]:
        """Sample one sequence from the model.

        Starting from START_SYMBOL, repeatedly feeds the last sampled index
        back into the LSTM and samples the next index from the softmax
        distribution. The start and padding indices are never sampled.
        Sampling stops when END_SYMBOL is drawn or after ``max_len`` steps.

        Returns:
            A tuple ``(tokens, log_likelihood)``: the sampled tokens (without
            the end symbol) and the sum of the log-probabilities of every
            sampled index, including the end symbol when it was drawn.
        """
        start_symbol_idx = self.vocab.get_token_index(START_SYMBOL, 'tokens')
        end_symbol_idx = self.vocab.get_token_index(END_SYMBOL, 'tokens')
        padding_symbol_idx = self.vocab.get_token_index(DEFAULT_PADDING_TOKEN, 'tokens')
        # Indices that must never be emitted as a generated token.
        banned_indices = [start_symbol_idx, padding_symbol_idx]

        # Create every tensor on the device the model's parameters live on.
        device = next(self.parameters()).device

        log_likelihood = torch.zeros((), device=device)
        words = []
        state = (torch.zeros(1, 1, self.hidden_size, device=device),
                 torch.zeros(1, 1, self.hidden_size, device=device))

        word_idx = start_symbol_idx

        # Sampling is inference only: do not record an autograd graph.
        with torch.no_grad():
            for _ in range(self.max_len):
                tokens = torch.tensor([[word_idx]], device=device)

                embeddings = self.embedder({'tokens': tokens})
                # Call the wrapped LSTM directly so the recurrent state can be
                # threaded through successive single-step calls.
                output, state = self.rnn._module(embeddings, state)
                output = self.hidden2out(output)

                log_prob = torch.log_softmax(output[0, 0], dim=0)

                # Zeroing the banned entries and letting multinomial
                # renormalise yields the same distribution as redrawing until
                # a non-banned index comes up, without an unbounded retry loop.
                dist = torch.exp(log_prob)
                dist[banned_indices] = 0.
                word_idx = torch.multinomial(dist, num_samples=1).item()

                # The likelihood uses the original (unmasked) log-probability.
                log_likelihood += log_prob[word_idx]

                if word_idx == end_symbol_idx:
                    break

                token = Token(text=self.vocab.get_token_from_index(word_idx, 'tokens'))
                words.append(token)

        return words, log_likelihood

In [0]:
# Embedding table with one EMBEDDING_SIZE-dimensional row per entry of the
# 'tokens' vocabulary namespace.
vocab_size = vocab.get_vocab_size('tokens')
token_embedding = Embedding(num_embeddings=vocab_size, embedding_dim=EMBEDDING_SIZE)

# Wrap the embedding so it is applied to the "tokens" key of a text field.
embedder = BasicTextFieldEmbedder({"tokens": token_embedding})

# Language model; generate() performs at most 80 sampling steps.
model = RNNLanguageModel(
    embedder=embedder,
    hidden_size=HIDDEN_SIZE,
    max_len=80,
    vocab=vocab,
)

In [0]:
# Batching iterator with the vocabulary attached.
iterator = BasicIterator(batch_size=BATCH_SIZE)
iterator.index_with(vocab)

# Adam over all model parameters.
LEARNING_RATE = 5.e-3
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [0]:
# Collect the training configuration in one place, then run 5 epochs over
# the prepared instances.
trainer_options = {
    'model': model,
    'optimizer': optimizer,
    'iterator': iterator,
    'train_dataset': instances,
    'num_epochs': 5,
}
trainer = Trainer(**trainer_options)
trainer.train()

In [0]:
def predict(text: str, model: Model) -> float:
    """Score ``text`` with the language model.

    The text is lowercased, tokenized, converted to an Instance with
    ``tokens_to_instance`` and passed through ``model.forward_on_instance``.
    The raw model output is printed, as before.

    Args:
        text: Sentence to score.
        model: Trained language model.

    Returns:
        The ``'loss'`` entry of the model output as a Python float.
    """
    tokenizer = CharacterTokenizer()
    # The training set was lowercased before tokenization, so the input must
    # be lowercased too; otherwise uppercase characters are scored as
    # characters the model never saw.
    tokens = tokenizer.tokenize(text.lower())

    token_indexers = {'tokens': SingleIdTokenIndexer()}
    instance = tokens_to_instance(tokens, token_indexers)
    output = model.forward_on_instance(instance)
    print(output)
    return float(output['loss'])

In [0]:
# Three probes: an in-domain statistics sentence, an out-of-domain sentence,
# and a scrambled sequence of words.
example_sentences = [
    "In a fixed-effects model only time-varying variables can be used.",
    "I know a pretty little place in Southern California, down San Diego way.",
    "This that is noon but yes apple whatever did regression variable",
]

for sentence in example_sentences:
    predict(sentence, model)


In [0]:
# Sample 50 sequences from the trained model and print each as a string.
# The log-likelihood returned by generate() is not needed here.
for _ in range(50):
    generated_tokens, _log_likelihood = model.generate()
    print(''.join(token.text for token in generated_tokens))