This lab is based on ideas and code snippets from:
- https://github.com/yxtay/char-rnn-text-generation
- https://d2l.ai/chapter_recurrent-modern/lstm.html
- https://d2l.ai/chapter_recurrent-neural-networks/rnn-scratch.html

In [None]:
import mxnet as mx
import mxnet.ndarray as F
import mxnet.gluon as gluon
from mxnet.gluon import nn, rnn
from mxnet import autograd

import time
from tqdm import tqdm

from utils.text_helpers import load_corpus, Vocab, batch_generator, encode_text, decode_text, generate_seed, sample_from_probs

In [None]:
class Model(gluon.Block):
    """
    character-level text generation network.

    pipeline: embedding -> dropout -> multi-layer LSTM -> dropout -> dense
    projection onto the vocabulary. the same dropout layer is applied both
    before and after the LSTM.
    """
    def __init__(self, vocab_size, embedding_size=32,
                 rnn_size=128, num_layers=2, drop_rate=0.0, **kwargs):
        super(Model, self).__init__(**kwargs)

        # remember the hyperparameters this instance was built with
        self.args = dict(vocab_size=vocab_size,
                         embedding_size=embedding_size,
                         rnn_size=rnn_size,
                         num_layers=num_layers,
                         drop_rate=drop_rate)

        with self.name_scope():
            # character id -> dense vector
            self.encoder = nn.Embedding(vocab_size, embedding_size)
            self.dropout = nn.Dropout(drop_rate)
            self.rnn = rnn.LSTM(rnn_size, num_layers,
                                dropout=drop_rate,
                                input_size=embedding_size)
            # rnn output -> one logit per vocabulary entry
            self.decoder = nn.Dense(vocab_size, in_units=rnn_size)

    def forward(self, inputs, state):
        """
        runs one sequence batch through the network.

        inputs is a 2-d array laid out as [seq_len, batch_size]; state is the
        recurrent state to start from (see begin_state).
        returns (logits, new_state) where logits is reshaped to
        [seq_len, batch_size, -1], the last axis being the decoder output.
        """
        steps, batch = inputs.shape

        embedded = self.encoder(inputs)
        embedded = self.dropout(embedded)

        # new_state is whatever the LSTM layer hands back; callers in this
        # file treat it as an iterable of arrays
        rnn_out, new_state = self.rnn(embedded, state)
        rnn_out = self.dropout(rnn_out)

        # merge the time and batch axes so the dense layer sees 2-d input,
        # then restore them on the way out
        hidden_dim = rnn_out.shape[2]
        flat_logits = self.decoder(rnn_out.reshape((-1, hidden_dim)))
        logits = flat_logits.reshape((steps, batch, -1))
        return logits, new_state

    def begin_state(self, batch_size=1):
        """
        returns the initial rnn state for the given batch size.
        """
        initial_state = self.rnn.begin_state(batch_size)
        return initial_state

In [None]:
def train(corpus, vocab, embedding_size=32,
          rnn_size=128, num_layers=2, drop_rate=0.0,
          batch_size = 64, seq_len = 64, num_epochs = 64):
    """
    builds a Model and trains it on the corpus.

    after every epoch the mean batch loss is printed and a text sample is
    generated from a fresh seed; once training has finished a longer sample
    (1024 characters, top_n=3) is generated as well.

    corpus: the training text; it is encoded with encode_text and also used
        to draw generation seeds.
    vocab: passed to encode_text as char2id and to generate_text; len(vocab)
        sets the size of the model's output layer.
    embedding_size, rnn_size, num_layers, drop_rate: forwarded to Model.
    batch_size, seq_len: shape of each training batch.
    num_epochs: number of passes of num_batches steps each.

    returns the trained model.
    raises ValueError if the corpus is too short to form a single batch.
    """

    print("corpus length: %s, vocabulary size: %s" %(len(corpus), len(vocab)))

    # one target character is needed after the last input character,
    # hence the "- 1"
    num_batches = (len(corpus) - 1) // (batch_size * seq_len)
    if num_batches < 1:
        # without this check no training step would run and the epoch loss
        # would be computed over an empty array
        raise ValueError(
            "corpus of length %d is too short for batch_size=%d and seq_len=%d"
            % (len(corpus), batch_size, seq_len))

    vocab_size = len(vocab)

    model = Model(vocab_size=vocab_size,
                  embedding_size=embedding_size,
                  rnn_size=rnn_size,
                  num_layers=num_layers,
                  drop_rate=drop_rate)
    model.initialize(mx.init.Xavier())
    model.hybridize()

    # loss function; the model output is laid out [seq_len, batch_size, ...],
    # so the batch axis is 1
    loss_fn = gluon.loss.SoftmaxCrossEntropyLoss(batch_axis=1)

    # optimizer
    optimizer = mx.optimizer.Adam(learning_rate=0.001, clip_gradient=5.0)

    # trainer
    trainer = gluon.Trainer(model.collect_params(), optimizer)

    # training start
    # NOTE(review): next() is called num_batches * num_epochs times, so this
    # assumes batch_generator keeps yielding (x, y) pairs indefinitely - confirm
    data_iter = batch_generator(encode_text(corpus, char2id=vocab), batch_size = batch_size, seq_len=seq_len, vocab = vocab)
    state = model.begin_state(batch_size)

    print("start of training.")
    time_train = time.time()
    for i in range(num_epochs):
        epoch_losses = mx.nd.empty(num_batches)
        time_epoch = time.time()
        # training epoch
        for j in tqdm(range(num_batches), desc="epoch {}/{}".format(i + 1, num_epochs), position=0, leave=True):
            # prepare inputs; batches are transposed before being fed to the
            # model, which unpacks its input as [seq_len, batch_size]
            x, y = next(data_iter)
            x = mx.nd.array(x.T)
            y = mx.nd.array(y.T)
            # the state is carried over between batches, but detached so that
            # gradients do not flow back into earlier batches
            state = [arr.detach() for arr in state]

            # record only the forward pass
            with autograd.record():
                logits, state = model(x, state)
                batch_loss = F.mean(loss_fn(logits, y))
            # calculate gradient
            batch_loss.backward()
            # apply gradient update; the loss is already a mean, so no
            # further rescaling by batch size is wanted
            trainer.step(1)
            # fetching the scalar blocks until the value is ready, so do it
            # last, outside of the recording scope
            epoch_losses[j] = batch_loss.asscalar()

        # logs
        duration_epoch = time.time() - time_epoch
        print("epoch: %s, duration: %ds, loss: %.6g."
              %(i + 1, duration_epoch, F.mean(epoch_losses).asscalar()))

        # generate text
        seed = generate_seed(corpus)
        generate_text(model, seed, vocab=vocab)

    # training end
    duration_train = time.time() - time_train
    print("end of training, duration: %ds." %duration_train)
    # generate text
    seed = generate_seed(corpus)
    generate_text(model, seed, 1024, 3, vocab=vocab)
    return model

In [None]:
def generate_text(model, seed, length=512, top_n=10, vocab=Vocab):
    """
    generates text of specified length from trained model
    with given seed character sequence.

    model: network called as model(x, state) -> (logits, state) and
        providing begin_state().
    seed: non-empty string or list of characters to start from; it is never
        modified.
    length: number of characters to generate after the seed.
    top_n: forwarded to sample_from_probs together with the predicted
        probabilities.
    vocab: used as char2id for encode_text and for vocab.to_tokens().
        NOTE(review): the default is the Vocab class itself, not an
        instance; every call in this file passes an instance - confirm
        whether the default is usable at all.

    returns the seed followed by the generated characters, as a string if
    the seed was a string and as a list otherwise.
    raises ValueError if the seed is empty.
    """
    if len(seed) == 0:
        # there would be no character to start the generation loop from
        raise ValueError("seed must contain at least one character")

    print("generating %s characters from top %s choices."%(length, top_n))
    print('generating with seed: "%s".' % (''.join(seed))) # ['a', 'b', 'c'] -> "abc"

    # work on a copy of non-string seeds: "+=" on a list extends it in
    # place, which would otherwise modify the caller's seed
    generated = seed if isinstance(seed, str) else list(seed)
    encoded = mx.nd.array(encode_text(seed, char2id=vocab))
    seq_len = encoded.shape[0]

    state = model.begin_state()
    if seq_len > 1:
        # feed everything but the last seed character to warm up the state;
        # skipped for a one-character seed, where that prefix is empty
        x = F.expand_dims(encoded[:seq_len-1], 1)
        # input shape: [seq_len - 1, 1]
        _, state = model(x, state)

    # the last seed character is the first input of the generation loop
    next_index = encoded[seq_len-1].asscalar()
    for i in range(length):
        x = mx.nd.array([[next_index]])
        # input shape: [1, 1]
        logit, state = model(x, state)
        # softmax over the last axis; squeeze() drops the singleton axes
        # before sampling
        probs = F.softmax(logit)
        next_index = sample_from_probs(probs.asnumpy().squeeze(), top_n)
        # append to sequence
        generated += vocab.to_tokens(next_index)

    print("generated text: \n%s\n" %(''.join(generated)))
    return generated

In [None]:
# read the training text; load_corpus returns the corpus together with
# the vocab object that is later handed to train() and generate_text()
# alternative dataset, currently disabled:
# corpus, vocab = load_corpus('data/time-machine.txt')
corpus, vocab = load_corpus('data/tinyshakespeare.txt')

In [None]:
# train with the default hyperparameters of train() and keep the returned model
model = train(corpus, vocab)

In [None]:
# signature reminder: generate_text(model, seed, length=512, top_n=10, vocab=Vocab)
# sample 128 characters from the trained model, starting from the seed "to"
generate_text(model, "to", length=128, top_n=3, vocab=vocab)