In [6]:
# When running on Google Colab, make sure both files are uploaded to this directory first.
zh_file_path = '/mnt/raw.zh'
en_file_path = '/mnt/raw.en'

# Read each file into a list with one entry per line (line endings are kept).
with open(zh_file_path, mode='r', encoding='utf-8') as zh_file:
    raw_zh = list(zh_file)

with open(en_file_path, mode='r', encoding='utf-8') as en_file:
    raw_en = list(en_file)

print(f"Loaded {len(raw_zh)} Chinese sentences and {len(raw_en)} English sentences.")


Loaded 394066 Chinese sentences and 394066 English sentences.


In [7]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Create one tokenizer per language. filters='' disables Keras' default
# punctuation stripping, so punctuation stays part of the tokens.
# NOTE(review): Tokenizer splits on spaces by default — this presumably assumes
# raw_zh is already word-segmented with spaces; confirm against the data files.
tokenizer_zh = Tokenizer(filters='')
tokenizer_en = Tokenizer(filters='')

# Fit the vocabularies on the full corpus.
tokenizer_zh.fit_on_texts(raw_zh)
tokenizer_en.fit_on_texts(raw_en)

# Convert every sentence to a list of integer token ids.
seqs_zh = tokenizer_zh.texts_to_sequences(raw_zh)
seqs_en = tokenizer_en.texts_to_sequences(raw_en)

# Right-pad with 0 to the longest sentence of each language, producing 2-D arrays.
seqs_zh = pad_sequences(seqs_zh, padding='post')
seqs_en = pad_sequences(seqs_en, padding='post')

# Split into training (first 80%) and test (last 20%) portions, in file order.
train_size = int(0.8 * len(seqs_zh))
X_train, X_test = seqs_zh[:train_size], seqs_zh[train_size:]
y_train, y_test = seqs_en[:train_size], seqs_en[train_size:]

# Targets stay as integer token ids (they are NOT one-hot encoded); id 0 is the
# padding value. The arrays are already rectangular after the padding above, so
# no second pad_sequences pass is needed. The +1 accounts for the padding id 0.
vocab_size_en = len(tokenizer_en.word_index) + 1


In [8]:
from tensorflow.keras.utils import Sequence
import numpy as np

class DataGenerator(Sequence):
    """Batch generator yielding teacher-forcing inputs for a seq2seq model.

    Each batch is ``([X_batch, decoder_input_data], y_batch)`` where
    ``decoder_input_data`` is ``y_batch`` shifted one position to the right,
    with 0 in the first column.

    Args:
        X: 2-D array of source token ids, indexed by sample on axis 0.
        y: 2-D array of target token ids, same number of rows as ``X``.
        batch_size: Number of samples per batch (the last batch may be smaller).
        **kwargs: Forwarded to the Keras base class constructor.
    """

    def __init__(self, X, y, batch_size=16, **kwargs):
        # Newer Keras versions expect subclasses to initialise the base class.
        super().__init__(**kwargs)
        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.indices = np.arange(len(X))
        # Shuffle once up front so the first epoch is also randomised.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (rounded up)."""
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``([X, decoder_input], target)``."""
        batch_ids = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        X_batch = self.X[batch_ids]
        y_batch = self.y[batch_ids]
        # Teacher forcing: the decoder sees the target shifted right by one,
        # so position t is predicted from targets before t. Column 0 stays 0.
        decoder_input_data = np.zeros_like(y_batch)
        decoder_input_data[:, 1:] = y_batch[:, :-1]
        # Integer targets of shape (batch, length).
        return [X_batch, decoder_input_data], y_batch

    def on_epoch_end(self):
        """Reshuffle the sample order in place."""
        np.random.shuffle(self.indices)


In [10]:
from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization, Dropout, Dense, Embedding, Input
from tensorflow.keras.models import Model

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one self-attention + feed-forward block to ``inputs``.

    Args:
        inputs: Symbolic Keras tensor whose last dimension is the feature size.
        head_size: ``key_dim`` passed to ``MultiHeadAttention``.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden feed-forward ``Dense`` layer.
        dropout: Dropout rate used inside attention and after the feed-forward part.

    Returns:
        A tensor with the same last dimension as ``inputs``.
    """
    # Self-attention: the sequence attends to itself.
    # NOTE(review): there is no skip connection around the attention layer;
    # only the feed-forward part below is residual.
    attention = MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    attention_norm = LayerNormalization(epsilon=1e-6)(attention)

    # Position-wise feed-forward network projecting back to the input width.
    hidden = Dense(ff_dim, activation="relu")(attention_norm)
    projected = Dense(inputs.shape[-1])(hidden)
    projected = Dropout(dropout)(projected)

    # Residual connection around the feed-forward part, then normalise.
    return LayerNormalization(epsilon=1e-6)(projected + attention_norm)

def _transformer_decoder(inputs, encoder_outputs, head_size, num_heads, ff_dim, dropout=0):
    """One decoder block: causal self-attention, cross-attention, feed-forward."""
    # Causal mask: position t may only attend to positions <= t, so the
    # decoder cannot read the (shifted) target tokens it is asked to predict.
    self_attention = MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs, use_causal_mask=True)
    x = LayerNormalization(epsilon=1e-6)(inputs + self_attention)

    # Cross-attention: decoder positions query the encoded source sentence.
    cross_attention = MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(x, encoder_outputs)
    x = LayerNormalization(epsilon=1e-6)(x + cross_attention)

    # Position-wise feed-forward network with a residual connection.
    hidden = Dense(ff_dim, activation="relu")(x)
    hidden = Dense(inputs.shape[-1])(hidden)
    hidden = Dropout(dropout)(hidden)
    return LayerNormalization(epsilon=1e-6)(x + hidden)


def build_transformer_model(input_shape, vocab_size_zh, vocab_size_en, num_heads=4, ff_dim=32, num_transformer_blocks=2):
    """Build an encoder-decoder Transformer for Chinese -> English translation.

    Args:
        input_shape: Shape of one encoder input sample (without the batch axis).
        vocab_size_zh: Size of the source embedding table.
        vocab_size_en: Size of the target embedding table and output softmax.
        num_heads: Attention heads per attention layer.
        ff_dim: Hidden width of each feed-forward sub-layer.
        num_transformer_blocks: Number of encoder blocks and of decoder blocks.

    Returns:
        A Keras ``Model`` taking ``[encoder_inputs, decoder_inputs]`` and
        returning per-position softmax probabilities over the target vocabulary.
    """
    embed_dim = 64  # embedding width, also used as the attention key_dim

    encoder_inputs = Input(shape=input_shape)
    decoder_inputs = Input(shape=(None,))

    # Encoder
    # NOTE(review): no positional encoding is added to either embedding.
    x = Embedding(vocab_size_zh, embed_dim)(encoder_inputs)
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size=embed_dim, num_heads=num_heads, ff_dim=ff_dim)
    encoder_outputs = LayerNormalization(epsilon=1e-6)(x)

    # Decoder: every block attends to encoder_outputs, so the prediction
    # actually depends on the source sentence.
    y = Embedding(vocab_size_en, embed_dim)(decoder_inputs)
    for _ in range(num_transformer_blocks):
        y = _transformer_decoder(
            y, encoder_outputs, head_size=embed_dim, num_heads=num_heads, ff_dim=ff_dim
        )

    # Per-position distribution over the target vocabulary.
    decoder_outputs = Dense(vocab_size_en, activation="softmax")(y)

    return Model([encoder_inputs, decoder_inputs], decoder_outputs)

# Build the Transformer; the encoder input length is the padded source width.
input_shape = (X_train.shape[1],)
transformer_model = build_transformer_model(
    input_shape,
    vocab_size_zh=len(tokenizer_zh.word_index) + 1,
    vocab_size_en=len(tokenizer_en.word_index) + 1,
)
transformer_model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
transformer_model.summary()

# Create the batch generators (training first, then validation).
train_generator = DataGenerator(X=X_train, y=y_train, batch_size=16)
val_generator = DataGenerator(X=X_test, y=y_test, batch_size=16)

# Train the Transformer for a single epoch.
transformer_model.fit(x=train_generator, epochs=1, validation_data=val_generator)


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_4 (InputLayer)        [(None, None)]               0         []                            
                                                                                                  
 embedding_3 (Embedding)     (None, None, 64)             1123270   ['input_4[0][0]']             
                                                          4                                       
                                                                                                  
 multi_head_attention_6 (Mu  (None, None, 64)             66368     ['embedding_3[0][0]',         
 ltiHeadAttention)                                                   'embedding_3[0][0]']         
                                                                                            

<keras.src.callbacks.History at 0x794acfd77700>

In [None]:
# NOTE(review): no package is named here, so this command cannot install anything —
# name the (version-pinned) requirement or delete this cell.
!pip install

In [31]:
import numpy as np
import tensorflow as tf
from concurrent.futures import ThreadPoolExecutor

# Enable mixed-precision training.
# NOTE(review): transformer_model is built in an earlier cell, before this policy
# is set; Keras applies the global policy to layers created afterwards — confirm
# whether the already-built model is actually affected.
from tensorflow.keras.mixed_precision import Policy
policy = Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)

# Generate pseudo-labels with the model
def generate_pseudo_labels(model, X, target_length, batch_size=16, max_workers=16):
    """Greedily decode a target id sequence for every row of ``X``.

    The decoder input starts as all zeros. At each step the arg-max token
    predicted for position ``t`` is stored and written into decoder position
    ``t + 1``, so each prediction is conditioned on the tokens generated so
    far. Once a row predicts id 0 (the padding id) it is treated as finished
    and the rest of that row stays 0.

    Args:
        model: Object with ``predict_on_batch([X_batch, decoder_input])``
            returning an array of shape ``(batch, target_length, vocab)``.
        X: 2-D array of source token ids, one row per sample.
        target_length: Number of target positions to produce per sample.
        batch_size: Number of samples decoded together.
        max_workers: Unused; kept only for backward compatibility with callers
            of the earlier thread-pool implementation.

    Returns:
        ``np.ndarray`` of shape ``(len(X), target_length)`` holding token ids.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    X = np.asarray(X)
    num_samples = len(X)
    pseudo_labels = np.zeros((num_samples, target_length), dtype=np.int64)
    if num_samples == 0 or target_length == 0:
        return pseudo_labels

    for start in range(0, num_samples, batch_size):
        X_batch = X[start:start + batch_size]
        rows = len(X_batch)
        # The decoder width is the target length, not the source width, so the
        # result never needs padding (or negative padding) afterwards.
        decoder_input_data = np.zeros((rows, target_length), dtype=np.int64)
        finished = np.zeros(rows, dtype=bool)

        for step in range(target_length):
            probabilities = np.asarray(
                model.predict_on_batch([X_batch, decoder_input_data])
            )
            next_ids = np.argmax(probabilities[:, step, :], axis=-1)
            next_ids[finished] = 0  # finished rows keep emitting padding
            pseudo_labels[start:start + rows, step] = next_ids

            finished |= next_ids == 0
            if finished.all():
                break
            if step + 1 < target_length:
                # Feed the prediction back in as the next decoder input.
                decoder_input_data[:, step + 1] = next_ids

    return pseudo_labels

# Keep the workload small: pseudo-label only the first 1% of the test inputs.
subset_size = int(0.01 * len(X_test))
X_test_subset = X_test[:subset_size]

# Pseudo-labels must be as wide as the real targets so they can be stacked.
target_length = y_train.shape[1]

pseudo_labels = generate_pseudo_labels(
    model=transformer_model,
    X=X_test_subset,
    target_length=target_length,
)

# Append the pseudo-labelled samples to the original training data.
# NOTE(review): X_test_subset is taken from X_test, which also feeds
# val_generator, so the validation below includes inputs the model is trained on.
X_combined = np.concatenate([X_train, X_test_subset])
y_combined = np.concatenate([y_train, pseudo_labels])

# Create the batch generator for the combined data.
combined_generator = DataGenerator(X=X_combined, y=y_combined, batch_size=8)

# Retrain the model (kept to a single epoch to limit retraining time).
transformer_model.fit(x=combined_generator, validation_data=val_generator, epochs=1)




<keras.src.callbacks.History at 0x794ad51bf9a0>

In [1]:
# Evaluate the Transformer on the validation generator.
# Requires the training cells to have been run in this kernel session first,
# otherwise transformer_model / val_generator are undefined.
test_loss, test_acc = transformer_model.evaluate(x=val_generator)
print(f"Transformer Test accuracy: {test_acc}")

# Demonstrate a translation
def translate_sentence(sentence, model, tokenizer_input, tokenizer_output):
    """Translate one sentence with greedy step-by-step decoding.

    The decoder input starts as all zeros. At each step the arg-max token for
    the current position is appended to the output and written into the next
    decoder position, so later predictions are conditioned on earlier ones.
    Decoding stops at the first predicted id 0 (the padding id) or when the
    decoder width is exhausted.

    Args:
        sentence: Source text to translate.
        model: Model with ``predict_on_batch([encoder_ids, decoder_ids])``
            returning per-position probabilities over the output vocabulary.
        tokenizer_input: Fitted tokenizer for the source language.
        tokenizer_output: Fitted tokenizer for the target language.

    Returns:
        The decoded target-language text as a single string.
    """
    seq = tokenizer_input.texts_to_sequences([sentence])
    # Pad (or truncate) to the encoder width used during training.
    seq = pad_sequences(seq, maxlen=X_train.shape[1], padding='post')

    decoder_input_data = np.zeros_like(seq)
    max_steps = decoder_input_data.shape[1]
    predicted_ids = []
    for step in range(max_steps):
        prediction = np.asarray(model.predict_on_batch([seq, decoder_input_data]))
        next_id = int(np.argmax(prediction[0, step]))
        if next_id == 0:
            # Padding predicted: the sentence is complete.
            break
        predicted_ids.append(next_id)
        if step + 1 < max_steps:
            # Feed the chosen token back in as the next decoder input.
            decoder_input_data[0, step + 1] = next_id

    translated_sentence = tokenizer_output.sequences_to_texts([predicted_ids])
    return translated_sentence[0]

# Example translation
example_sentence = "你好，前任，最近你好嗎？"
translated_sentence = translate_sentence(
    sentence=example_sentence,
    model=transformer_model,
    tokenizer_input=tokenizer_zh,
    tokenizer_output=tokenizer_en,
)
print(f"Translated: {translated_sentence}")


NameError: name 'transformer_model' is not defined

In [33]:
# Save the model
# NOTE(review): the truncated warning in this cell's output is presumably Keras'
# notice that HDF5 (.h5) is a legacy format — consider the native `.keras` format.
model_save_path = '/content/transformer_model.h5'  # save path inside the Colab environment
transformer_model.save(model_save_path)


  saving_api.save_model(
