In [22]:
import string

class Vectorizer:
    def standardize(self, text):
        text = text.lower()
        return "".join(char for char in text if char not in string.punctuation)
    
    def tokenize(self, text):
        text = self.standardize(text)
        return text.split()
    
    def make_vocabulary(self, dataset):
        self.vocabulary = {"": 0, "[UNK]": 1}
        for text in dataset:
            text = self.standardize(text)
            tokens = self.tokenize(text)
            for token in tokens:
                if token not in self.vocabulary:
                    self.vocabulary[token] = len(self.vocabulary)
        self.inverse_vocabulary = dict(
            (v, k) for k, v in self.vocabulary.items()
        )
    
    def encode(self, text):
        text = self.standardize(text)
        tokens = self.tokenize(text)
        return [self.vocabulary.get(token, 1) for token in tokens]
    
    def decode(self, int_sequence):
        return " ".join(
            self.inverse_vocabulary.get(i, '[UNK]') for i in int_sequence
        )
    
vectorizer = Vectorizer()
dataset = [
    "i write, erase, rewrite",
    "Erase again, and then",
    "A poppy blooms."
]
vectorizer.make_vocabulary(dataset)

In [23]:
test_sentence = "I write, rewrite, and still rewrite again"
encoded_sentence = vectorizer.encode(test_sentence)
print(encoded_sentence)
decoded_sentence = vectorizer.decode(encoded_sentence)
print(decoded_sentence)

[2, 3, 5, 7, 1, 5, 6]
i write rewrite and [UNK] rewrite again


In [24]:
# TextVectorization层
# 文本标准化：转换为小写字母并删除标点符号
# token化：利用空格进行拆分
from tensorflow.keras.layers import TextVectorization
text_vectorization = TextVectorization(output_mode='int')
text_vectorization.adapt(dataset)


In [25]:
vocabulary = text_vectorization.get_vocabulary()
print(vocabulary)
test_sentence = "I write, rewrite, and still rewrite again"
encoded_sentence = text_vectorization(test_sentence)
print(encoded_sentence)
inverse_vocab = dict(enumerate(vocabulary))
decoded_sentence = " ".join(inverse_vocab[int(i)] for i in encoded_sentence)
print(decoded_sentence)

['', '[UNK]', 'erase', 'write', 'then', 'rewrite', 'poppy', 'i', 'blooms', 'and', 'again', 'a']
tf.Tensor([ 7  3  5  9  1  5 10], shape=(7,), dtype=int64)
i write rewrite and [UNK] rewrite again


In [None]:
# 验证集
import os, pathlib, shutil, random

base_dir = pathlib.Path("aclImdb")
val_dir = base_dir / "val"
train_dir = base_dir / "train"
for category in ('neg', 'pos'):
    os.makedirs(val_dir / category)
    files = os.listdir(train_dir / category)
    random.Random(1337).shuffle(files)
    num_val_samples = int(0.2 * len(files))
    val_files = files[-num_val_samples:]
    for fname in val_files:
        shutil.move(train_dir / category / fname, val_dir /category / fname)

In [27]:
# 构建Dataset对象
from tensorflow import keras
batch_size = 32

train_ds = keras.utils.text_dataset_from_directory('aclImdb/train', batch_size=batch_size)
val_ds = keras.utils.text_dataset_from_directory('aclImdb/val', batch_size=batch_size)
test_ds = keras.utils.text_dataset_from_directory('aclImdb/test', batch_size=batch_size)

Found 20000 files belonging to 2 classes.
Found 5000 files belonging to 2 classes.
Found 25000 files belonging to 2 classes.


In [28]:
for inputs, targets in train_ds:
    print('inputs.shape:', inputs.shape)
    print('inputs.dtype:', inputs.dtype)
    print('targets.shape:', targets.shape)
    print('targets.dtype:', targets.dtype)
    print('inputs[0]:', inputs[0])
    print('targets[0]:', targets[0])
    break


inputs.shape: (32,)
inputs.dtype: <dtype: 'string'>
targets.shape: (32,)
targets.dtype: <dtype: 'int32'>
inputs[0]: tf.Tensor(b"I have read the novel Reaper of Ben Mezrich a fews years ago and last night I accidentally came to see this adaption.<br /><br />Although it's been years since I read the story the first time, the differences between the novel and the movie are humongous. Very important elements, which made the whole thing plausible are just written out or changed to bad.<br /><br />If the plot sounds interesting to you: go and get the novel. Its much, much, much better.<br /><br />Still 4 out of 10 since it was hard to stop watching because of the great basic plot by Ben Mezrich.", shape=(), dtype=string)
targets[0]: tf.Tensor(0, shape=(), dtype=int32)


In [29]:
# 单个单词（一元语法）
# 20000适用于文本分类的合适的词表大小
text_vectorization = TextVectorization(
    max_tokens=20000,
    output_mode='multi_hot',
)
text_only_train_ds = train_ds.map(lambda x, y: x)
text_vectorization.adapt(text_only_train_ds)

binary_1gram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
binary_1gram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
binary_1gram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)

In [30]:
for inputs, targets in binary_1gram_train_ds:
    print('inputs.shape:', inputs.shape)
    print('inputs.dtype:', inputs.dtype)
    print('targets.shape:', targets.shape)
    print('targets.dtype:', targets.dtype)
    print('inputs[0]:', inputs[0])
    print('targets[0]:', targets[0])
    break

inputs.shape: (32, 20000)
inputs.dtype: <dtype: 'float32'>
targets.shape: (32,)
targets.dtype: <dtype: 'int32'>
inputs[0]: tf.Tensor([1. 1. 1. ... 0. 0. 0.], shape=(20000,), dtype=float32)
targets[0]: tf.Tensor(0, shape=(), dtype=int32)


In [31]:
from tensorflow import keras 
from tensorflow.keras import layers

def get_model(max_token=20000, hidden_dim=16):
    inputs = keras.Input(shape=(max_token, ))
    x = layers.Dense(hidden_dim, activation='relu')(inputs)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    model = keras.Model(inputs, outputs)
    model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
    return model

In [32]:
# 对一元语法二进制模型进行训练和测试
model = get_model()
model.summary()
callbacks = [
    keras.callbacks.ModelCheckpoint('binary_1gram.keras', save_best_only=True)
]
# .cache()缓存到内存中，可以复用预处理的结果
model.fit(binary_1gram_train_ds.cache(),
          validation_data=binary_1gram_val_ds.cache(),
          epochs=10,
          callbacks=callbacks)
model = keras.models.load_model('binary_1gram.keras')
print(f'Test acc: {model.evaluate(binary_1gram_test_ds)[1]:.3f}')

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, 20000)]           0         
                                                                 
 dense_19 (Dense)            (None, 16)                320016    
                                                                 
 dropout_1 (Dropout)         (None, 16)                0         
                                                                 
 dense_20 (Dense)            (None, 1)                 17        
                                                                 
Total params: 320,033
Trainable params: 320,033
Non-trainable params: 0
_________________________________________________________________
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test acc: 0.884


In [33]:
# 二元语法
text_vectorization = TextVectorization(
    ngrams=2,
    max_tokens=20000,
    output_mode='multi_hot'
)
# 对二元语法二进制模型进行训练和测试
text_vectorization.adapt(text_only_train_ds)
binary_2gram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
binary_2gram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
binary_2gram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)

model = get_model()
model.summary()
callbacks = [
    keras.callbacks.ModelCheckpoint('binary_2gram.keras', save_best_only=True)
]
# .cache()缓存到内存中，可以复用预处理的结果
model.fit(binary_2gram_train_ds.cache(),
          validation_data=binary_2gram_val_ds.cache(),
          epochs=10,
          callbacks=callbacks)
model = keras.models.load_model('binary_2gram.keras')
print(f'Test acc: {model.evaluate(binary_2gram_test_ds)[1]:.3f}')

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_11 (InputLayer)       [(None, 20000)]           0         
                                                                 
 dense_21 (Dense)            (None, 16)                320016    
                                                                 
 dropout_2 (Dropout)         (None, 16)                0         
                                                                 
 dense_22 (Dense)            (None, 1)                 17        
                                                                 
Total params: 320,033
Trainable params: 320,033
Non-trainable params: 0
_________________________________________________________________
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test acc: 0.891


In [13]:
# 二元语法的TF-IDF编码
text_vectorization = TextVectorization(     
    ngrams=2,     
    max_tokens=20000,     
    output_mode="tf_idf" 
)
# 训练和测试
text_vectorization.adapt(text_only_train_ds)
tfidf_2gram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
tfidf_2gram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
tfidf_2gram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)

model = get_model()
model.summary()
callbacks = [
    keras.callbacks.ModelCheckpoint('tfidf_2gram.keras', save_best_only=True)
]
# .cache()缓存到内存中，可以复用预处理的结果
model.fit(tfidf_2gram_train_ds.cache(),
          validation_data=tfidf_2gram_val_ds.cache(),
          epochs=10,
          callbacks=callbacks)
model = keras.models.load_model('tfidf_2gram.keras')
print(f'Test acc: {model.evaluate(tfidf_2gram_test_ds)[1]:.3f}')

Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 20000)]           0         
                                                                 
 dense_6 (Dense)             (None, 16)                320016    
                                                                 
 dropout_3 (Dropout)         (None, 16)                0         
                                                                 
 dense_7 (Dense)             (None, 1)                 17        
                                                                 
Total params: 320,033
Trainable params: 320,033
Non-trainable params: 0
_________________________________________________________________
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test acc: 0.888


In [14]:
# 模型包含TextVectorization层
inputs = keras.Input(shape=(1,), dtype='string')
processed_inputs = text_vectorization(inputs)
outputs = model(processed_inputs)
inference_model = keras.Model(inputs, outputs)

import tensorflow as tf
raw_text_data = tf.convert_to_tensor([
    ["That was an excellent movie, I loved it."],
])
predictions = inference_model(raw_text_data)
print(f"{float(predictions[0] * 100):.2f} percent positive")

92.25 percent positive


In [34]:
# 准备序列模型数据集
from tensorflow.keras import layers

max_length = 600
max_tokens = 20000
text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    output_mode='int',
    output_sequence_length=max_length
)
text_vectorization.adapt(text_only_train_ds)
int_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
int_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)
int_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4
)

In [16]:
# 构建于one-hot编码的向量序列之上的序列模型
# 双向LSTM + 分类层
import tensorflow as tf
inputs = keras.Input(shape=(None,), dtype='int64')
# 编码为20000维的二进制向量
embedded = tf.one_hot(inputs, depth=max_tokens)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

Model: "model_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_6 (InputLayer)        [(None, None)]            0         
                                                                 
 tf.one_hot (TFOpLambda)     (None, None, 20000)       0         
                                                                 
 bidirectional (Bidirectiona  (None, 64)               5128448   
 l)                                                              
                                                                 
 dropout_4 (Dropout)         (None, 64)                0         
                                                                 
 dense_8 (Dense)             (None, 1)                 65        
                                                                 
Total params: 5,128,513
Trainable params: 5,128,513
Non-trainable params: 0
_________________________________________________

In [19]:
# 训练一个简单的序列模型
callbacks = [     
    keras.callbacks.ModelCheckpoint("one_hot_bidir_lstm.keras",                                    
                                    save_best_only=True)
]
model.fit(int_train_ds,
          validation_data=int_val_ds, 
          epochs=10,           
          callbacks=callbacks) 
model = keras.models.load_model("one_hot_bidir_lstm.keras")
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")
# 速度非常慢，因为输入量很大，每个样本均为(600, 20000)
# 测试精度也很低

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Test acc: 0.867


In [22]:
# 从头楷书训练一个使用Embedding层的模型
inputs = keras.Input(shape=(None,), dtype='int64')
embedded = layers.Embedding(input_dim=max_tokens, output_dim=256, mask_zero=True)(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()
callbacks = [
    keras.callbacks.ModelCheckpoint('embeddings_bidir_gru.keras', save_best_only=True)    
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10, callbacks=callbacks)
model = keras.models.load_model('embeddings_bidir_gru.keras')
print(f"Text acc: {model.evaluate(int_test_ds)[1]:.3f}")

Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_9 (InputLayer)        [(None, None)]            0         
                                                                 
 embedding_1 (Embedding)     (None, None, 256)         5120000   
                                                                 
 bidirectional_2 (Bidirectio  (None, 64)               73984     
 nal)                                                            
                                                                 
 dropout_6 (Dropout)         (None, 64)                0         
                                                                 
 dense_10 (Dense)            (None, 1)                 65        
                                                                 
Total params: 5,194,049
Trainable params: 5,194,049
Non-trainable params: 0
_________________________________________________

In [24]:
# 解析GloVe词嵌入文件
import numpy as np
path_to_glove_file = 'glove.6B.100d.txt'

embeddings_index = {}
with open(path_to_glove_file) as f:
    for line in f:
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs
print(f'found {len(embeddings_index)} word vectors.')

found 400000 word vectors.


In [25]:
# 构建一个可以加载到Embedding层中的嵌入模型
embedding_dim = 100
vocabulary = text_vectorization.get_vocabulary()
# 单词到其词表索引的映射
word_index = dict(zip(vocabulary, range(len(vocabulary))))

# 准备一个矩阵，后续用GloVe向量填充
embedding_matrix = np.zeros((max_tokens, embedding_dim))
for word, i in word_index.items():
    # 用索引为i的单词的词向量填充矩阵中的第i个元素
    if i < max_tokens:
        embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector
# 使用Constant初始化方法在Embedding层中加载预训练词嵌入
embedding_layer = layers.Embedding(
    max_tokens,
    embedding_dim,
    embeddings_initializer=keras.initializers.Constant(embedding_matrix),
    trainable=False,
    mask_zero=True
)

In [26]:
# 使用与训练Embedding层的模型
inputs = keras.Input(shape=(None,), dtype='int64')
embedded = embedding_layer(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x) 
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs) 
model.compile(optimizer="rmsprop",               
              loss="binary_crossentropy",               
              metrics=["accuracy"])
model.summary() 
callbacks = [     
    keras.callbacks.ModelCheckpoint("glove_embeddings_sequence_model.keras",                                    
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10,           
          callbacks=callbacks) 
model = keras.models.load_model("glove_embeddings_sequence_model.keras")
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Model: "model_8"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, None)]            0         
                                                                 
 embedding_2 (Embedding)     (None, None, 100)         2000000   
                                                                 
 bidirectional_3 (Bidirectio  (None, 64)               34048     
 nal)                                                            
                                                                 
 dropout_7 (Dropout)         (None, 64)                0         
                                                                 
 dense_11 (Dense)            (None, 1)                 65        
                                                                 
Total params: 2,034,113
Trainable params: 34,113
Non-trainable params: 2,000,000
____________________________________________

In [16]:
# Transformer编码器
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # 输入词元向量的尺寸
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'),
             layers.Dense(embed_dim,)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        # 增加掩码维数
        if mask is not None:
            mask = mask[: tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)
    
    # 实现序列化，以便保存模型
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim
        })
        return config

In [36]:
# 将Transform编码器用于文本分类
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype='int64')
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
# 将每个序列转换为单个向量
x = layers.GlobalMaxPool1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

Model: "model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_12 (InputLayer)       [(None, None)]            0         
                                                                 
 embedding_9 (Embedding)     (None, None, 256)         5120000   
                                                                 
 transformer_encoder_9 (Tran  (None, None, 256)        543776    
 sformerEncoder)                                                 
                                                                 
 global_max_pooling1d_1 (Glo  (None, 256)              0         
 balMaxPooling1D)                                                
                                                                 
 dropout_3 (Dropout)         (None, 256)               0         
                                                                 
 dense_25 (Dense)            (None, 1)                 257 

In [37]:
# 训练并评估基于Transformer编码器的模型
callbacks = [     
    keras.callbacks.ModelCheckpoint("transformer_encoder.keras",                                     
                                    save_best_only=True)
]
model.fit(int_train_ds, 
          validation_data=int_val_ds, 
          epochs=20,           
          callbacks=callbacks) 
model = keras.models.load_model(     
    "transformer_encoder.keras",     
    custom_objects={"TransformerEncoder": TransformerEncoder}) 
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/20


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test acc: 0.883


In [46]:
# 位置嵌入    
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    # 生成掩码，从而可以忽略输入中填充的0
    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [47]:
# 文本分类Transformer
vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype='int64')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
# 将每个序列转换为单个向量
x = layers.GlobalMaxPool1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation='sigmoid')(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_16 (InputLayer)       [(None, None)]            0         
                                                                 
 positional_embedding_3 (Pos  (None, None, 256)        5273600   
 itionalEmbedding)                                               
                                                                 
 transformer_encoder_13 (Tra  (None, None, 256)        543776    
 nsformerEncoder)                                                
                                                                 
 global_max_pooling1d_5 (Glo  (None, 256)              0         
 balMaxPooling1D)                                                
                                                                 
 dropout_7 (Dropout)         (None, 256)               0         
                                                           

In [None]:
callbacks = [     
    keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras",                                     
                                    save_best_only=True)
]
model.fit(int_train_ds, 
          validation_data=int_val_ds, 
          epochs=20,           
          callbacks=callbacks) 
model = keras.models.load_model(     
    "full_transformer_encoder.keras",
    # 加载过程中提供自定义的类     
    custom_objects={"TransformerEncoder": TransformerEncoder, 
                    "PositionalEmbedding": PositionalEmbedding}) 
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

### 机器翻译任务

In [52]:
text_file = 'spa-eng/spa.txt'
with open(text_file) as f:
    lines = f.read().split('\n')[:-1]
text_pairs = []
for line in lines:
    english, spanish = line.split('\t')
    spanish = '[start] ' + spanish + ' [end]'
    text_pairs.append((english, spanish))

import random
print(random.choice(text_pairs))

("What if things don't work out?", '[start] ¿Qué pasa si no lo conseguimos? [end]')


In [53]:
# 划分训练集、验证集、测试集
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples: num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

In [55]:
# 向量化
import tensorflow as tf
import string
import re

# 保留[和]，去掉¿
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", ""
    )

# 15000个最常见的单词，句子长度限制为20个单词
vocab_size = 15000
sequence_length = 20
# 英语层
source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length
)
# 西班牙语层
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode='int',
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)

In [56]:
# 准备翻译任务的数据集
batch_size = 64

def format_dataset(eng, spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)
    return ({
        "english": eng,
        "spanish": spa[:, :-1]
    }, spa[:, 1:])

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [57]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
    print(f'targets.shape: {targets.shape}')

inputs['english'].shape: (64, 20)
inputs['spanish'].shape: (64, 20)
targets.shape: (64, 20)


In [58]:
inputs = keras.Input(shape=(sequence_length, ), dtype='int64')
x = layers.Embedding(input_dim=vocab_size, output_dim=128)(inputs)
x = layers.LSTM(32, return_sequences=True)(x)
outputs = layers.Dense(vocab_size, activation='softmax')(x)
model = keras.Model(inputs, outputs)

In [61]:
# GRU编码器
embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype='int64', name='english')
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
# 编码后的源句子即为双向GRU的最后一个输出
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode='sum'
)(x)

# GRU解码器
past_target = keras.Input(shape=(None,), dtype='int64', name='spanish')
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
# 预测下一个词元
target_next_step = layers.Dense(vocab_size, activation='softmax')(x)
# 端到端模型：源句子 和 目标句子 映射为 便宜一个时间步的目标句子
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

In [62]:
# 训练
seq2seq_rnn.compile(
    optimizer='rmsprop',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.callbacks.History at 0x2ec8eee1300>

In [63]:
# 翻译新句子
import numpy as np

# 准备一个字典，将词元索引预测值映射为字符串词元
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))

max_decoded_sentence_length = 20
def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = '[start]'
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence]
        )
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == '[end]':
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print('-')
    print(input_sentence)
    print(decode_sequence(input_sentence))

-
He can scarcely write his name.
[start] Él puede haber [UNK] su nombre [end]
-
All the girls helped each other.
[start] todos los se nos la gente se una vez [end]
-
Tom asked Mary for a loan.
[start] tom pidió a mary que le una [UNK] [end]
-
We don't like this house.
[start] no nos gusta esta casa [end]
-
We're professors.
[start] somos [UNK] [end]
-
Why do the five yen coin and the fifty yen coin have holes in the center?
[start] por qué las cinco de los y el cinco años se tiene el que el las que en lo [UNK]
-
What's your favorite alcoholic beverage?
[start] cuál es tu canción favorita [end]
-
Try this one.
[start] [UNK] este [end]
-
This is the worst book I've ever read.
[start] este es el libro que he [UNK] hasta leer [end]
-
His dream has finally come true.
[start] su sueño se ha hecho realidad [end]
-
I don't even want to know.
[start] ni siquiera quiero saber [end]
-
We start classes next Monday.
[start] a las escuela de el fin de semana el lunes [end]
-
Tom is creepy.
[start] 

### Transformer解码器

In [67]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation='relu'),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):         
        config = super().get_config()         
        config.update({             
            "embed_dim": self.embed_dim,             
            "num_heads": self.num_heads,             
            "dense_dim": self.dense_dim,         
        }) 
        return config
    
    # 生成因果掩码
    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        # 生成形状为(sequence_length,  sequence_length)的矩阵，其中一半为1，另一半为0
        mask = tf.cast(i >= j, dtype='int32')
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
            tf.constant([1, 1], dtype=tf.int32)],
            axis=0
        )
        return tf.tile(mask, mult)
    
    def call(self, inputs, encoder_outputs, mask=None):
        # 因果掩码
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # 输入掩码
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype='int32')
            # 将两个掩码合并
            padding_mask = tf.minimum(padding_mask, causal_mask)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask
        )
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [68]:
# 端到端Transformer
embed_dim = 256
dense_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype='int64', name='english')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype='int64', name='spanish')
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation='softmax')(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
transformer.summary()

Model: "model_12"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 english (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 spanish (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_5 (Positi  (None, None, 256)   3845120     ['english[0][0]']                
 onalEmbedding)                                                                                   
                                                                                                  
 positional_embedding_6 (Positi  (None, None, 256)   3845120     ['spanish[0][0]']         

In [None]:
transformer.compile(     
    optimizer="rmsprop",     
    loss="sparse_categorical_crossentropy",    
      metrics=["accuracy"]) 
transformer.fit(train_ds, epochs=30, validation_data=val_ds)

In [None]:
# 利用transformer模型翻译新句子
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):     
    tokenized_input_sentence = source_vectorization([input_sentence])    
    decoded_sentence = "[start]" 
    for i in range(max_decoded_sentence_length):         
        tokenized_target_sentence = target_vectorization(            
            [decoded_sentence])[:, :-1]  
        # 对下一个词元进行采样       
        predictions = transformer(             
            [tokenized_input_sentence, tokenized_target_sentence])        
        sampled_token_index = np.argmax(predictions[0, i, :])        
        sampled_token = spa_index_lookup[sampled_token_index]        
        decoded_sentence += " " + sampled_token          
        if sampled_token == "[end]": 
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs] 
for _ in range(20):     
    input_sentence = random.choice(test_eng_texts)  
    print("-")  
    print(input_sentence)  
    print(decode_sequence(input_sentence))