# Text Generation with RNNs

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### 1 Dataset
Define the path of the file, you want to read and train the model on


We have attach the text file in zip, if you also use colab, then you should upload the text file as well. 

In [2]:
with open('/content/drive/MyDrive/Colab Notebooks/poetry.txt', 'r') as f:
    poetry_corpus = f.read()


#### Inspect the dataset
Take a look at the first 250 characters in text

In [3]:
poetry_corpus[:100]

'寒随穷律变，春逐鸟声开。\n初风飘带柳，晚雪间花梅。\n碧林青旧竹，绿沼翠新苔。\n芝田初雁去，绮树巧莺来。\n晚霞聊自怡，初晴弥可喜。\n日晃百花色，风动千林翠。\n池鱼跃不同，园鸟声还异。\n寄言博通者，知予物'

In [4]:
len(poetry_corpus)

942681

Replace/remove all chinese punctuations


In [5]:
poetry_corpus = poetry_corpus.replace('\n', ' ').replace('\r', ' ').replace('，', ' ').replace('。', ' ')
poetry_corpus[:100]

'寒随穷律变 春逐鸟声开  初风飘带柳 晚雪间花梅  碧林青旧竹 绿沼翠新苔  芝田初雁去 绮树巧莺来  晚霞聊自怡 初晴弥可喜  日晃百花色 风动千林翠  池鱼跃不同 园鸟声还异  寄言博通者 知予物'

### 2 Process the dataset for the learning task
The task that we want our model to achieve is: given a character, or a sequence of characters, what is the most probable next character?

To achieve this, we will input a sequence of characters to the model, and train the model to predict the output, that is, the following character at each time step. RNNs maintain an internal state that depends on previously seen elements, so information about all characters seen up until a given moment will be taken into account in generating the prediction.

#### Vectorize the text
Before we begin training our RNN model, we'll need to create a numerical representation of our text-based dataset. To do this, we'll generate two lookup tables: one that maps characters to numbers, and a second that maps numbers back to characters. Recall that we just identified the unique characters present in the text.

In [6]:
import numpy as np

class TextConverter(object):
    def __init__(self, text_path, max_vocab=5000):
        """

        Args:
            text_path: text position
            max_vocab: max. # text
        """

        with open(text_path, 'r') as f:
            text = f.read()
        text = text.replace('\n', ' ').replace('\r', ' ').replace('，', ' ').replace('。', ' ')
        
        # remove the repeated char
        vocab = set(text)

        # if # char more than max_vocab, remove those whose freqs are less.
        vocab_count = {}

        # compute freq
        for word in vocab:
            vocab_count[word] = 0
        for word in text:
            vocab_count[word] += 1
        vocab_count_list = []
        for word in vocab_count:
            vocab_count_list.append((word, vocab_count[word]))
        vocab_count_list.sort(key=lambda x: x[1], reverse=True)

        if len(vocab_count_list) > max_vocab:
            vocab_count_list = vocab_count_list[:max_vocab]
        vocab = [x[0] for x in vocab_count_list]
        self.vocab = vocab

        self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)}
        self.int_to_word_table = dict(enumerate(self.vocab))

    @property
    def vocab_size(self):
        return len(self.vocab) + 1

    def word_to_int(self, word):
        if word in self.word_to_int_table:
            return self.word_to_int_table[word]
        else:
            return len(self.vocab)

    def int_to_word(self, index):
        if index == len(self.vocab):
            return '<unk>'
        elif index < len(self.vocab):
            return self.int_to_word_table[index]
        else:
            raise Exception('Unknown index!')

    def text_to_arr(self, text):
        arr = []
        for word in text:
            arr.append(self.word_to_int(word))
        return np.array(arr)

    def arr_to_text(self, arr):
        words = []
        for index in arr:
            words.append(self.int_to_word(index))
        return "".join(words)

In [7]:
convert = TextConverter('/content/drive/MyDrive/Colab Notebooks/poetry.txt', max_vocab=10000)


This gives us an integer representation for each character. Observe that the unique characters (i.e., our vocabulary) in the text are mapped as indices from 0 to len(unique). Let's take a peek at this numerical representation of our dataset:

In [8]:
# orginal char(poetry)
txt_char = poetry_corpus[:11]
print(txt_char)

# convert to integers
# We can also look at how the first part of the text is mapped to an integer representation
print(convert.text_to_arr(txt_char))

寒随穷律变 春逐鸟声开
[ 40 166 358 933 565   0  10 367 108  63  78]


In [9]:
n_step = 20

# length of the given sequence
num_seq = int(len(poetry_corpus) / n_step)

text = poetry_corpus[:num_seq*n_step]

print(num_seq)

47134


#### Defining a method to encode one hot labels

In [10]:
def one_hot_encode(arr, n_labels):
    # Initialize the the encoded array
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)

    # Fill the appropriate elements with ones
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.

    # Finally reshape it to get back to the original array
    one_hot = one_hot.reshape((*arr.shape, n_labels))

    return one_hot

#### Defining a method to make mini-batches for training

In [11]:
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.

       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''

    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr) // batch_size_total

    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))

    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n + seq_length]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n + seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y



In [12]:
import torch

arr = convert.text_to_arr(text)
arr = arr.reshape((num_seq, -1))
arr = torch.from_numpy(arr)

print(arr.shape)
print(arr[0, :])

torch.Size([47134, 20])
tensor([ 40, 166, 358, 933, 565,   0,  10, 367, 108,  63,  78,   0,   0, 150,
          4, 441, 284, 182,   0, 131])


In [13]:
class TextDataset(object):
    def __init__(self, arr):
        self.arr = arr

    def __getitem__(self, item):
        x = self.arr[item, :]

        # construct label
        y = torch.zeros(x.shape)
        
        # Use the first character entered as the label for the last input
        y[:-1], y[-1] = x[1:], x[0]
        return x, y

    def __len__(self):
        return self.arr.shape[0]

In [14]:
train_set = TextDataset(arr)

In [15]:
x, y = train_set[0]
print(convert.arr_to_text(x.numpy()))
print(convert.arr_to_text(y.numpy()))

寒随穷律变 春逐鸟声开  初风飘带柳 晚
随穷律变 春逐鸟声开  初风飘带柳 晚寒


### 3 The Recurrent Neural Network (RNN) model

Declaring the model

In [16]:
from torch import nn
from torch.autograd import Variable

use_gpu = True

class VanillaCharRNN(nn.Module):
    def __init__(self, num_classes, embed_dim, hidden_size, 
                 num_layers, dropout):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        self.word_to_vec = nn.Embedding(num_classes, embed_dim)
        self.rnn = nn.GRU(embed_dim, hidden_size, num_layers)
        self.project = nn.Linear(hidden_size, num_classes)

    def forward(self, x, hs=None):
        batch = x.shape[0]
        if hs is None:
            hs = Variable(
                torch.zeros(self.num_layers, batch, self.hidden_size))
            if use_gpu:
                hs = hs.cuda()
        word_embed = self.word_to_vec(x)  # (batch, len, embed)
        word_embed = word_embed.permute(1, 0, 2)  # (len, batch, embed)
        out, h0 = self.rnn(word_embed, hs)  # (len, batch, hidden)
        le, mb, hd = out.shape
        out = out.view(le * mb, hd)
        out = self.project(out)
        out = out.view(le, mb, -1)
        out = out.permute(1, 0, 2).contiguous()  # (batch, len, hidden)
        return out.view(-1, out.shape[2]), h0

Declaring the hyperparameters

In [17]:
from torch.utils.data import DataLoader

batch_size = 128
train_data = DataLoader(train_set, batch_size, True, num_workers=4)
epochs = 20

Define and print the net


###### Check if GPU is available

In [18]:
model = VanillaCharRNN(convert.vocab_size, 512, 512, 2, 0.5)
if use_gpu:
    model = model.cuda()
criterion = nn.CrossEntropyLoss()

basic_optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
optimizer = basic_optimizer

print(model)

VanillaCharRNN(
  (word_to_vec): Embedding(5386, 512)
  (rnn): GRU(512, 512, num_layers=2)
  (project): Linear(in_features=512, out_features=5386, bias=True)
)



#### Declaring the train method

In [19]:
for e in range(epochs):
    train_loss = 0
    for data in train_data:
        x, y = data
        y = y.long()
        if use_gpu:
            x = x.cuda()
            y = y.cuda()
        x, y = Variable(x), Variable(y)

        # Forward.
        score, _ = model(x)
        loss = criterion(score, y.view(-1))

        # Backward.
        optimizer.zero_grad()
        loss.backward()
        # Clip gradient.
        nn.utils.clip_grad_norm(model.parameters(), 5)
        optimizer.step()

        train_loss += loss.item()
    print('epoch: {}, perplexity is: {:.3f}'.format(e+1, np.exp(train_loss / len(train_data))))



epoch: 1, perplexity is: 290.225
epoch: 2, perplexity is: 203.827
epoch: 3, perplexity is: 139.426
epoch: 4, perplexity is: 97.714
epoch: 5, perplexity is: 73.730
epoch: 6, perplexity is: 58.069
epoch: 7, perplexity is: 46.989
epoch: 8, perplexity is: 38.830
epoch: 9, perplexity is: 32.597
epoch: 10, perplexity is: 27.671
epoch: 11, perplexity is: 23.708
epoch: 12, perplexity is: 20.534
epoch: 13, perplexity is: 17.888
epoch: 14, perplexity is: 15.732
epoch: 15, perplexity is: 13.949
epoch: 16, perplexity is: 12.470
epoch: 17, perplexity is: 11.228
epoch: 18, perplexity is: 10.186
epoch: 19, perplexity is: 9.307
epoch: 20, perplexity is: 8.549



##### Defining a method to generate the next character

In [20]:
def predict(model, char, h=None, top_k=None):
    ''' Given a character, predict the next character.
        Returns the predicted character and the hidden state.
    '''

    # tensor inputs
    x = np.array([[char2idx[char]]])
    x = one_hot_encode(x, len(model.vocab))
    inputs = torch.from_numpy(x)

    if (train_on_gpu):
        inputs = inputs.cuda()

    # detach hidden state from history
    h = tuple([each.data for each in h])
    '''TODO: feed the current input into the model and generate output'''
    output, h = model('''TODO''') # TODO

    # get the character probabilities
    p = F.softmax(out, dim=1).data
    if (train_on_gpu):
        p = p.cpu()  # move to cpu

    # get top characters
    if top_k is None:
        top_ch = np.arange(len(model.vocab))
    else:
        p, top_ch = p.topk(top_k)
        top_ch = top_ch.numpy().squeeze()

    # select the likely next character with some element of randomness
    p = p.numpy().squeeze()
    char = np.random.choice(top_ch, p=p / p.sum())

    # return the encoded value of the predicted char and the hidden state
    return idx2char[char], h


In [21]:
def predict(preds, top_n=5):
    top_pred_prob, top_pred_label = torch.topk(preds, top_n, 1)
    top_pred_prob /= torch.sum(top_pred_prob)
    top_pred_prob = top_pred_prob.squeeze(0).cpu().numpy()
    top_pred_label = top_pred_label.squeeze(0).cpu().numpy()
    c = np.random.choice(top_pred_label, size=1, p=top_pred_prob)
    return c

**Declaring to generate new text**

In [26]:
text_len = 30

def generate(begin, model):
  model = model.eval()
  samples = [convert.word_to_int(c) for c in begin]
  input_txt = torch.LongTensor(samples)[None]
  if use_gpu:
    input_txt = input_txt.cuda()
  input_txt = Variable(input_txt)
  _, init_state = model(input_txt)
  result = samples
  model_input = input_txt[:, -1][:, None]
  for i in range(text_len):
    # Get the predicted character and the hidden state. 
    out, init_state = model(model_input, init_state)
    print(init_state)
    pred = predict(out.data)
    model_input = Variable(torch.LongTensor(pred))[None]
    if use_gpu:
        model_input = model_input.cuda()
    result.append(pred[0])
  text = convert.arr_to_text(result)
  print('Generate text is: {}'.format(text))

In [23]:
generate('天青色等烟雨', model)

Generate text is: 天青色等烟雨稠  风生荷叶上雪霜 不觉白云生  不觉初终岳昼风殿 明天山


In [24]:
generate('诗人口耳间流浪', model)

Generate text is: 诗人口耳间流浪 择地各成名诗 本师终应巡诗劫 诗格人诗医诗诗酒论诗诗词诗名


In [27]:
generate('恍然间已诀别', model)

tensor([[[ 0.9970, -1.0000, -0.9967,  ...,  0.3692, -0.9964,  0.4751]],

        [[ 0.9348,  0.9606, -0.6135,  ..., -0.8207,  0.9997,  0.7706]]],
       device='cuda:0', grad_fn=<CudnnRnnBackward>)
tensor([[[ 0.9931, -1.0000, -0.6424,  ...,  0.3629, -0.9962,  0.4757]],

        [[ 0.9348,  0.9995, -0.6179,  ..., -0.8207,  0.9804, -0.9990]]],
       device='cuda:0', grad_fn=<CudnnRnnBackward>)
tensor([[[ 0.9457, -1.0000, -0.5579,  ...,  0.9656,  0.9425, -0.8879]],

        [[ 0.9348, -0.9001, -0.6179,  ..., -0.8207,  0.9988, -0.4127]]],
       device='cuda:0', grad_fn=<CudnnRnnBackward>)
tensor([[[ 0.9942,  0.9775, -0.5530,  ...,  0.9667,  0.9503, -0.9494]],

        [[ 0.9348, -0.9912, -0.6179,  ..., -0.8207,  1.0000,  0.8930]]],
       device='cuda:0', grad_fn=<CudnnRnnBackward>)
tensor([[[ 0.8989,  1.0000, -0.5317,  ..., -0.9234,  0.2358, -0.9974]],

        [[ 0.9348, -0.5223, -0.6179,  ..., -0.8207,  1.0000,  0.2975]]],
       device='cuda:0', grad_fn=<CudnnRnnBackward>)
tensor([[[