In [9]:
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_hub as hub
from tensorflow.keras.models import Model

In [10]:
import numpy as np
import os, time

# 准备数据

In [11]:
all_chars = open('./novel.txt', 'rb').read().decode(encoding='utf-8')
vocab = sorted(set(all_chars)) # 单词清单

In [12]:
# !pip install tensorflow_text -q

In [13]:
import tensorflow_text as tf_text  # 必不可缺 registers the ops

preprocessor =hub.KerasLayer("https://tfhub.dev/tensorflow/bert_zh_preprocess/3")

In [24]:
preprocessor

# 创建训练的batch

In [14]:
all_chars_unicode = tf.strings.unicode_split(all_chars, input_encoding='UTF-8')
all_chars_dataset = tf.data.Dataset.from_tensor_slices(all_chars_unicode)
seq_length = 128
sequences = all_chars_dataset.batch(seq_length+1, drop_remainder=True) # 定义每个样本的数据

In [15]:
def split_input_target(sequence):
    input_text = tf.strings.reduce_join(sequence[:-1])
    target_text = preprocessor(sequence[1:])["input_word_ids"][0]
    return input_text, target_text

dataset = sequences.map(split_input_target)

In [16]:
BATCH_SIZE = 64
BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

dataset

<PrefetchDataset shapes: ((64,), (64, 128)), types: (tf.string, tf.int32)>

In [17]:
rnn_units = 1024

## 嵌入层
text_input = tf.keras.layers.Input(shape=(), dtype=tf.string)
encoder_inputs = preprocessor(text_input)

encoder = hub.KerasLayer(
  handle="https://tfhub.dev/tensorflow/bert_zh_L-12_H-768_A-12/3",
  trainable=False)

outputs = encoder(encoder_inputs)
sequence_output = outputs['sequence_output'] # 用于计算token的向量
pooled_output = outputs['pooled_output'] # 用于计算句向量

embed = tf.keras.Model(text_input, sequence_output)

## 循环层
gru = tf.keras.layers.GRU(
    rnn_units,
    return_sequences=True, 
    return_state=True)

## 全连接层
d = tf.keras.layers.Dense(vocab_size)

## 前向过程
text_embedding = embed(text_input)
states = gru.get_initial_state(text_embedding)
x, states = gru(text_embedding, initial_state=states)
outputs = d(x)

# 构建模型
model = Model(inputs=text_input, outputs=outputs)


# class nlg_for_fun(tf.keras.Model):


NameError: ignored

In [None]:
model.summary()

# 训练

In [10]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss)

# 配置checkpoints
checkpoint_dir = './training_checkpoints_weibo'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

EPOCHS = 50

if os.path.exists('training_checkpoints_weibo/'):
    model.load_weights('training_checkpoints_weibo/ckpt')
    model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])
else:
    model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/50


InvalidArgumentError: ignored

# 预测

In [None]:
class OneStep(tf.keras.Model):
    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
        super().__init__()
        self.temperature=temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Create a mask to prevent "" or "[UNK]" from being generated.
        skip_ids = self.ids_from_chars(['','[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            # Put a -inf at each bad index.
            values=[-float('inf')]*len(skip_ids),
            indices = skip_ids,
            # Match the shape to the vocabulary
            dense_shape=[len(ids_from_chars.get_vocabulary())]) 
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        # Convert strings to token IDs.
        input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        # Run the model.
        # predicted_logits.shape is [batch, char, next_char_logits] 
        predicted_logits, states =  self.model(inputs=input_ids, states=states, 
                                              return_state=True)
        # Only use the last prediction.
        predicted_logits = predicted_logits[:, -1, :]
        predicted_logits = predicted_logits/self.temperature
        # Apply the prediction mask: prevent "" or "[UNK]" from being generated.
        predicted_logits = predicted_logits + self.prediction_mask

        # Sample the output logits to generate token IDs.
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert from token ids to characters
        predicted_chars = self.chars_from_ids(predicted_ids)

        # Return the characters and model state.
        return predicted_chars, states

In [None]:
# 加载本地模型
model.load_weights('training_checkpoints_weibo/ckpt')
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

start = time.time()
states = None
next_char = tf.constant(['今日'])
result = [next_char]

import random
text_length = random.randint(100, 140)
for n in range(text_length):
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)

result = tf.strings.join(result)

end = time.time()

print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)

print(f"\nRun time: {end - start}")

In [None]:
from transformers import pipeline, set_seed

In [None]:
generator = pipeline('text-generation', model='gpt2')