In [75]:
import os
os.environ['KERAS_BACKEND'] = 'tensorflow'
import keras
import tensorflow as tf
import pathlib
import random
import string
import re
# List the physical GPUs TensorFlow can see and turn on memory growth for
# each one (the loop is a no-op when no GPU is present).
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [143]:
# Download the English-Spanish sentence-pair archive (over HTTPS rather than
# plain HTTP) and point `text_file` at the extracted "spa.txt".
text_file = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"

# Read every line; each one holds "english<TAB>spanish". The Spanish side is
# wrapped in "[start]" / "[end]" marker tokens.
with open(text_file, "r", encoding="utf-8") as f:
    lines = f.readlines()
text_pairs = []
for line in lines:
    line = line.strip()
    if not line:
        # A blank line (e.g. a trailing newline at the end of the file) has no
        # tab-separated pair and would otherwise break the two-value unpack.
        continue
    eng, spa = line.split("\t")
    spa = "[start] " + spa + " [end]"
    text_pairs.append((eng, spa))
# Shuffle, then carve out the training / validation / test sets (70% / 15% / 15%).
# A dedicated, fixed-seed RNG makes the split identical after every
# "Restart Kernel -> Run All" without touching the global `random` state.
random.Random(42).shuffle(text_pairs)
num_val_samples = int(0.15*len(text_pairs))
num_train_samples = len(text_pairs) - 2* num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples: num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples+num_val_samples:]
# Show a few training pairs as a sanity check.
for e,s in train_pairs[:3]:
    print(f"英文：{e}，西班牙文：{s}")
print()

# Characters removed from the Spanish text: ASCII punctuation plus "¿", except
# "[" and "]", which are kept so the "[start]" / "[end]" markers survive.
# Result: '!"#$%&\'()*+,-./:;<=>?@\\^_`{|}~¿'
strip_chars = "".join(ch for ch in string.punctuation + "¿" if ch not in "[]")
# Vectorization and batching settings.
vocab_size = 15000
sequence_length = 20
batch_size = 64
# 英文和西班牙的两个向量化工具
def custom_standardization(input_string):
    """Lower-case the input and delete every character listed in `strip_chars`.

    Passed as the `standardize` callable of the Spanish TextVectorization layer.
    """
    # Character class built from the escaped strip list, e.g.
    # '[!"\\#\\$%\\&\'\\(\\)\\*\\+,\\-\\./:;<=>\\?@\\\\\\^_`\\{\\|\\}\\~¿]'
    strip_pattern = "[" + re.escape(strip_chars) + "]"
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")
# English keeps the layer's default standardization; Spanish uses the custom
# one above and is one token longer, because the sequence is later sliced into
# a decoder input ([:-1]) and a target ([1:]).
eng_vectorization = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
spa_vectorization = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
# Build both vocabularies from the training split only.
eng_vectorization.adapt([eng_text for eng_text, _ in train_pairs])
spa_vectorization.adapt([spa_text for _, spa_text in train_pairs])
print("英文词典有",eng_vectorization.vocabulary_size())
print("西班牙文词典有",spa_vectorization.vocabulary_size())
print()

# 制作 dataset
def format_dataset(eng, spa):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: batch of English strings.
        spa: batch of Spanish strings (already wrapped in the marker tokens).

    Returns:
        A `(features, targets)` tuple. `features` is a dict with the keys
        "encoder_inputs" (vectorized English) and "decoder_inputs" (vectorized
        Spanish without its last position); `targets` is the vectorized
        Spanish without its first position, i.e. the decoder input shifted
        left by one token.
    """
    eng_tokens = eng_vectorization(eng)
    spa_tokens = spa_vectorization(spa)
    decoder_in = spa_tokens[:, :-1]
    decoder_target = spa_tokens[:, 1:]
    features = {"encoder_inputs": eng_tokens, "decoder_inputs": decoder_in}
    return features, decoder_target

def make_dataset(pairs):
    """Turn a list of `(english, spanish)` string pairs into a tf.data pipeline.

    The pairs are batched, vectorized with `format_dataset`, cached, shuffled
    (buffer of 2048 batches) and prefetched (16 batches).
    """
    eng_texts, spa_texts = (list(column) for column in zip(*pairs))
    pipeline = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    pipeline = pipeline.batch(batch_size)
    pipeline = pipeline.map(format_dataset)
    pipeline = pipeline.cache()
    pipeline = pipeline.shuffle(2048)
    return pipeline.prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
# Peek at one training batch: print the tensor shapes, then the first three
# rows of the encoder inputs, decoder inputs and targets.
for inputs, targets in train_ds.take(1):
    for input_name in ("encoder_inputs", "decoder_inputs"):
        print(f'inputs["{input_name}"].shape: {inputs[input_name].shape}')
    print(f"targets.shape: {targets.shape}")
    print()
    for tensor in (inputs["encoder_inputs"], inputs["decoder_inputs"], targets):
        print(tensor[:3])

英文：The natives have to defend their land against invaders.，西班牙文：[start] Los nativos tienen que defender su tierra de los invasores. [end]
英文：Every one of us were given three hundred dollars.，西班牙文：[start] A todos nosotros nos dieron trescientos dólares. [end]
英文：Guess what happened to me.，西班牙文：[start] Adivina qué me pasó. [end]

英文词典有 12098
西班牙文词典有 15000

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)

tf.Tensor(
[[  15    5  238   24    3 1035    5    0    0    0    0    0    0    0
     0    0    0    0    0    0]
 [   6  277   88   26  211   42 4891    0    0    0    0    0    0    0
     0    0    0    0    0    0]
 [ 372   17   28 1342   79    0    0    0    0    0    0    0    0    0
     0    0    0    0    0    0]], shape=(3, 20), dtype=int64)
tf.Tensor(
[[   2 1491   17    5   28 5891    3    0    0    0    0    0    0    0
     0    0    0    0    0    0]
 [   2    8 2182   85 8842   30   87 6034    3    0    0    0   

#### 1. 先在西班牙文的句子前后加特殊的开始和结束标志
#### 2. 向量化
#### 3. 使用 0 补充不足或者截断超出的句子
#### 4. 向量化后的西班牙文序列取 [:-1] 作为 decoder 的输入，取 [1:] 作为 decoder 的预测目标（即输入整体右移一位）；句子过长被截断时，目标序列中不一定包含结束标识

In [141]:
# If the sequence length is 5
# NOTE(review): this is the same batch preview as the earlier cell. The recorded
# output below has width 5, so it was presumably produced with
# sequence_length = 5 in a kernel state this notebook no longer reproduces
# (the notebook itself sets sequence_length = 20) -- confirm before relying on it.
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")
    print()
    print(inputs["encoder_inputs"][:3])
    print(inputs["decoder_inputs"][:3])
    print(targets[:3])

inputs["encoder_inputs"].shape: (64, 5)
inputs["decoder_inputs"].shape: (64, 5)
targets.shape: (64, 5)

tf.Tensor(
[[   5  190   40   63   54]
 [  41    5  100  172    0]
 [   2 7692  522  435 2856]], shape=(3, 5), dtype=int64)
tf.Tensor(
[[   2  106  276   99   32]
 [   2  106 6568    3    0]
 [   2    9 9132  697 2554]], shape=(3, 5), dtype=int64)
tf.Tensor(
[[ 106  276   99   32   70]
 [ 106 6568    3    0    0]
 [   9 9132  697 2554 3629]], shape=(3, 5), dtype=int64)


In [224]:
class TransformerEncoder(tf.keras.layers.Layer):
    """One Transformer encoder block.

    Applies multi-head self-attention, a residual connection with layer
    normalization, a two-layer feed-forward projection, and a second residual
    connection with layer normalization.

    Args:
        embed_dim: size of the last axis of the input and output; also used as
            the per-head `key_dim` of the attention layer.
        dense_dim: width of the hidden feed-forward layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to `tf.keras.layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = tf.keras.Sequential([
            tf.keras.layers.Dense(dense_dim, activation="relu"),
            tf.keras.layers.Dense(embed_dim)
        ])
        self.layernorm_1 = tf.keras.layers.LayerNormalization()
        self.layernorm_2 = tf.keras.layers.LayerNormalization()
        # Let an incoming Keras mask pass through this layer unchanged.
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the encoder block.

        Args:
            inputs: embedded sequence; assumed to be (batch, seq_len, embed_dim).
            mask: optional padding mask, assumed to be (batch, seq_len) with
                truthy entries for real tokens. When given, padded positions
                are excluded as attention keys; when None, the attention is
                unmasked.

        Returns:
            Tensor with the same shape as `inputs`.
        """
        if mask is not None:
            # (batch, seq_len) -> (batch, 1, seq_len): broadcast over the query
            # axis so every query ignores the padded key positions.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        else:
            padding_mask = None
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"embed_dim":self.embed_dim, "dense_dim":self.dense_dim, "num_heads":self.num_heads})
        return config

In [156]:
class PositionalEmbedding(tf.keras.layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions the position embedding
            can represent (its `input_dim`).
        vocab_size: number of distinct token ids (the token embedding's
            `input_dim`).
        embed_dim: size of each embedding vector.
        **kwargs: forwarded to `tf.keras.layers.Layer`.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.position_embeddings = tf.keras.layers.Embedding(input_dim=sequence_length, output_dim=embed_dim)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed integer token ids and add the embedding of each position.

        Args:
            inputs: integer token ids; the last axis is the sequence axis.

        Returns:
            `inputs` embedded to `embed_dim`, plus the position embeddings
            broadcast over the leading axes.
        """
        length = tf.shape(inputs)[-1]
        positions = tf.range(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0 (padding).

        This layer is the source of the mask, so it is derived from `inputs`
        regardless of whether a mask came in. Returning None when no mask was
        passed in (as this layer sits right after an Input, that is always the
        case) would leave padding unmasked in every downstream layer.
        """
        return tf.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"sequence_length":self.sequence_length, "vocab_size":self.vocab_size, "embed_dim":self.embed_dim})
        return config

In [226]:
class TransformerDecoder(tf.keras.layers.Layer):
    """One Transformer decoder block.

    Applies causal multi-head self-attention over the target sequence,
    cross-attention over the encoder outputs, and a two-layer feed-forward
    projection; each sub-layer is followed by a residual connection and layer
    normalization.

    Args:
        embed_dim: size of the last axis of the input and output; also used as
            the per-head `key_dim` of both attention layers.
        latent_dim: width of the hidden feed-forward layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to `tf.keras.layers.Layer`.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = tf.keras.Sequential([
            tf.keras.layers.Dense(latent_dim, activation='relu'),
            tf.keras.layers.Dense(embed_dim)
        ])
        self.layernorm_1 = tf.keras.layers.LayerNormalization()
        self.layernorm_2 = tf.keras.layers.LayerNormalization()
        self.layernorm_3 = tf.keras.layers.LayerNormalization()
        # Let an incoming Keras mask pass through this layer unchanged.
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: embedded target sequence; assumed to be
                (batch, target_len, embed_dim).
            encoder_outputs: encoder output sequence used as key and value of
                the cross-attention.
            mask: optional padding mask for `inputs`, assumed to be
                (batch, target_len) with truthy entries for real tokens. When
                given, padded target positions are excluded as keys of the
                self-attention in addition to the causal mask; when None only
                the causal mask is applied.

        Returns:
            Tensor with the same shape as `inputs`.
        """
        if mask is not None:
            # (batch, target_len) -> (batch, 1, target_len): broadcast over the
            # query axis; the attention layer combines it with the causal mask.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        else:
            padding_mask = None
        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs,
            attention_mask=padding_mask, use_causal_mask=True,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)
        # NOTE(review): `mask` describes the target sequence, not the encoder
        # sequence, so it is deliberately not applied to the cross-attention keys.
        attention_output_2 = self.attention_2(query=out_1, value=encoder_outputs, key=encoder_outputs)
        out_2 = self.layernorm_2(out_1 + attention_output_2)
        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"embed_dim":self.embed_dim, "latent_dim":self.latent_dim, "num_heads":self.num_heads})
        return config

In [230]:
# Model hyper-parameters.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# Encoder: token ids -> positional embedding -> one TransformerEncoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder: target token ids plus an encoded source sequence -> positional
# embedding -> one TransformerDecoder block -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = tf.keras.layers.Dropout(0.5)(x)
decoder_outputs = tf.keras.layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder outputs into the decoder sub-model.
# Note that `decoder_outputs` is rebound here to the output of the combined graph.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs, name="transformer")
transformer.summary()

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding_12 (Posit  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 ionalEmbedding)                                                                                  
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder_20 (Transf  (None, None, 256)   3155456     ['positional_embedding_

In [232]:
# Upper bound on the number of epochs; early stopping normally ends training sooner.
epochs = 100
transformer.compile("adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
# Stop once the monitored metric (val_loss by default) has not improved for 3
# epochs, and roll the model back to the weights of the best epoch. Without
# restore_best_weights the model would keep the weights of the last epoch,
# i.e. three epochs past the best one.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds, callbacks=[early_stopping])

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100


<keras.callbacks.History at 0x151a7073910>

In [240]:
# Map each Spanish vocabulary index back to its token string.
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))
# Maximum number of tokens generated per translation.
max_decoded_sentence_length = 20
def decode_sequence(input_sentence):
    """Greedily translate one English sentence, one token per step.

    Starting from "[start]", each step re-vectorizes the text decoded so far,
    runs the full `transformer`, takes the arg-max token at the current
    position and appends it. Decoding stops when "[end]" is produced or after
    `max_decoded_sentence_length` steps.

    Args:
        input_sentence: the English source sentence (a Python string).

    Returns:
        The decoded string, including the leading "[start]" and, if it was
        generated, the trailing "[end]".
    """
    encoder_tokens = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the training length.
        decoder_tokens = spa_vectorization([decoded_sentence])[:, :-1]
        step_scores = transformer([encoder_tokens, decoder_tokens])[0, step, :]
        next_token = spa_index_lookup[tf.argmax(step_scores).numpy()]
        decoded_sentence = decoded_sentence + " " + next_token
        if next_token == "[end]":
            break
    return decoded_sentence


# Translate the first three English sentences of the held-out test split.
test_eng_texts = [eng_text for eng_text, _ in test_pairs]
for i in range(3):
    input_sentence = test_eng_texts[i]
    print(i)
    print("英文：",input_sentence)
    translated = decode_sequence(input_sentence)
    print("推理：",translated)

0
英文： Do you speak French?
推理： [start] hablas usted francés [end]
1
英文： Tom became desperate.
推理： [start] tom se volvió celoso [end]
2
英文： The police say there's someone pulling string behind the scenes.
推理： [start] la policía dice que alguien lleva camisetas detrás del escenario [end]
