在colab上需要运行
```
!pip install -q transformers
!pip install -q tf_keras
```

In [1]:
# Global limits shared by every model in this notebook.
max_words = 10_000   # vocabulary size: only the 10,000 most frequent words are kept
max_len = 200        # every review is truncated or padded to exactly 200 tokens

In [None]:
from tensorflow.keras.datasets import imdb

# Load the IMDB reviews as sequences of word ids, restricted to a
# vocabulary of `max_words` entries (num_words=max_words).
train_split, test_split = imdb.load_data(num_words=max_words)
x_train, y_train = train_split
x_test, y_test = test_split

print("训练集大小:", len(x_train))
print("测试集大小:", len(x_test))

# 经典NLP

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# 2. Build the id -> word table used to turn id sequences back into text.
word_index = imdb.get_word_index()

# Word ids in the loaded sequences are shifted by 3 relative to `word_index`,
# which leaves ids 0-3 free for the special markers registered below.
index_to_word = dict(
    (index + 3, word) for word, index in word_index.items()
)
index_to_word.update({
    0: "<PAD>",
    1: "<START>",
    2: "<UNK>",
    3: "<UNUSED>",
})

def decode_review(sequence):
    """Turn an IMDB sequence of word ids into a space-separated string.

    Args:
        sequence: iterable of integer word ids.

    Returns:
        The words looked up in ``index_to_word`` joined by single spaces;
        ids missing from the table are rendered as "?".
    """
    words = []
    for token_id in sequence:
        words.append(index_to_word.get(token_id, "?"))
    return " ".join(words)

# 3. Decode the training and test id sequences back into plain text.
train_texts = list(map(decode_review, x_train))
test_texts = list(map(decode_review, x_test))

# 4. scikit-learn pipeline with two stages:
#    - 'tfidf': TfidfVectorizer turns each text into a TF-IDF feature vector
#    - 'clf':   MultinomialNB classifies those vectors with naive Bayes
tfidf_step = ('tfidf', TfidfVectorizer())
classifier_step = ('clf', MultinomialNB())
pipeline = Pipeline([tfidf_step, classifier_step])

# 5. Fit on the decoded training reviews.
pipeline.fit(train_texts, y_train)

# 6. Score (mean accuracy) on the held-out test reviews.
accuracy = pipeline.score(test_texts, y_test)
print("在测试集上的准确率:", accuracy)

# 训练一个带Attention的Seq模型

In [13]:
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences


# Step 2: pad / truncate every review to exactly `max_len` token ids.
# NOTE: this overwrites x_train / x_test, so cells that expect the raw
# variable-length sequences must run before this one.
x_train, x_test = (
    pad_sequences(split, maxlen=max_len) for split in (x_train, x_test)
)


下面的示例使用了 MultiHeadAttention 层（多头自注意力），并且为了方便查看注意力权重，我们指定了 return_attention_scores=True。这样在前向传播时能将「注意力分数矩阵」一起输出。

注意：这个模型非常简单，仅仅为了演示如何调用 MultiHeadAttention，实际使用时可以加上 Position Embedding、层归一化、残差连接等，使之更贴近真正的 Transformer。

In [None]:
from tensorflow.keras import layers, Model

embedding_dim = 32  # size of each word vector
num_heads = 3       # number of heads in the MultiHeadAttention layer

# Input: one review of exactly `max_len` token ids.
inputs = layers.Input(shape=(max_len,))

# 1) Embedding: [batch_size, max_len] -> [batch_size, max_len, embedding_dim].
#    `input_length` is intentionally not passed: the sequence length is already
#    fixed by the Input layer and the argument is deprecated in recent Keras.
x = layers.Embedding(
    input_dim=max_words,
    output_dim=embedding_dim,
)(inputs)

# 2) Multi-head self-attention: query, key and value are all the same tensor `x`.
attention_layer = layers.MultiHeadAttention(
    num_heads=num_heads,
    key_dim=embedding_dim,
)

# `return_attention_scores` is a call-time argument (not a constructor one);
# with it enabled the layer returns the pair (attn_output, attn_scores).
attn_output, attn_scores = attention_layer(
    query=x,
    value=x,
    key=x,
    return_attention_scores=True
)

# 3) Keep it simple: average the attention output over the sequence axis,
#    [batch_size, max_len, embedding_dim] -> [batch_size, embedding_dim].
pooled = layers.GlobalAveragePooling1D()(attn_output)

# 4) Binary classification head.
outputs = layers.Dense(1, activation='sigmoid')(pooled)

# Assemble and compile the model.
model = Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

model.summary()


In [None]:
# Train for 4 epochs in mini-batches of 64, holding out 20% of the
# training data for validation.
fit_options = dict(batch_size=64, epochs=4, validation_split=0.2)
history = model.fit(x_train, y_train, **fit_options)


In [None]:
# Evaluate on the held-out test set; the model was compiled with a single
# metric, so evaluate() yields the loss followed by the accuracy.
evaluation = model.evaluate(x_test, y_test)
test_loss, test_accuracy = evaluation
print(f"测试集准确率: {test_accuracy:.4f}")

## 查看attention机制

In [17]:
# A second model that shares the graph built above but outputs `attn_scores`
# instead of the class probability, i.e. it exposes the attention scores of
# the MultiHeadAttention layer of the main network.
attention_model = Model(inputs, attn_scores)


In [None]:
import numpy as np

# Use the first training review as the demo input; the 0:1 slice keeps the batch axis.
sample_input = x_train[0:1]
attn_scores_out = attention_model.predict(sample_input)

print("注意力分数的形状:", attn_scores_out.shape)


In [None]:
import matplotlib.pyplot as plt


# One heatmap per attention head, laid out in a single row.
# squeeze=False makes subplots() always return a 2-D array of Axes, so the
# code also works when num_heads == 1 (otherwise a bare Axes is returned
# and indexing it fails).
fig, axes_grid = plt.subplots(1, num_heads, figsize=(15, 4), squeeze=False)
axes = axes_grid[0]  # the single row, as a 1-D array of Axes

for i in range(num_heads):
    # Scores of head i for the first (only) sample: (max_len, max_len).
    attention_matrix = attn_scores_out[0][i]
    im = axes[i].imshow(attention_matrix, cmap='hot', interpolation='nearest')
    axes[i].set_title(f"Attention Scores (Head={i})")
    axes[i].set_xlabel("Key position")
    axes[i].set_ylabel("Query position")
    fig.colorbar(im, ax=axes[i])

plt.tight_layout()  # adjust spacing so titles and colorbars do not overlap
plt.show()


# 如何使用预训练模型 DistilBERT

重新加载数据

In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences


# Imported here as well so this section runs on its own after a kernel restart.
from tensorflow.keras.datasets import imdb

# 1) Actually reload the raw id sequences instead of reusing `x_train`, which an
#    earlier cell may already have overwritten with padded arrays, then pad /
#    truncate them to the fixed length `max_len`.
(x_train_raw, y_train), (x_test_raw, y_test) = imdb.load_data(num_words=max_words)
x_train_padded = pad_sequences(x_train_raw, maxlen=max_len)
x_test_padded  = pad_sequences(x_test_raw, maxlen=max_len)

# 2) x_train_padded and y_train are now fixed-shape NumPy arrays.
print(x_train_padded.shape)  # (num_reviews, max_len)

# 3) Fixed shapes allow building the datasets directly with from_tensor_slices.
train_ds = tf.data.Dataset.from_tensor_slices((x_train_padded, y_train))
test_ds  = tf.data.Dataset.from_tensor_slices((x_test_padded, y_test))

# 4) Lookup table for the "integer -> text" step: word_index maps word -> id,
#    so invert it to map id -> word.
word_index = imdb.get_word_index()
reverse_word_index = {v: k for (k, v) in word_index.items()}

def decode_review(int_arr):
    """Decode a fixed-length array of word ids into a space-separated string.

    Ids below 3 (which includes the 0 used for padding) are skipped. Every
    other id is shifted down by 3 before the lookup in ``reverse_word_index``;
    ids with no entry there are rendered as "?".

    NOTE(review): this redefines the ``decode_review`` declared earlier in the
    notebook, with different handling of the special ids.
    """
    words = []
    for i in int_arr:
        if i < 3:
            continue
        words.append(reverse_word_index.get(i - 3, "?"))
    return " ".join(words)

def to_text_fn(int_seq, label):
    """Map one (id sequence, label) dataset element to (decoded text, label).

    Args:
        int_seq: integer tensor holding the word ids of one review.
        label: the label tensor of that review; passed through unchanged.

    Returns:
        A ``(text_tensor, label)`` pair where ``text_tensor`` is a scalar
        ``tf.string`` tensor produced by ``decode_review``.
    """
    def _decode(seq):
        # Runs eagerly inside tf.py_function, so .numpy() is available.
        return decode_review(seq.numpy())

    text_tensor = tf.py_function(
        func=_decode,
        inp=[int_seq],
        Tout=tf.string
    )
    # tf.py_function outputs carry no static shape; declare the scalar shape
    # explicitly, as is done for the tokenizer outputs later in this notebook.
    text_tensor.set_shape(())
    return text_tensor, label

# Decode both splits from id sequences to text, element by element.
train_ds_text, test_ds_text = (
    split.map(to_text_fn) for split in (train_ds, test_ds)
)

In [None]:
# Peek at the first decoded training example.
first_example = train_ds_text.take(1)
for text, label in first_example:
    decoded_text = text.numpy().decode('utf-8')
    print("文本:", decoded_text)
    print("标签:", label.numpy())

In [None]:
# Report which GPUs (if any) TensorFlow can see.
gpus = tf.config.list_physical_devices('GPU')
print("GPU Available: ", gpus)

# Enable on-demand GPU memory growth so TensorFlow does not reserve all device
# memory up front. A RuntimeError raised while doing so is reported rather than
# allowed to stop the notebook.
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    print(e)

In [None]:
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer

# 5. Load the pretrained DistilBERT checkpoint with a 2-class classification
#    head, together with its matching tokenizer.
model_name = "distilbert-base-uncased"
bert_model = TFAutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=2
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 6. Tokenization step that prepares a batch of texts for the BERT model.
def bert_preprocess_batch(texts, labels):
    """Tokenize one batch of review texts.

    Args:
        texts: string tensor of shape (batch_size,) holding UTF-8 encoded text.
        labels: integer tensor of shape (batch_size,); returned unchanged.

    Returns:
        A tuple ``(input_ids, attention_mask, labels)`` where the first two are
        the tokenizer outputs padded / truncated to 128 tokens.
    """
    # The tokenizer expects a Python list of str, not a tensor of bytes.
    py_texts = []
    for raw_text in texts.numpy():
        py_texts.append(raw_text.decode("utf-8"))

    # Tokenize the whole batch, forcing every row to exactly 128 tokens.
    encoding = tokenizer(
        py_texts,
        truncation=True,
        padding='max_length',
        max_length=128,
        return_tensors='tf',
    )

    input_ids = encoding["input_ids"]            # (batch_size, 128)
    attention_mask = encoding["attention_mask"]  # (batch_size, 128)
    return (input_ids, attention_mask, labels)

def wrap_preprocess_batch(texts, labels):
    """Run ``bert_preprocess_batch`` inside a tf.data pipeline.

    Args:
        texts: string tensor holding one batch of review texts.
        labels: integer tensor holding the matching labels.

    Returns:
        ``(features, labels)`` where ``features`` is a dict with the keys
        'input_ids' and 'attention_mask', the structure the Keras model is fed.
    """
    # tf.py_function calls the Python tokenizer and returns three tensors.
    input_ids, attention_mask, batch_labels = tf.py_function(
        func=bert_preprocess_batch,
        inp=[texts, labels],
        Tout=[tf.int32, tf.int32, tf.int64],
    )

    # py_function outputs have no static shape, so declare it explicitly:
    # unknown batch size, sequences padded / truncated to 128 tokens.
    input_ids.set_shape((None, 128))
    attention_mask.set_shape((None, 128))
    batch_labels.set_shape((None,))

    features = {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
    }
    return features, batch_labels


def prepare_bert_dataset(ds, shuffle=False, batch_size=16, shuffle_buffer_size=10000):
    """Turn a dataset of (text, label) pairs into tokenized, batched model input.

    Args:
        ds: tf.data.Dataset yielding (text, label) elements.
        shuffle: when True, shuffle the individual examples before batching.
        batch_size: number of examples per batch.
        shuffle_buffer_size: size of the shuffle buffer; only used when
            ``shuffle`` is True.

    Returns:
        A prefetched tf.data.Dataset yielding
        ``({'input_ids', 'attention_mask'}, labels)`` batches.
    """
    if shuffle:
        # Shuffle examples (not batches) so each batch mixes different reviews.
        ds = ds.shuffle(shuffle_buffer_size)
    ds = ds.batch(batch_size)
    ds = ds.map(wrap_preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds

# 7. Build the tokenized training and test datasets (batches of 16);
#    only the training split asks for shuffling.
train_ds_bert = prepare_bert_dataset(train_ds_text, batch_size=16, shuffle=True)
test_ds_bert = prepare_bert_dataset(test_ds_text, batch_size=16, shuffle=False)


In [None]:

# 8. Compile and fine-tune the model.
# The optimizer is given by name so it is created by the same Keras
# implementation the model itself uses. Its default learning rate (1e-3) is
# far too large for fine-tuning a pretrained transformer, so it is lowered to
# 2e-5 right after compiling.
bert_model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)
bert_model.optimizer.learning_rate = 2e-5

# Train on the GPU.
# NOTE: the test split doubles as validation data here, so the validation
# accuracy is not an independent estimate.
with tf.device('/GPU:0'):
    history_bert = bert_model.fit(
        train_ds_bert,
        validation_data=test_ds_bert,
        epochs=1
    )

# 9. Final evaluation on the test split.
test_loss_bert, test_acc_bert = bert_model.evaluate(test_ds_bert)
print("Final Test Accuracy with DistilBERT:", test_acc_bert)