In [1]:
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass

In [2]:
# 禁用詞，包含如下字符的唐詩將被忽略
DISALLOWED_WORDS = ['（', '）', '(', ')', '__', '《', '》', '【', '】', '[', ']']
# 句子最大長度
MAX_LEN = 64
# 最小詞頻
MIN_WORD_FREQUENCY = 8
# 訓練的batch size
BATCH_SIZE = 16
# 數據集路徑
DATASET_PATH = 'https://raw.githubusercontent.com/silverair/TF2.0/master/poetry.txt'
# 每個epoch訓練完成後，隨機生成SHOW_NUM首古詩作為展示
SHOW_NUM = 5
# 共訓練多少個epoch
TRAIN_EPOCHS = 20
# 最佳權重保存路徑
BEST_MODEL_PATH = './best_model.h5'

In [3]:
class Tokenizer:
    """
    分詞器
    """

    def __init__(self, token_dict):
        # 詞->編號的映射
        self.token_dict = token_dict
        # 編號->詞的映射
        self.token_dict_rev = {value: key for key, value in self.token_dict.items()}
        # 詞彙表大小
        self.vocab_size = len(self.token_dict)

    def id_to_token(self, token_id):
        """
        給定一個編號，查找詞彙表中對應的詞
        :param token_id: 帶查找詞的編號
        :return: 編號對應的詞
        """
        return self.token_dict_rev[token_id]

    def token_to_id(self, token):
        """
        給定一個詞，查找它在詞彙表中的編號
        未找到則返回低頻詞[UNK]的編號
        :param token: 帶查找編號的詞
        :return: 詞的編號
        """
        return self.token_dict.get(token, self.token_dict['[UNK]'])

    def encode(self, tokens):
        """
        給定一個字符串s，在頭尾分別加上標記開始和結束的特殊字符，並將它轉成對應的編號序列
        :param tokens: 待編碼字符串
        :return: 編號序列
        """
        # 加上開始標記
        token_ids = [self.token_to_id('[CLS]'), ]
        # 加入字符串編號序列
        for token in tokens:
            token_ids.append(self.token_to_id(token))
        # 加上結束標記
        token_ids.append(self.token_to_id('[SEP]'))
        return token_ids

    def decode(self, token_ids):
        """
        給定一個編號序列，將它解碼成字符串
        :param token_ids: 待解碼的編號序列
        :return: 解碼出的字符串
        """
        # 起止標記字符特殊處理
        spec_tokens = {'[CLS]', '[SEP]'}
        # 保存解碼出的字符的list
        tokens = []
        for token_id in token_ids:
            token = self.id_to_token(token_id)
            if token in spec_tokens:
                continue
            tokens.append(token)
        # 拼接字符串
        return ''.join(tokens)

In [4]:
class PoetryDataGenerator:
    """
    古詩數據集生成器
    """

    def __init__(self, data, random=False):
        # 數據集
        self.data = data
        # batch size
        self.batch_size = batch_size
        # 每個epoch迭代的步數
        self.steps = int(math.floor(len(self.data) / self.batch_size))
        # 每個epoch開始時是否隨機混洗
        self.random = random

    def sequence_padding(self, data, length=None, padding=None):
        """
        將給定數據填充到相同長度
        :param data: 待填充數據
        :param length: 填充後的長度，不傳遞此參數則使用data中的最大長度
        :param padding: 用於填充的數據，不傳遞此參數則使用[PAD]的對應編號
        :return: 填充後的數據
        """
        # 計算填充長度
        if length is None:
            length = max(map(len, data))
        # 計算填充數據
        if padding is None:
            padding = tokenizer.token_to_id('[PAD]')
        # 開始填充
        outputs = []
        for line in data:
            padding_length = length - len(line)
            # 不足就進行填充
            if padding_length > 0:
                outputs.append(np.concatenate([line, [padding] * padding_length]))
            # 超過就進行截斷
            else:
                outputs.append(line[:length])
        return np.array(outputs)

    def __len__(self):
        return self.steps

    def __iter__(self):
        total = len(self.data)
        # 是否隨機混洗
        if self.random:
            np.random.shuffle(self.data)
        # 迭代一個epoch，每次yield一個batch
        for start in range(0, total, self.batch_size):
            end = min(start + self.batch_size, total)
            batch_data = []
            # 逐一對古詩進行編碼
            for single_data in self.data[start:end]:
                batch_data.append(tokenizer.encode(single_data))
            # 填充為相同長度
            batch_data = self.sequence_padding(batch_data)
            # yield x,y
            yield batch_data[:, :-1], tf.one_hot(batch_data[:, 1:], tokenizer.vocab_size)
            del batch_data

    def for_fit(self):
        """
        創建一個生成器，用於訓練
        """
        # 死循環，當數據訓練一個epoch之後，重新迭代數據
        while True:
            # 委託生成器
            yield from self.__iter__()


In [5]:
def generate_random_poetry(tokenizer, model, s=''):
    """
    隨機生成一首詩
    :param tokenizer: 分詞器
    :param model: 用於生成古詩的模型
    :param s: 用於生成古詩的起始字符串，默認為空串
    :return: 一個字符串，表示一首古詩
    """
    # 將初始字符串轉成token
    token_ids = tokenizer.encode(s)
    # 去掉結束標記[SEP]
    token_ids = token_ids[:-1]
    while len(token_ids) < MAX_LEN:
        # 進行預測，只保留第一個樣例（我們輸入的樣例數只有1）的、最後一個token的預測的、不包含[PAD][UNK][CLS]的概率分布
        _probas = model.predict([token_ids, ])[0, -1, 3:]
        # print(_probas)
        # 按照出現概率，對所有token倒序排列
        p_args = _probas.argsort()[::-1][:100]
        # 排列後的概率順序
        p = _probas[p_args]
        # 先對概率歸一
        p = p / sum(p)
        # 再按照預測出的概率，隨機選擇一個詞作為預測結果
        target_index = np.random.choice(len(p), p=p)
        target = p_args[target_index] + 3
        # 保存
        token_ids.append(target)
        if target == 3:
            break
    return tokenizer.decode(token_ids)


def generate_acrostic(tokenizer, model, head):
    """
    隨機生成一首藏頭詩
    :param tokenizer: 分詞器
    :param model: 用於生成古詩的模型
    :param head: 藏頭詩的頭
    :return: 一個字符串，表示一首古詩
    """
    # 使用空串初始化token_ids，加入[CLS]
    token_ids = tokenizer.encode('')
    token_ids = token_ids[:-1]
    # 標點符號，這裡簡單的只把逗號和句號作為標點
    punctuations = ['，', '。']
    punctuation_ids = {tokenizer.token_to_id(token) for token in punctuations}
    # 緩存生成的詩的list
    poetry = []
    # 對於藏頭詩中的每一個字，都生成一個短句
    for ch in head:
        # 先記錄下這個字
        poetry.append(ch)
        # 將藏頭詩的字符轉成token id
        token_id = tokenizer.token_to_id(ch)
        # 加入到列表中去
        token_ids.append(token_id)
        # 開始生成一個短句
        while True:
            # 進行預測，只保留第一個樣例（我們輸入的樣例數只有1）的、最後一個token的預測的、不包含[PAD][UNK][CLS]的概率分布
            _probas = model.predict([token_ids, ])[0, -1, 3:]
            # 按照出現概率，對所有token倒序排列
            p_args = _probas.argsort()[::-1][:100]
            # 排列後的概率順序
            p = _probas[p_args]
            # 先對概率歸一
            p = p / sum(p)
            # 再按照預測出的概率，隨機選擇一個詞作為預測結果
            target_index = np.random.choice(len(p), p=p)
            target = p_args[target_index] + 3
            # 保存
            token_ids.append(target)
            # 只有不是特殊字符時，才保存到poetry裡面去
            if target > 3:
                poetry.append(tokenizer.id_to_token(target))
            if target in punctuation_ids:
                break
    return ''.join(poetry)

In [6]:
# -*- coding: utf-8 -*-
# File    : dataset.py
# Author  : AaronJny
# Time    : 2019/12/30
# Desc    : 構建數據集
from collections import Counter
import math
import numpy as np
import tensorflow as tf
import urllib.request  # the lib that handles the url stuff


# 禁用詞
disallowed_words = DISALLOWED_WORDS
# 句子最大長度
max_len = MAX_LEN
# 最小詞頻
min_word_frequency = MIN_WORD_FREQUENCY
# mini batch 大小
batch_size = BATCH_SIZE


lines = urllib.request.urlopen(DATASET_PATH)
  #print(line.decode('utf-8'))
  #lines = txt.readlines()
lines = [line.decode('utf-8').replace('：', ':') for line in lines]

# 數據集列表
poetry = []
# 逐行處理讀取到的數據
for line in lines:
    # 有且只能有一個冒號用來分割標題
    if line.count(':') != 1:
        continue
    # 後半部分不能包含禁止詞
    __, last_part = line.split(':')
    ignore_flag = False
    for dis_word in disallowed_words:
        if dis_word in last_part:
            ignore_flag = True
            break
    if ignore_flag:
        continue
    # 長度不能超過最大長度
    if len(last_part) > max_len - 2:
        continue
    poetry.append(last_part.replace('\n', ''))

# 統計詞頻
counter = Counter()
for line in poetry:
    counter.update(line)
# 過濾掉低頻詞
_tokens = [(token, count) for token, count in counter.items() if count >= min_word_frequency]
# 按詞頻排序
_tokens = sorted(_tokens, key=lambda x: -x[1])
# 去掉詞頻，只保留詞列表
_tokens = [token for token, count in _tokens]

# 將特殊詞和數據集中的詞拼接起來
_tokens = ['[PAD]', '[UNK]', '[CLS]', '[SEP]'] + _tokens
# 創建詞典 token->id映射關係
token_id_dict = dict(zip(_tokens, range(len(_tokens))))
# 使用新詞典重新建立分詞器
tokenizer = Tokenizer(token_id_dict)
# 混洗數據
np.random.shuffle(poetry)


In [7]:
# 構建模型
model = tf.keras.Sequential([
    # 不定長度的輸入
    tf.keras.layers.Input((None,)),
    # 詞嵌入層
    tf.keras.layers.Embedding(input_dim=tokenizer.vocab_size, output_dim=128),
    # 第一個LSTM層，返回序列作為下一層的輸入
    tf.keras.layers.LSTM(128, dropout=0.5, return_sequences=True),
    # 第二個LSTM層，返回序列作為下一層的輸入
    tf.keras.layers.LSTM(128, dropout=0.5, return_sequences=True),
    # 對每一個時間點的輸出都做softmax，預測下一個詞的概率
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(tokenizer.vocab_size, activation='softmax')),
])

# 查看模型結構
model.summary()
# 配置優化器和損失函數
model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.categorical_crossentropy)


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, None, 128)         445312    
_________________________________________________________________
lstm (LSTM)                  (None, None, 128)         131584    
_________________________________________________________________
lstm_1 (LSTM)                (None, None, 128)         131584    
_________________________________________________________________
time_distributed (TimeDistri (None, None, 3479)        448791    
Total params: 1,157,271
Trainable params: 1,157,271
Non-trainable params: 0
_________________________________________________________________


In [8]:
class Evaluate(tf.keras.callbacks.Callback):
    """
    在每個epoch訓練完成後，保留最優權重，並隨機生成settings.SHOW_NUM首古詩展示
    """

    def __init__(self):
        super().__init__()
        # 給loss賦一個較大的初始值
        self.lowest = 1e10

    def on_epoch_end(self, epoch, logs=None):
        # 在每個epoch訓練完成後調用
        # 如果當前loss更低，就保存當前模型參數
        if logs['loss'] <= self.lowest:
            self.lowest = logs['loss']
            model.save(BEST_MODEL_PATH)
        # 隨機生成幾首古體詩測試，查看訓練效果
        print()
        for i in range(SHOW_NUM):
            print(generate_random_poetry(tokenizer, model))


# 創建數據集
data_generator = PoetryDataGenerator(poetry, random=True)
# 開始訓練
model.fit_generator(data_generator.for_fit(), steps_per_epoch=data_generator.steps, epochs=TRAIN_EPOCHS,
                    callbacks=[Evaluate()])

Epoch 1/20





不前南如開，不知山行衣。一遠見何外，獨入草海，江樹望意，知上不外，城上人年，獨煙中秋，三落天意，夜何流衣。
石見將中心，不風風開客。中重是山陰，月高未草月。君草重遠裡，南歸心日來。風石來行落，秋人千如在。
金陽草出水，不行子上遲。獨中花聲春，風上雲如衣。不遠風春來，中一寒明微。中不花來同，春入不誰，前作寒情。
歸白里開成，落年出花天。莫書何見風，日人寒天多。還千白為深，金在入上開。天暮見在，樹應無山，我無月子行。
三千水來事，西歸暮上陰。
Epoch 2/20

寂門仙雲城，況葉何到秋。不庭寒來遠，飛此北故來。因吟生天起，月山見竹頭。孤朝金子在，何光更陽知。
初庭一天花重聲，欲人三夜滿仙新。獨朝自頭時風酒，高上看事盡中間。
不色月中子，白日花下名。天落來無客，青山欲城東。草明連見在，歸天長庭開。更雲長相後，終心欲回哀。
西南明頭下，山我更上深。夜草因歸盡，無是更一空。野歸風寒至，雪得正清清。有年高不，玉白去，故日送長秋。
江明獨山，江頭去上難陽，孤花有下五塵。
Epoch 3/20

君聞夜月石，不古雪城衣。遠入山年醉，心時去翠歸。
此心皆自事，春色帶無關。坐起高煙處，長燈生得移。
已是春沙去，年南入月園。看香何月草，夜水白年飛。
何水行離去，看歌夜在離。寒燈長下少，猶白石霜深。野月風秋暮，多明月木雲。唯得江海影，孤必思雲期。
夜晚初聲客，年年照外空。看窗不到散，孤草白陽年。夜事天邊近，山歌半入明。江時逢不早，南去石長中。
Epoch 4/20

江里流陰樹月開，不為同是此雲愁。已聞行去一長淚，不得青陽不可家。
水人風雨雨，猶與亦秋樓。有水空新至，寒風對晚陰。
日月愁飛落，香爐對石初。水池秋月月，不向不紛茶。野迥過猶去，臨猿雨浦長。應知一如住，回古白秋山。
一女多歸道，君思又別清。山生不我晚，日水斷長舟。遠起初歸晚，寒時野海濱。何事無無處，雲雲日處來。
人愛青林別，依聲萬雪分。孤林千里暖，草浪玉流風。馬有分天水，寒猿度水高。為閒重在夢，應日照城扉。
Epoch 5/20

松竹微雲雪後波，閒歸萬海又多醒。可能相恐逢君己，卻向重風一不能。
老此人為在水愁，一來還事不同清。清煙不此西長暮，有上湘頭幾盡遊。
松風清遠草，寒岸隔門間。
青霞去處夜，更見見林中。遠氣經無物，殘堂早處生。夜從多不處，書入海樓催。若有清台日，誰為上釣樓。
南十同如好，天山信別來。春光寒木客，明鳥

<tensorflow.python.keras.callbacks.History at 0x7f7bf1f0c490>

In [19]:
# 加載訓練好的模型
model = tf.keras.models.load_model(BEST_MODEL_PATH)
# 隨機生成一首詩
print(generate_random_poetry(tokenizer, model))
# 給出部分信息的情況下，隨機生成剩餘部分
print(generate_random_poetry(tokenizer, model, s='紅豆生南國，'))
# 生成藏頭詩
print(generate_acrostic(tokenizer, model, head='好想打球'))


我君身未悟，非事出山情。行夢看風霧，孤山起草煙。聽流歸遠路，飛夜草萋淒。願使千年外，誰非造處時。
紅豆生南國，胡台會不無。不知無所料，猶是楚王名。山驛千株近，時深出縣低。無言有此趣，暫見舊山迷。
好盡無逢俗，想君高亦行。打尋經事在，球與虎僧齊。
