In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf

from tensorflow import keras

# Record the interpreter and library versions this notebook was run with.
print(tf.__version__)
print(sys.version_info)
for module in (mpl, np, pd, sklearn, tf, keras):
    print(f"{module.__name__} {module.__version__}")

## 数据处理

In [None]:
# Load the whole training corpus into memory as a single string.
input_filepath = './shakespeare.txt'
# A context manager closes the file handle deterministically, and an explicit
# encoding keeps the result independent of the platform's default locale.
with open(input_filepath, 'r', encoding='utf-8') as corpus_file:
    text = corpus_file.read()

# Quick sanity check: total number of characters and the first 100 of them.
print(len(text))
print(text[0:100])

In [None]:
# Pipeline for this section:
#   1. build the vocabulary
#   2. build the mapping char -> id
#   3. convert the data to ids
#   4. define the model's inputs and outputs

# Step 1: build the vocabulary.
# A set keeps exactly one copy of every character in the corpus; sorting the
# resulting list gives the vocabulary a stable order.
vocab = list(set(text))
vocab.sort()
print(len(vocab))
print(vocab)

In [None]:
# Step 2: build the mapping char -> id, where the id is the character's
# position in the sorted vocabulary.
char2idx = dict(zip(vocab, range(len(vocab))))
print(char2idx)

In [None]:
# Reverse mapping id -> char: an array whose i-th entry is the character with id i,
# so it can be indexed with a single id or with an array of ids.
idx2char = np.asarray(vocab)
print(idx2char)

In [None]:
# Step 3: encode the whole corpus as an array of integer ids.
text_as_int = np.array(list(map(char2idx.__getitem__, text)))
# Show the first ten ids next to the characters they came from.
print(text_as_int[:10])
print(text[:10])

In [None]:
# Step 4: define the model's inputs and outputs.
def split_input_target(id_text):
    """Split a sequence into an (input, target) pair offset by one position.

    Args:
        id_text: A sliceable sequence of ids.

    Returns:
        A tuple ``(id_text[:-1], id_text[1:])``: everything but the last
        element, and everything but the first element.
    """
    model_input = id_text[:-1]
    model_target = id_text[1:]
    return model_input, model_target

# Character dataset: one element per character id in the corpus.
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
seq_length = 100
# Cut the id stream into equal-length chunks; the chunk boundaries ignore
# sentence/word structure. Each chunk holds seq_length + 1 ids because
# split_input_target later drops one id from each end to form an
# (input, target) pair of length seq_length.
# Sequence dataset.
seq_dataset = char_dataset.batch(seq_length + 1, drop_remainder = True)  # drop_remainder=True: discard the final chunk if it is shorter than seq_length + 1

# Sanity check: show the first two character ids together with their characters...
for ch_id in char_dataset.take(2):
    print(ch_id, idx2char[ch_id.numpy()])
# ...and the first two chunks, both as ids and decoded back to text.
for seq_id in seq_dataset.take(2):
    print(seq_id)
    print(repr(''.join(idx2char[seq_id.numpy()])))

In [None]:
# Split every chunk into an (input, target) pair; the target is the input
# shifted by one character.
# NOTE: this rebinds seq_dataset, so re-running this cell on its own would map
# split_input_target over a dataset that has already been split into pairs.
seq_dataset = seq_dataset.map(split_input_target)
# Show one pair to confirm the one-character offset between input and target.
for item_input, item_output in seq_dataset.take(1):
    print(item_input.numpy())
    print(item_output.numpy())

## 构建模型

In [None]:
# Training batch configuration.
batch_size = 64
buffer_size = 10000  # number of dataset elements held in the shuffle buffer
# Shuffle the (input, target) pairs, then group them into batches.
# drop_remainder=True discards a final short batch, so every batch contains
# exactly batch_size pairs.
# NOTE: this rebinds seq_dataset again; re-running this cell on its own would
# shuffle and batch an already batched dataset.
seq_dataset = seq_dataset.shuffle(buffer_size).batch(batch_size, drop_remainder=True)

In [None]:
# Model hyper-parameters.
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 1024


def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    """Build the character-level model: Embedding -> SimpleRNN -> Dense.

    Args:
        vocab_size: Number of distinct ids; used as the embedding input size
            and as the number of output units of the final Dense layer.
        embedding_dim: Size of each embedding vector.
        rnn_units: Number of units in the SimpleRNN layer.
        batch_size: Fixed batch dimension of the model input; the sequence
            dimension is left as None.

    Returns:
        An uncompiled keras Sequential model. The final Dense layer has no
        activation, so the model outputs raw logits. The SimpleRNN is created
        with return_sequences=True and without stateful=True.
    """
    embedding = keras.layers.Embedding(
        vocab_size, embedding_dim, batch_input_shape=[batch_size, None]
    )
    recurrent = keras.layers.SimpleRNN(units=rnn_units, return_sequences=True)
    to_logits = keras.layers.Dense(vocab_size)
    return keras.models.Sequential([embedding, recurrent, to_logits])


model = build_model(vocab_size, embedding_dim, rnn_units, batch_size)

model.summary()

In [None]:
# Smoke test: run one batch through the untrained model and look at the output shape.
# input_batch, target_batch and batch_prediction are reused by the following cells.
for input_batch, target_batch in seq_dataset.take(1):
    batch_prediction = model(input_batch)
    print(batch_prediction.shape)  # presumably (batch_size, seq_length, vocab_size) — confirm against the printed value

In [None]:
# Random sampling.
# Always taking the most probable character as the prediction (greedy strategy)
# can only ever produce one sequence; sampling the prediction at random from the
# predicted distribution (random strategy) can produce many different sequences.
sample_indices = tf.random.categorical(logits = batch_prediction[0], # batch_prediction[0]: [100, 65]; logits are the values a classifier produces before softmax
                                       num_samples = 1   # how many ids to sample per row
                                       )
print(sample_indices.shape)
sample_indices = tf.squeeze(sample_indices, axis=-1)  # drop the trailing, now redundant, dimension
print(sample_indices.shape)

In [None]:
# Decode the first example of the batch back to text: the input, the expected
# output (the input shifted by one character), and what the model sampled.
# The model has not been trained at this point in the notebook.
print("Input: ", repr("".join(idx2char[input_batch[0]])))
print("Output: ", repr("".join(idx2char[target_batch[0]])))
print("Predictions: ", repr("".join(idx2char[sample_indices])))

In [None]:
# Custom loss function.
def loss(labels, logits):
    """Sparse categorical cross-entropy computed directly from logits.

    By default the Keras loss expects a probability distribution (i.e. values
    that already went through softmax). The model's final Dense layer has no
    activation, so its outputs are logits and from_logits=True is required.

    Args:
        labels: Integer target ids.
        logits: Unnormalised model outputs, one score per vocabulary entry.

    Returns:
        The loss for every position (not reduced to a scalar).
    """
    return keras.losses.sparse_categorical_crossentropy(
        y_true=labels,
        y_pred=logits,
        from_logits=True,
    )


model.compile(loss=loss, optimizer='adam')
# Loss of the sample batch computed earlier, as a reference value.
example_loss = loss(target_batch, batch_prediction)
print(example_loss.shape)
print(example_loss.numpy().mean())

In [None]:
# Train the model and save its weights as checkpoints along the way.
output_dir = './text_generation_checkpoints'
# makedirs(exist_ok=True) creates the directory (and any missing parents) and
# avoids the check-then-create race of os.path.exists() followed by os.mkdir().
os.makedirs(output_dir, exist_ok=True)
# ModelCheckpoint fills in '{epoch}', so each save gets its own file prefix.
checkpoint_prefix = os.path.join(output_dir, 'ckpt_{epoch}')
checkpoint_callback = keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix, save_weights_only=True)
epochs = 100
history = model.fit(seq_dataset, epochs=epochs, callbacks=[checkpoint_callback])

In [None]:
# Display the path prefix of the most recent checkpoint found in output_dir
# (tf.train.latest_checkpoint returns None when no checkpoint is found).
tf.train.latest_checkpoint(output_dir)

In [None]:
# Rebuild the model from the latest checkpoint, for use in prediction.
model2 = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)   # batch_size=1: generate one sentence at a time
# The function is tf.train.latest_checkpoint; the previous spelling
# ("lastest_checkpoint") raised AttributeError.
latest_checkpoint = tf.train.latest_checkpoint(output_dir)
if latest_checkpoint is None:
    # Fail with a clear message instead of passing None on to load_weights().
    raise FileNotFoundError(
        f'No checkpoint found in {output_dir!r}; train the model first.'
    )
model2.load_weights(latest_checkpoint)
model2.build(tf.TensorShape([1, None])) # input size: 1 = a single sample, None = variable-length sentence
model2.summary()

In [None]:
# Text generation.
# Start from a character sequence A:
#   A -> model -> b
#   A.append(b) -> B
#   B -> model -> c
#   B.append(c) -> C
#   ...
def generate_text(model, start_string, num_generate=1000, context_length=100):
    """Generate text one character at a time by sampling from the model.

    At every step the trailing part of the text produced so far is fed to the
    model and the next character is sampled from the logits of the last
    position. The model is called with its recurrent state reset each time, so
    the prediction depends on the whole context that is passed in rather than
    on state carried over between calls. Feeding only the last sampled
    character would make every prediction depend on that single character
    when the recurrent layer is not stateful.

    Args:
        model: A model mapping ids of shape [1, length] to logits of shape
            [1, length, vocab_size].
        start_string: Non-empty seed text; every character must be a key of
            char2idx.
        num_generate: Number of characters to generate.
        context_length: Maximum number of trailing characters fed to the model
            at each step; None means the full history. The default of 100
            equals the seq_length used to build the training sequences.

    Returns:
        start_string followed by the generated characters.

    Raises:
        ValueError: If start_string is empty or context_length is not positive.
        KeyError: If start_string contains a character missing from char2idx.
    """
    if not start_string:
        raise ValueError('start_string must contain at least one character')
    if context_length is not None and context_length < 1:
        raise ValueError('context_length must be a positive integer or None')

    history_ids = [char2idx[ch] for ch in start_string]
    text_generated = []  # characters generated so far
    for _ in range(num_generate):
        if context_length is None:
            context_ids = history_ids
        else:
            context_ids = history_ids[-context_length:]
        # Start every pass from a clean state so that a stateful model does not
        # see the same context twice; this does nothing for a non-stateful model.
        model.reset_states()
        # The model input is [1, len(context_ids)], hence the extra leading dimension.
        input_eval = tf.expand_dims(context_ids, 0)
        # predictions: [1, len(context_ids), vocab_size]
        predictions = model(input_eval)
        # Only the last position predicts the next character: [1, vocab_size].
        last_logits = predictions[:, -1, :]
        predicted_id = int(tf.random.categorical(last_logits, num_samples=1)[0, 0].numpy())
        text_generated.append(idx2char[predicted_id])
        history_ids.append(predicted_id)
    return start_string + ''.join(text_generated)


new_text = generate_text(model2, 'All: ')
print(new_text)