In [None]:
# Install the tokenizer / profiling dependencies and download the small English spaCy pipeline.
%pip install spacy
%pip install psutil
!python -m spacy download en_core_web_sm

# llama3.2-1B有点太大了，这个才12.8MB

In [1]:
import time
import json
import math
import torch
import spacy
import psutil
import itertools
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from collections import Counter

# Unified names for DataFrame columns, cache files and preprocessing settings.
TEXT_PROCESSED = "processed_text"  # column holding the space-joined tokens
LABEL = "label"  # column holding the 0/1 class label
TEXT_VECTORIZATION = "vectorization_text"  # column holding token-index lists
TEXT_VECTORIZATION_PADDING = "padding_vectorization_text"  # column holding fixed-length index lists
FILE_PATH_WTI = "word_to_idx.json"  # where the word -> index vocabulary is cached
SENTENCE_LENGTH = 200  # every sequence is truncated / padded to this many tokens

#### 云端训练解包

In [None]:
# Colab only: mount Google Drive and pull the cached preprocessing artefacts
# into the local working directory.
from google.colab import drive
import shutil
drive.mount('/content/drive/')
srcA = '/content/drive/MyDrive/sdxxdl_colab/hw2/processed_data_useful.json'
srcB = '/content/drive/MyDrive/sdxxdl_colab/hw2/word_to_idx.json'
srcC = '/content/drive/MyDrive/sdxxdl_colab/hw2/processed_text.zip'

destA = '/content/processed_data_useful.json'
destB = '/content/word_to_idx.json'
destC = '/content/processed_text.zip'

# Copy the files (the originals stay on Drive)
shutil.copy(srcA, destA)
shutil.copy(srcB, destB)
shutil.copy(srcC, destC)
!unzip processed_text.zip


In [None]:
import os
import shutil

# Colab only: back up every regular file in /content (trained weights, caches)
# to a Drive folder so it survives the end of the session.
cloud_directory = '/content/drive/MyDrive/sdxxdl_colab/hw2/models'

os.makedirs(cloud_directory, exist_ok=True)

# Entries that must never be copied (Colab defaults, the Drive mount itself, ...).
skipped_items = ['sample_data', 'drive', 'processed_data_useful.zip', '.config']

for item in os.listdir('/content'):
    item_path = os.path.join('/content', item)

    if item in skipped_items:
        continue

    # shutil.copy only handles regular files; any other directory in /content
    # (e.g. one created by unzip) would raise and abort the whole backup.
    if not os.path.isfile(item_path):
        continue

    shutil.copy(item_path, os.path.join(cloud_directory, item))

print("Done")

In [None]:
# Load the raw dataset and reduce it to a (text, label) frame.
# Everything is built in one method chain so each step returns a new frame:
# assigning into a column slice (the previous approach) triggers pandas'
# SettingWithCopyWarning and makes the cell non-idempotent.
df = (
    pd.read_csv("enron_spam_data.csv")
    .drop(columns=["Date"])
    .rename(columns={"Spam/Ham": LABEL})
    .dropna()  # drop rows with any missing field before concatenating strings
    .assign(text=lambda frame: frame["Subject"] + " " + frame["Message"])
    .loc[:, ["text", LABEL]]
    .assign(**{LABEL: lambda frame: frame[LABEL].map({"ham": 0, "spam": 1})})
)
print("数据集样例:")
print(df.head())
print("数据集大小:", df.shape)

#### 分词

~~考虑~~使用~~nltk或~~spaCy进行分词

am和pm感觉还是保留一下，有可能垃圾邮件发过来的时间也是一个重要判据（比如可能晚上发更多？）

In [None]:
# This step takes a long time on the full corpus.
# Only tokenizer-level attributes (text, is_stop, is_punct) are read below, so
# the dependency parser and the NER component are disabled to cut the runtime.
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
def spacy_tokenizer(text):
    """Lower-case and tokenize `text`, returning the kept tokens joined by spaces.

    Stop words, punctuation and single-character tokens are dropped.
    """
    doc = nlp(text.lower())  # lower-case, then tokenize
    # Remove stop words and punctuation
    tokens = [
        token.text
        for token in doc
        if not token.is_stop and not token.is_punct and len(token.text) > 1
    ]
    return " ".join(tokens)


# Apply the spaCy tokenizer to the text column and cache the result on disk
df[TEXT_PROCESSED] = df["text"].apply(spacy_tokenizer)
df[[TEXT_PROCESSED, LABEL]].to_csv(
    "processed_text.csv", index=False, encoding="utf-8"
)
print("分词结果已保存为 processed_text.csv")

In [None]:
# Load the cached, already-tokenized text
df_processed = pd.read_csv("processed_text.csv", encoding="utf-8")
print("是否存在 NaN 值:", df_processed[TEXT_PROCESSED].isna().any())

# Replace missing text values (NaN after the CSV round trip) with the empty string
df_processed[TEXT_PROCESSED] = df_processed[TEXT_PROCESSED].fillna("")
print(df_processed.head())

~~数字太多了，对词表影响好大，测试了前面几个出来几乎全是`unknown`，以及先练练模型，看看后续是否需要增大词表到两万~~

一开始把100_000看成了10_000，但也能说明这个对词表的影响不小

In [None]:
PAD, UNK = "<pad>", "<unk>"

# Flatten the per-document token lists into one long list of tokens
all_tokens = [
    token
    for document_tokens in df_processed[TEXT_PROCESSED].str.split()
    for token in document_tokens
]

# Optional filter, currently disabled: drop purely numeric tokens
# all_tokens = [token for token in all_tokens if not token.isdigit()]

unique_tokens = set(all_tokens)
print(f"无重复的最大词表大小为: {len(unique_tokens)}")

# Count token frequencies (Counter de-duplicates on its own)
word_counts = Counter(all_tokens)

# Special markers come first (padding and out-of-vocabulary),
# followed by the most frequent tokens.
vocab = [PAD, UNK]
vocab.extend(word for word, _ in word_counts.most_common(100000))

word_to_idx = {}
for idx, word in enumerate(vocab):
    word_to_idx[word] = idx

with open(FILE_PATH_WTI, "w", encoding="utf-8") as f:
    json.dump(word_to_idx, f, ensure_ascii=False, indent=4)
    print(f"词表已保存到 {FILE_PATH_WTI}")

print("[Done] Build vocabulary")

### 文本向量化

需要截断过长的句子，为过短的句子从左边填充直到阈值

根据文档提示，这里阈值设定为`200`

In [None]:
# Convert each tokenized text into a list of vocabulary indices (unknown tokens map to UNK)
df_processed[TEXT_VECTORIZATION] = df_processed[TEXT_PROCESSED].apply(
    lambda x: [word_to_idx.get(token, word_to_idx[UNK]) for token in x.split()]
)

# Truncation keeps the beginning of the text, which may contain the subject and other key content
def pad_sequence(indices):
    """Return `indices` cut to SENTENCE_LENGTH, or left-padded with PAD up to SENTENCE_LENGTH."""
    if len(indices) > SENTENCE_LENGTH:
        return indices[:SENTENCE_LENGTH]
    else:
        return [word_to_idx[PAD]] * (SENTENCE_LENGTH - len(indices)) + indices


df_processed[TEXT_VECTORIZATION_PADDING] = df_processed[TEXT_VECTORIZATION].apply(pad_sequence)
print(df_processed.head())


# Cache the full processed frame
df_processed.to_csv("processed_data.csv", index=False, encoding="utf-8")
print("处理后的数据已保存为 processed_data.csv")

# Keep only the LABEL and TEXT_VECTORIZATION_PADDING columns
df_useful = df_processed[[LABEL, TEXT_VECTORIZATION_PADDING]]

# One {"label", "text"} record per row, ready for JSON serialisation
nested_data = df_useful.apply(
    lambda row: {
        "label": row["label"],
        "text": row[TEXT_VECTORIZATION_PADDING],
    },
    axis=1,
).tolist()

with open("processed_data_useful.json", "w", encoding="utf-8") as f:
    json.dump(nested_data, f, ensure_ascii=False, indent=4)

print("有效数据已保存 processed_data_useful.json")

In [None]:
# Restore the cached vocabulary and the padded index sequences
with open(FILE_PATH_WTI, "r", encoding="utf-8") as f:
    word_to_idx = json.load(f)
print(f"词表已从 {FILE_PATH_WTI} 加载")

with open("processed_data_useful.json", "r", encoding="utf-8") as f:
    nested_data = json.load(f)

# Rebuild a DataFrame; only the two top-level fields are needed
df_loaded = pd.DataFrame(
    [{"text": record["text"], "label": record["label"]} for record in nested_data]
)
print(df_loaded.head())

# Shuffle the rows of df_loaded
df_shuffled = df_loaded.sample(frac=1, random_state=42).reset_index(drop=True)
print(df_shuffled.head())


# 80 % train+validation / 20 % test, then 10 % of the former becomes validation
train_df, test_df = train_test_split(df_shuffled, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_df, test_size=0.1, random_state=42)

print(f"训练集大小: {len(train_df)}")
print(f"验证集大小: {len(val_df)}")
print(f"测试集大小: {len(test_df)}")


def _as_tensor_dataset(frame):
    """Wrap the "text" / "label" columns of `frame` into a TensorDataset of LongTensors."""
    features = torch.LongTensor(frame["text"].tolist())
    targets = torch.LongTensor(frame["label"].tolist())
    return TensorDataset(features, targets)


train_dataset = _as_tensor_dataset(train_df)
val_dataset = _as_tensor_dataset(val_df)
test_dataset = _as_tensor_dataset(test_df)

BATCH_SIZE = 256
_worker_options = {"batch_size": BATCH_SIZE, "num_workers": 2, "pin_memory": True}
train_loader = DataLoader(train_dataset, shuffle=True, **_worker_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_worker_options)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## 模型结构

位置编码的核心目的是**向模型注入序列中元素的位置信息**，使模型能够感知单词在句子中的相对或绝对位置，这对处理具有顺序依赖的序列数据（如文本）至关重要。以最常见的**正弦位置编码**为例，其原理如下：  
- 对于一个长度为 $ L $、维度为 $ d_{\text{model}} $ 的序列，位置编码为每个位置 $ i $ 生成一个 $ d_{\text{model}} $ 维的向量。  
- 利用三角函数的周期性，对偶数维度使用正弦函数，奇数维度使用余弦函数，公式为：  
$$
\begin{aligned}
PE_{(i, 2j)} &= \sin\left(\frac{i}{10000^{2j / d_{\text{model}}}}\right) \\
PE_{(i, 2j+1)} &= \cos\left(\frac{i}{10000^{2j / d_{\text{model}}}}\right)
\end{aligned}
$$  
其中 $ i $ 是位置（如第 $ i $ 个词），$ j $ 是维度索引。这种编码方式的优势在于：  
- 不同位置的编码在高维空间中具有唯一性，且能保持相对位置关系（对固定偏移 $ k $，$ PE_{i+k} $ 可以由 $ PE_{i} $ 经过一个只依赖于 $ k $ 的线性变换（旋转）得到）。  
- 三角函数的平滑性使模型易于学习和泛化。

---

RoPE的关键是对Q和K向量进行**位置相关的旋转变换**，使相对位置信息嵌入到点积计算中。具体步骤如下：  

将向量中每两个相邻维度 $(2m, 2m+1)$ 视为一个复数（前者为实部，后者为虚部）：  
- 对于位置 $n$，第 $m$ 组的旋转角为 $\theta_{m,n} = n \cdot \omega_m$，其中 $\omega_m = 1/10000^{2m/d_{\text{model}}}$ 是预定义的频率参数；  
- 位置编码相当于将该复数乘以 $e^{i\theta_{m,n}} = \cos(\theta_{m,n}) + i\sin(\theta_{m,n})$，即在对应的二维平面内旋转角度 $\theta_{m,n}$。  

对于位置 $n$ 和 $n+k$，相对位置为 $k$。RoPE通过旋转矩阵对Q和K进行变换，使得：  
- 当计算 $Q_n$ 与 $K_{n+k}$ 的点积时，结果隐式包含 $k$ 的信息。  
数学上，对向量 $v = [v_1, v_2, v_3, v_4, \dots, v_{d-1}, v_d]$（$d$ 为偶数，相邻维度两两分组），位置 $n$ 的旋转操作可表示为：  
$$
\text{rotate}(v, n) = \begin{bmatrix}
v_1 \cos(\theta_{0,n}) - v_2 \sin(\theta_{0,n}) \\
v_1 \sin(\theta_{0,n}) + v_2 \cos(\theta_{0,n}) \\
v_3 \cos(\theta_{1,n}) - v_4 \sin(\theta_{1,n}) \\
v_3 \sin(\theta_{1,n}) + v_4 \cos(\theta_{1,n}) \\
\vdots
\end{bmatrix},
$$  
其中第 $m$ 组维度 $(v_{2m+1}, v_{2m+2})$（$m = 0, 1, \dots, d/2 - 1$）对应旋转角 $\theta_{m,n} = n \cdot \omega_m$。

`RoPE`更适合处理长序列，而邮件通常来说都是短文本，本次还都做了截断，所以经典位置编码的效果和计算效率相对高一些是可以接受的


#### Attention

In [7]:
# Classic sinusoidal positional encoding, implemented directly from the formula
class SimplePE(nn.Module):
    """Fixed (non-trainable) sinusoidal positional-encoding table.

    Args:
        d_model: Width of each position vector. Odd values are supported: the
            last sine column then simply has no cosine partner.
        max_len: Number of positions precomputed. Kept large because the cost
            is negligible and different truncation lengths are compared later.
    """

    def __init__(self, d_model, max_len=1500):
        super().__init__()
        pe = torch.zeros(max_len, d_model)  # [max_len, d_model]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        param = position * div_term  # [max_len, ceil(d_model / 2)]
        pe[:, 0::2] = torch.sin(param)  # even columns
        # There are only floor(d_model / 2) odd columns; slicing keeps odd
        # d_model from failing with a shape mismatch (a no-op when d_model is even).
        pe[:, 1::2] = torch.cos(param[:, : d_model // 2])
        # Stored as a buffer with a leading batch axis: [1, max_len, d_model]
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        """Return the encodings for the first `x.size(1)` positions, shape [1, seq_len, d_model].

        Only the sequence length of `x` is used; the caller adds the result to `x`.
        """
        return self.pe[:, :x.size(1), :]

# Rotary position embedding (d_model must be even)
class RoPE(nn.Module):
    """Rotate query/key feature pairs by position-dependent angles.

    The even-indexed and odd-indexed features of each vector form the two
    coordinates of a 2-D pair. The output stores all rotated first coordinates
    followed by all rotated second coordinates (not interleaved).
    """

    def __init__(self, d_model):
        super().__init__()
        exponents = torch.arange(0, d_model, 2).float() / d_model
        self.register_buffer("inv_freq", 1.0 / (10000 ** exponents))

    def forward(self, query, key):
        """Return `(query_rotated, key_rotated)`, each with the same shape as its input.

        The sequence length is read from axis 1 of `query`.
        """
        positions = torch.arange(query.size(1), device=query.device).float()  # [seq_len]
        angles = positions[:, None] * self.inv_freq[None, :]  # [seq_len, d_model // 2]
        sin_table = torch.sin(angles)
        cos_table = torch.cos(angles)

        def rotate(tensor):
            # Split into even / odd feature columns and apply a 2-D rotation per pair
            first, second = tensor[..., ::2], tensor[..., 1::2]
            return torch.cat(
                [
                    first * cos_table - second * sin_table,
                    first * sin_table + second * cos_table,
                ],
                dim=-1,
            )

        return rotate(query), rotate(key)

In [8]:
class AttentionModel(nn.Module):
    """Causal multi-head self-attention text classifier.

    Pipeline: embedding -> positional encoding -> `mha_cnt` stacked
    `nn.MultiheadAttention` layers (causal mask) -> linear head applied to the
    representation of the last time step.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding / attention width.
        nhead: Number of attention heads per layer.
        mha_cnt: Number of stacked attention layers.
        dropout: Dropout probability passed to each attention layer.
        pos_encoding_type: "original" selects `SimplePE`; any other value selects `RoPE`.
        output_dim: Size of the classification head (1 for a single logit).

    Notes:
        * `padding_idx=0` only pins the `<pad>` embedding row to zero and keeps it
          out of gradient updates. No `key_padding_mask` is passed to the
          attention layers, so padded positions still take part in attention.
        * In the RoPE branch the rotation is applied to the embeddings before
          they enter `nn.MultiheadAttention`, and the rotated tensor is used as
          query, key and value alike.
    """

    def __init__(
        self, vocab_size, d_model, nhead, mha_cnt=1, dropout=0.5, pos_encoding_type="original", output_dim=1
    ):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)  # index 0 is <pad>
        self.pos_encoding_type = pos_encoding_type

        if pos_encoding_type == "original":
            self.pos_encoder = SimplePE(d_model)
        else:
            self.pos_encoder = RoPE(d_model)

        self.mhas = nn.ModuleList([
            nn.MultiheadAttention(
                embed_dim=d_model,
                num_heads=nhead,
                dropout=dropout,
                batch_first=True,  # inputs are [batch_size, seq_len, d_model]
            )
            for _ in range(mha_cnt)
        ])
        self.fc = nn.Linear(d_model, output_dim)  # d_model -> output_dim

    def classifier(self, x):
        """Map `[batch_size, d_model]` features to logits.

        Only the trailing axis is squeezed, so a batch of size 1 stays a 1-D
        tensor of shape `[1]` and `output_dim > 1` keeps `[batch_size, output_dim]`.
        (A bare `.squeeze()` would collapse the batch axis for single-sample batches.)
        Defined as a method rather than a lambda attribute so the module stays picklable.
        """
        return self.fc(x).squeeze(-1)

    def forward(self, x):
        """Classify a batch of token-index sequences.

        Args:
            x: Integer tensor of shape `[batch_size, seq_len]`.

        Returns:
            `(logits, None)`. `logits` has shape `[batch_size]` when `output_dim == 1`,
            else `[batch_size, output_dim]`. The second element is a placeholder kept
            for compatibility with callers that unpack two values.
        """
        x = self.embedding(x)  # [batch_size, seq_len, d_model]

        # Positional encoding
        if self.pos_encoding_type == "original":
            x = x + self.pos_encoder(x)  # [batch_size, seq_len, d_model]
        else:
            x, _ = self.pos_encoder(x, x)  # [batch_size, seq_len, d_model]

        attn_mask = self.__maskGen(x.size(1), device=x.device)  # causal mask
        # Layers are stacked directly; attention weights are discarded.
        for mha in self.mhas:
            x, _ = mha(query=x, key=x, value=x, attn_mask=attn_mask)  # [batch_size, seq_len, d_model]

        # Use the last time step as the sequence summary: under the causal mask
        # it is the only position that can attend to the whole sequence.
        x = x[:, -1, :]  # [batch_size, d_model]

        logits = self.classifier(x)
        return logits, None

    @staticmethod
    def __maskGen(size, device=None):
        """Build a `[size, size]` additive causal mask: 0 on/below the diagonal, -inf above.

        The mask is created directly on `device`, avoiding a host-to-device copy
        on every forward pass.
        """
        return torch.triu(
            torch.full((size, size), float("-inf"), device=device), diagonal=1
        )

#### RNN

In [9]:
class RNNModel(nn.Module):
    """Recurrent text classifier (vanilla RNN, LSTM or GRU); no positional encoding is needed.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding width.
        hidden_dim: Hidden-state width of the recurrent layer(s).
        num_layers: Number of stacked recurrent layers.
        output_dim: Size of the classification head (1 for a single logit).
        rnn_type: One of "RNN", "LSTM", "GRU"; any other value falls back to `nn.RNN`.
        bidirectional: Whether to run the recurrence in both directions.
        dropout: Inter-layer dropout; only applied when `num_layers > 1`.
    """

    def __init__(
        self, vocab_size, embed_dim, hidden_dim, num_layers, output_dim, rnn_type="RNN", bidirectional=False, dropout=0.5
    ):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        rnn_class = {"RNN": nn.RNN, "LSTM": nn.LSTM, "GRU": nn.GRU}.get(rnn_type, nn.RNN)

        self.rnn = rnn_class(
            embed_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,  # PyTorch applies dropout between layers only
            bidirectional=bidirectional,
        )
        # The head consumes one hidden state per direction.
        self.fc = nn.Linear(hidden_dim * (2 if bidirectional else 1), output_dim)

    def forward(self, x):
        """Classify a batch of token-index sequences.

        Args:
            x: Integer tensor of shape `[batch_size, seq_len]`.

        Returns:
            `(logits, None)`. `logits` has shape `[batch_size]` when `output_dim == 1`,
            else `[batch_size, output_dim]`. The second element is a placeholder so the
            return shape matches the attention model.
        """
        x = self.embedding(x)  # [batch_size, seq_len, embed_dim]
        _, hn = self.rnn(x)  # hn: [num_layers * num_directions, batch_size, hidden_dim]

        # LSTM returns (h_n, c_n); only the hidden state is used.
        if isinstance(hn, tuple):
            hn = hn[0]

        if self.rnn.bidirectional:
            # The last two entries are the forward and backward states of the top
            # layer; both are needed to match the 2 * hidden_dim input of `fc`.
            x = torch.cat([hn[-2], hn[-1]], dim=-1)  # [batch_size, 2 * hidden_dim]
        else:
            x = hn[-1]  # [batch_size, hidden_dim]

        # Squeeze only the trailing axis so a batch of size 1 keeps its batch dimension.
        logits = self.fc(x).squeeze(-1)
        return logits, None

In [10]:
class ModelTrainer:
    """Train a binary classifier and record per-epoch metrics.

    The model is expected to return a `(logits, extra)` pair from its forward pass.

    Args:
        model: The network to train; moved to CUDA when available, else CPU.
        train_loader: Iterable of `(inputs, labels)` training batches.
        validation_loader: Iterable of `(inputs, labels)` validation batches.
        epochs: Number of passes over the training data.
        lr: Initial Adam learning rate.
        model_name: File path the best state dict is saved to.
        loss_func: `nn.BCEWithLogitsLoss` (one logit per sample) or a loss taking
            class logits of shape `[batch, classes]`, e.g. cross entropy.
    """

    def __init__(self, model, train_loader, validation_loader, epochs=200, lr=0.01, model_name="AttentionModel", loss_func=nn.BCEWithLogitsLoss()):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.validation_loader = validation_loader
        self.epochs = epochs
        self.lr = lr

        # Binary cross entropy on logits by default
        self.loss_func = loss_func
        self.optim = optim.Adam(self.model.parameters(), lr=self.lr)
        # Lower the learning rate when the validation loss stops improving
        self.scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            self.optim, "min", patience=3
        )
        self.max_acc = 0.0  # best validation accuracy seen so far
        self.model_name = model_name

        # Per-epoch histories used for plotting
        self.train_losses, self.validation_losses = [], []
        self.train_accs, self.validation_accs = [], []
        self.train_precisions, self.validation_precisions = [], []
        self.train_recalls, self.validation_recalls = [], []
        self.train_f1s, self.validation_f1s = [], []

    def train(self):
        """Run the full training loop.

        Returns:
            A 10-tuple of per-epoch lists, in the order: train loss, validation loss,
            train accuracy, validation accuracy, train precision, validation precision,
            train recall, validation recall, train F1, validation F1.
        """
        for epoch in range(self.epochs):

            # Training pass
            train_loss, train_acc, train_precision, train_recall, train_f1 = (
                self.train_part()
            )

            # Validation pass
            (
                validation_loss,
                validation_acc,
                validation_precision,
                validation_recall,
                validation_f1,
            ) = self.validation_part()

            # Adjust the learning rate
            self.scheduler.step(validation_loss)

            # Keep the weights with the best validation accuracy
            if validation_acc > self.max_acc:
                self.max_acc = validation_acc
                torch.save(self.model.state_dict(), self.model_name)

            print(f"Epoch {epoch + 1}/{self.epochs}")
            print(
                f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, F1: {train_f1:.4f} | "
                f"Validation Loss: {validation_loss:.4f}, Acc: {validation_acc:.4f}, Precision: {validation_precision:.4f}, Recall: {validation_recall:.4f}, F1: {validation_f1:.4f}"
            )

        return (
            self.train_losses,
            self.validation_losses,
            self.train_accs,
            self.validation_accs,
            self.train_precisions,
            self.validation_precisions,
            self.train_recalls,
            self.validation_recalls,
            self.train_f1s,
            self.validation_f1s,
        )

    def _loss_and_preds(self, logits, labels):
        """Return `(loss, predicted_labels)` for one batch.

        For `BCEWithLogitsLoss` the logits are reshaped to the label shape first, so
        a model that squeezed a single-sample batch down to a 0-dim tensor still
        yields a valid loss and a list (not a bare int) of predictions.
        """
        if isinstance(self.loss_func, nn.BCEWithLogitsLoss):
            logits = logits.reshape(labels.shape)
            loss = self.loss_func(logits, labels.float())
            preds = (torch.sigmoid(logits) > 0.5).int().tolist()
        else:  # class-logit losses such as cross entropy
            loss = self.loss_func(logits, labels.long())
            preds = torch.argmax(logits, dim=1).tolist()
        return loss, preds

    @staticmethod
    def _classification_metrics(all_labels, all_preds):
        """Return `(accuracy, precision, recall, f1)` for the collected labels / predictions."""
        return (
            accuracy_score(all_labels, all_preds),
            precision_score(all_labels, all_preds),
            recall_score(all_labels, all_preds),
            f1_score(all_labels, all_preds),
        )

    def train_part(self):
        """Run one training epoch; returns `(loss, accuracy, precision, recall, f1)`.

        The loss is the mean of the per-batch losses. All five values are also
        appended to the corresponding history lists.
        """
        self.model.train()
        loss_sum = 0.0
        all_labels = []
        all_preds = []
        for inputs, labels in self.train_loader:
            inputs, labels = inputs.to(self.device), labels.to(self.device)
            self.optim.zero_grad()  # reset gradients
            logits, _ = self.model(inputs)
            loss, preds = self._loss_and_preds(logits, labels)
            loss.backward()  # back-propagate
            self.optim.step()  # update parameters

            loss_sum += loss.item()
            all_labels.extend(labels.tolist())
            all_preds.extend(preds)

        train_loss = loss_sum / len(self.train_loader)
        train_acc, train_precision, train_recall, train_f1 = (
            self._classification_metrics(all_labels, all_preds)
        )

        self.train_losses.append(train_loss)
        self.train_accs.append(train_acc)
        self.train_precisions.append(train_precision)
        self.train_recalls.append(train_recall)
        self.train_f1s.append(train_f1)
        return train_loss, train_acc, train_precision, train_recall, train_f1

    def validation_part(self):
        """Evaluate on the validation set; returns `(loss, accuracy, precision, recall, f1)`.

        Runs in eval mode without gradient tracking. All five values are also
        appended to the corresponding history lists.
        """
        self.model.eval()
        loss_sum = 0.0
        all_labels = []
        all_preds = []
        with torch.no_grad():  # no gradients needed for validation
            for inputs, labels in self.validation_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                logits, _ = self.model(inputs)
                loss, preds = self._loss_and_preds(logits, labels)

                loss_sum += loss.item()
                all_labels.extend(labels.tolist())
                all_preds.extend(preds)

        validation_loss = loss_sum / len(self.validation_loader)
        validation_acc, validation_precision, validation_recall, validation_f1 = (
            self._classification_metrics(all_labels, all_preds)
        )

        self.validation_losses.append(validation_loss)
        self.validation_accs.append(validation_acc)
        self.validation_precisions.append(validation_precision)
        self.validation_recalls.append(validation_recall)
        self.validation_f1s.append(validation_f1)
        return (
            validation_loss,
            validation_acc,
            validation_precision,
            validation_recall,
            validation_f1,
        )

# Positions of each history list inside the tuple returned by ModelTrainer.train()
# (_T = training set, _V = validation set).
LOSS_IDX_T = 0
LOSS_IDX_V = 1
ACC_IDX_T = 2
ACC_IDX_V = 3
PRECISION_IDX_T = 4
PRECISION_IDX_V = 5
RECALL_IDX_T = 6
RECALL_IDX_V = 7
F1_IDX_T = 8
F1_IDX_V = 9

训练集曲线平稳、验证集曲线的抖动是正常的，数据量太少、参数量太多的情况下模型很容易阶段性过拟合，后续epoch恢复后就会带来抖动

In [11]:
class ModelEvaluator:
    """Evaluate a trained model on the test set and plot metric comparisons.

    Args:
        model: Network whose forward pass returns `(logits, extra)`; needed by `evaluate_model`.
        test_loader: Iterable of `(inputs, labels)` test batches; needed by `evaluate_model`.
        compare_lists: Sequence of `(label, results)` pairs, where `results` is indexable
            by metric position; needed by `plot_compare_strategies`.
    """

    def __init__(
        self,
        model=None,
        test_loader=None,
        compare_lists=None,
    ):
        self.model = model
        self.test_loader = test_loader
        self.compare_lists = compare_lists
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @staticmethod
    def _class_accuracy(cm, class_idx):
        """Fraction of true `class_idx` samples that were predicted as `class_idx`.

        `cm` is a confusion matrix indexed as `cm[true][predicted]`. Returns 0.0
        when the class has no samples.
        """
        row = cm[class_idx]
        total = sum(row)
        return row[class_idx] / total if total else 0.0

    def evaluate_model(self, analyse=True, model_path="AttentionModel.pth", loss_type="BCE"):
        """Load weights from `model_path`, score the test set, print metrics and plot the confusion matrix.

        Args:
            analyse: When False, return `(accuracy, precision, recall, f1)`;
                when True, return None (metrics are only printed / plotted).
            model_path: Path of the saved state dict.
            loss_type: "BCE" thresholds sigmoid(logit) at 0.5; any other value
                takes the argmax over class logits.
        """
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.to(self.device)
        self.model.eval()

        all_labels = []
        all_preds = []
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                logits, _ = self.model(inputs)

                if loss_type == "BCE":
                    # reshape(-1) keeps a single-sample batch as a list of one prediction
                    preds = (torch.sigmoid(logits.reshape(-1)) > 0.5).int().tolist()
                else:
                    preds = torch.argmax(logits, dim=1).tolist()

                all_labels.extend(labels.tolist())
                all_preds.extend(preds)

        # Evaluation metrics
        test_acc = accuracy_score(all_labels, all_preds)
        test_precision = precision_score(all_labels, all_preds)
        test_recall = recall_score(all_labels, all_preds)
        test_f1 = f1_score(all_labels, all_preds)
        cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])

        print(f"Test Accuracy: {test_acc:.4f}")
        print(f"Test Precision: {test_precision:.4f}")
        # Accuracy restricted to the spam class, read from the confusion matrix.
        # (The former expression `2 * accuracy - precision` is not a valid metric
        # and can even fall outside [0, 1].)
        print(f"Spam Accuracy: {self._class_accuracy(cm, 1):.4f}")
        print(f"Test Recall: {test_recall:.4f}")
        print(f"Test F1 Score: {test_f1:.4f}")

        # Confusion matrix plot
        disp = ConfusionMatrixDisplay(
            confusion_matrix=cm, display_labels=["Ham", "Spam"]
        )
        disp.plot(cmap=plt.cm.Blues)
        plt.title("Confusion Matrix")
        plt.show()

        if not analyse:
            return test_acc, test_precision, test_recall, test_f1

    def plot_compare_strategies(
        self,
        metric_name,
        train_idx=0,
        val_idx=1,
        title="",
    ):
        """Plot one metric for every entry of `compare_lists`.

        Args:
            metric_name: Text used for the y-axis and legend entries.
            train_idx: Index of the training series inside each `results`; when None,
                only a single "Test" panel is drawn from `val_idx`.
            val_idx: Index of the validation (or test) series inside each `results`.
            title: Suffix appended to the panel titles.
        """
        plt.figure(figsize=(12, 6))

        # Training curves (left panel)
        if train_idx is not None:
            plt.subplot(1, 2, 1)
            for label, results in self.compare_lists:
                plt.plot(
                    results[train_idx], label=f"Train {metric_name} ({label})"
                )
            plt.xlabel("Epoch")
            plt.ylabel(metric_name)
            plt.title(f"Train {title}")
            plt.legend()

        # Validation curves (right panel), or a single test panel
        if train_idx is not None:
            plt.subplot(1, 2, 2)
            lb = "Validation"
            cs = "Epoch"
        else:
            plt.subplot(1, 1, 1)  # no training curves: use the whole figure
            lb = "Test"
            cs = "Case"

        for label, results in self.compare_lists:
            plt.plot(
                results[val_idx], label=f"{lb} {metric_name} ({label})"
            )
        plt.xlabel(cs)
        plt.ylabel(metric_name)
        plt.title(f"{lb} {title}")
        plt.legend()

        plt.tight_layout()
        plt.show()

## 分析

In [12]:
def plot_metrics(compare_lists, setTitle):
    """Draw train / validation comparison figures for loss, accuracy, precision, recall and F1.

    Args:
        compare_lists: `(label, results)` pairs forwarded to `ModelEvaluator`.
        setTitle: Callable mapping a metric name to the title suffix of its figure.
    """
    plotter = ModelEvaluator(compare_lists=compare_lists)

    # (metric name, index of the training series, index of the validation series)
    panels = (
        ("Loss", LOSS_IDX_T, LOSS_IDX_V),
        ("Accuracy", ACC_IDX_T, ACC_IDX_V),
        ("Precision", PRECISION_IDX_T, PRECISION_IDX_V),
        ("Recall", RECALL_IDX_T, RECALL_IDX_V),
        ("F1 Score", F1_IDX_T, F1_IDX_V),
    )
    for metric_name, train_position, val_position in panels:
        plotter.plot_compare_strategies(
            metric_name,
            train_idx=train_position,
            val_idx=val_position,
            title=setTitle(metric_name),
        )


In [13]:
def compare_diff_strategies(evaluates=None, setTitle=None, compare_lists=None):
    """Plot test accuracy / precision / recall / F1 across several models.

    Args:
        evaluates: Sequence of `[model, model_path, display_name]` entries. Each model
            is evaluated on the global `test_loader`. Ignored when `compare_lists` is given.
        setTitle: Optional callable mapping a metric name to a title suffix; when
            omitted, the bare metric name is used.
        compare_lists: Optional precomputed rows of
            `[display_name, accuracy, precision, recall, f1]`.
    """
    # Without a title formatter, fall back to the plain metric name instead of
    # failing when calling None.
    title_of = setTitle if setTitle is not None else (lambda metric_name: metric_name)

    if compare_lists is None:
        compare_lists = []
        for evaluate in evaluates:
            model_name = evaluate[2]
            test_acc, test_precision, test_recall, test_f1 = ModelEvaluator(
                evaluate[0], test_loader=test_loader
            ).evaluate_model(analyse=False, model_path=evaluate[1])
            compare_lists.append([model_name, test_acc, test_precision, test_recall, test_f1])

    models = [item[0] for item in compare_lists]
    accs = [item[1] for item in compare_lists]
    precisions = [item[2] for item in compare_lists]
    recalls = [item[3] for item in compare_lists]
    f1s = [item[4] for item in compare_lists]

    plt.figure(figsize=(12, 8))
    metrics = ["Accuracy", "Precision", "Recall", "F1 Score"]
    metrics_data = [accs, precisions, recalls, f1s]

    # One panel per metric, one point per model
    for idx, (metric, data) in enumerate(zip(metrics, metrics_data)):
        plt.subplot(2, 2, idx + 1)
        x = range(len(models))
        plt.plot(x, data, marker='o', linestyle='-', linewidth=2, markersize=8)
        plt.xticks(x, models, rotation=45)
        plt.ylabel(metric)
        plt.title(f"Test {title_of(metric)}")
        # Annotate every point with its value
        for i, val in enumerate(data):
            plt.text(i, val, f"{val:.4f}", ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

In [14]:
# Shared hyperparameters for every experiment below
vocab_size = len(word_to_idx)
d_model = 256   # a modest width is enough for binary classification; larger values add a lot of compute
nhead = 8
hidden_dim = 256
num_layers = 1
output_dim = 1
epochs = 15


def train_model(model, model_name, loss_func=nn.BCEWithLogitsLoss(), train_loader=train_loader, val_loader=val_loader):
    """Train `model` with `ModelTrainer` and return the tuple of metric histories from `ModelTrainer.train()`.

    Uses the module-level `epochs` and a fixed learning rate of 0.001; the best
    weights are saved to `model_name`. Note that the default loaders are the
    objects bound to `train_loader` / `val_loader` when this `def` is executed.
    """
    trainer = ModelTrainer(
        model=model,
        train_loader=train_loader,
        validation_loader=val_loader,
        epochs=epochs,
        lr=0.001,
        model_name=model_name,
        loss_func=loss_func,
    )
    return trainer.train()

In [15]:
def train_truly(model, model_name, loss_func=nn.BCEWithLogitsLoss(), train_loader=train_loader, val_loader=val_loader):
    """Train via `train_model` while reporting elapsed time and memory growth.

    On CUDA the reported memory is the peak allocated during training minus the
    amount allocated beforehand. On CPU it is the change in resident set size
    between start and end (a final value, not a true peak).

    Returns:
        Whatever `train_model` returns (the metric histories).
    """
    use_cuda = torch.cuda.is_available()

    # Memory baseline
    if use_cuda:
        torch.cuda.reset_peak_memory_stats()
        initial_memory = torch.cuda.memory_allocated()
    else:
        process = psutil.Process()
        initial_memory = process.memory_info().rss  # bytes

    # perf_counter is monotonic, so the measurement is immune to wall-clock adjustments
    start_time = time.perf_counter()
    results = train_model(model=model, model_name=model_name, loss_func=loss_func, train_loader=train_loader, val_loader=val_loader)
    elapsed_time = time.perf_counter() - start_time

    if use_cuda:
        peak_memory = torch.cuda.max_memory_allocated()
    else:
        peak_memory = process.memory_info().rss  # RSS after training, in bytes

    memory_used = peak_memory - initial_memory
    print(f"Model: {model_name}")
    print(f"Elapsed Time: {elapsed_time:.2f} seconds")
    print(f"Memory Used: {memory_used / (1024 ** 2):.2f} MB\n")

    return results

#### Attention与RNN比较


多头注意力机制将输入 $X$ 投影到 $h$ 个不同的子空间（注意力头），每个子空间的维度为 $d/h$。每个注意力头独立计算（所有数据），最后将结果拼接并通过一个线性层得到最终输出

- 每个注意力头的计算复杂度为 $O(n^2\frac{d}{h})$
- 由于有 $h$ 个注意力头，总的计算复杂度为 $h \times O(n^2\frac{d}{h}) = O(n^2d)$

多头注意力自回归模型的计算复杂度主要由注意力机制决定，为 $O(n^2d)$，其中 $n$ 是序列长度，$d$ 是特征维度。这意味着随着序列长度的增加，计算复杂度呈平方增长，因此在处理长序列时计算量会显著增加

---

RNN 的基本结构是一个循环单元，在每个时间步，需要计算 $\mathbf{W}_{xh}\mathbf{x}_t$ 和 $\mathbf{W}_{hh}\mathbf{h}_{t - 1}$，这两个矩阵乘法的复杂度分别为 $O(hd)$ 和 $O(h^2)$。因此，单个时间步的计算复杂度为 $O(hd + h^2)$；对于长度为 $n$ 的序列，需要进行 $n$ 个时间步的计算，因此总的计算复杂度为 $O(n(hd + h^2))$

其中 $n$ 是序列长度，$d$ 是输入维度，$h$ 是隐藏状态维度。通常情况下，隐藏状态维度 $h$ 是一个固定的值，因此 RNN 的计算复杂度与序列长度 $n$ 呈线性关系。这使得 RNN 在处理长序列时相对更高效，但由于其循环结构，难以并行计算



In [None]:
# Train the attention model
attention_model = AttentionModel(vocab_size, d_model, nhead)
results_attention = train_truly(
    model=attention_model, model_name="AttentionModel.pth"
)

# Train the vanilla RNN model
rnn_model = RNNModel(
    vocab_size, d_model, hidden_dim, num_layers, output_dim, rnn_type="RNN"
)
results_rnn = train_truly(model=rnn_model, model_name="RNNModel.pth")

# Train the LSTM variant
lstm_model = RNNModel(
    vocab_size, d_model, hidden_dim, num_layers, output_dim, rnn_type="LSTM"
)
results_lstm = train_truly(model=lstm_model, model_name="LSTMModel.pth")

# Train the GRU variant
gru_model = RNNModel(
    vocab_size, d_model, hidden_dim, num_layers, output_dim, rnn_type="GRU"
)
results_gru = train_truly(model=gru_model, model_name="GRUModel.pth")

# Nested list pairing each model name with its training histories
compare_lists = [
    ["Attention", results_attention],
    ["RNN", results_rnn],
    ["LSTM", results_lstm],
    ["GRU", results_gru],
]

# Plot the comparison figures
def setTitle(metric_name):
    """Title suffix for the Attention-vs-RNN comparison figures."""
    return f"{metric_name} -- Attention vs RNN"

plot_metrics(compare_lists, setTitle)
compare_diff_strategies(
    [
        [attention_model, "AttentionModel.pth", "Attention"],
        [rnn_model, "RNNModel.pth", "RNN"],
        [lstm_model, "LSTMModel.pth", "LSTM"],
        [gru_model, "GRUModel.pth", "GRU"],
    ],
    setTitle,
)

#### 模型超参数影响

In [None]:
# Each group varies exactly one hyperparameter and keeps the other two at their defaults.

# Number of attention heads
head_group = [
    {"nhead": heads, "d_model": d_model, "mha_cnt": 1} for heads in (2, 4, 8, 16)
]

# Model (embedding) width
dim_group = [
    {"nhead": nhead, "d_model": width, "mha_cnt": 1} for width in (32, 128, 256, 512)
]

# Number of stacked attention layers
mha_group = [
    {"nhead": nhead, "d_model": d_model, "mha_cnt": depth} for depth in (1, 2, 3, 4)
]

def param_test(param_group, group_name, pos):
    """Train one AttentionModel per configuration in `param_group`.

    Args:
        param_group: List of dicts with keys "nhead", "d_model" and "mha_cnt".
        group_name: Tag used in checkpoint file names and plot labels.
        pos: Key of the hyperparameter being varied; its value is appended to the label.

    Returns:
        `(histories, eval_specs)`: `histories` holds `[label, training_result]`
        entries and `eval_specs` holds `[model, checkpoint_path, label]` entries.
    """
    histories, eval_specs = [], []
    for config in param_group:
        network = AttentionModel(
            vocab_size=vocab_size,
            d_model=config["d_model"],
            nhead=config["nhead"],
            mha_cnt=config["mha_cnt"],
        )
        checkpoint = f"AttentionModel_{group_name}_nhead{config['nhead']}_dmodel{config['d_model']}_mha{config['mha_cnt']}.pth"
        history = train_truly(model=network, model_name=checkpoint)
        histories.append([f"{group_name}_{config[pos]}", history])
        eval_specs.append([network, checkpoint, f"{group_name}_{config[pos]}"])
    return histories, eval_specs

results_heads, compare_lists_heads = param_test(head_group, "heads", "nhead")
results_dims, compare_lists_dims = param_test(dim_group, "dims", "d_model")
results_mhas, compare_lists_mhas = param_test(mha_group, "mhas", "mha_cnt")

In [None]:
# Effect of the number of attention heads
def setTitle(metric_name):
    """Title suffix for the head-count comparison figures."""
    return f"{metric_name} -- heads Comparisons"
plot_metrics(results_heads, setTitle)
compare_diff_strategies(
    evaluates=compare_lists_heads,
    setTitle=setTitle,
)

In [None]:
# Effect of the hidden (model) dimension
def setTitle(metric_name):
    """Title suffix for the model-width comparison figures."""
    return f"{metric_name} -- dims Comparisons"
plot_metrics(results_dims, setTitle)
compare_diff_strategies(
    evaluates=compare_lists_dims,
    setTitle=setTitle,
)

In [None]:
# Effect of the number of attention layers
def setTitle(metric_name):
    """Title suffix for the attention-depth comparison figures."""
    return f"{metric_name} -- mhas Comparisons"
plot_metrics(results_mhas, setTitle)
compare_diff_strategies(
    evaluates=compare_lists_mhas,
    setTitle=setTitle,
)

#### 文本处理影响

In [None]:
# Load the already-preprocessed text data.
df_processed = pd.read_csv("processed_text.csv", encoding="utf-8")
text_has_nan = df_processed[TEXT_PROCESSED].isna().any()
print("是否存在 NaN 值:", text_has_nan)

# Replace missing texts with empty strings so they simply yield no tokens.
df_processed[TEXT_PROCESSED] = df_processed[TEXT_PROCESSED].fillna("")
print(df_processed.head())

# Special vocabulary entries: padding token and unknown-word token.
PAD, UNK = "<pad>", "<unk>"

# Split every document on whitespace and flatten the per-document token lists
# into one long list of tokens.
all_tokens = [
    token
    for doc_tokens in df_processed[TEXT_PROCESSED].str.split()
    for token in doc_tokens
]

# Optional (disabled): drop purely numeric tokens.
# all_tokens = [token for token in all_tokens if not token.isdigit()]

unique_tokens = set(all_tokens)
print(f"无重复的最大词表大小为: {len(unique_tokens)}")

In [None]:
# Bundles the preprocessing logic above together with the training and test
# code; this cell can be skipped when reading the notebook.
def Attention_all_in_one(sentence_length, vocab_size_T, model_name):
    """Rebuild the dataset for one preprocessing setting, then train and test a model.

    The function reads the notebook-level ``df_processed`` frame and the
    notebook-level ``d_model`` / ``nhead`` values.

    Side effects:
        Overwrites the ``TEXT_VECTORIZATION`` and ``TEXT_VECTORIZATION_PADDING``
        columns of the global ``df_processed`` on every call.

    Args:
        sentence_length: fixed number of token ids per example. Longer
            sequences keep their first ``sentence_length`` ids; shorter ones
            are left-padded with the PAD id.
        vocab_size_T: total vocabulary size including the PAD and UNK entries.
            It is also passed to ``AttentionModel`` as its vocabulary size.
        model_name: checkpoint file name handed to ``train_truly`` and to
            ``ModelEvaluator.evaluate_model``.

    Returns:
        ``(results_attention, [model_name, test_acc, test_precision,
        test_recall, test_f1])`` where ``results_attention`` is whatever
        ``train_truly`` returns.

    Raises:
        ValueError: if ``vocab_size_T`` is smaller than 2, i.e. too small to
            hold the PAD and UNK entries.
    """
    if vocab_size_T < 2:
        raise ValueError(
            f"vocab_size_T must be at least 2 (PAD and UNK), got {vocab_size_T}"
        )

    # --- Vocabulary -------------------------------------------------------
    token_lists = df_processed[TEXT_PROCESSED].str.split()
    word_counts = Counter(itertools.chain.from_iterable(token_lists))
    # Two slots are reserved for PAD and UNK; without the "- 2" the largest
    # index would fall outside the model's vocabulary of vocab_size_T entries.
    vocab = [PAD, UNK] + [word for word, _ in word_counts.most_common(vocab_size_T - 2)]
    word_to_idx = {word: idx for idx, word in enumerate(vocab)}
    pad_idx = word_to_idx[PAD]
    unk_idx = word_to_idx[UNK]

    # --- Tokens -> ids (out-of-vocabulary tokens map to UNK) --------------
    df_processed[TEXT_VECTORIZATION] = token_lists.apply(
        lambda tokens: [word_to_idx.get(token, unk_idx) for token in tokens]
    )

    # Truncate from the left edge onward: the beginning of a message may hold
    # the subject and other important content.
    def pad_sequence(indices):
        if len(indices) > sentence_length:
            return indices[:sentence_length]
        return [pad_idx] * (sentence_length - len(indices)) + indices

    df_processed[TEXT_VECTORIZATION_PADDING] = df_processed[TEXT_VECTORIZATION].apply(
        pad_sequence
    )

    # --- Train / validation / test split ----------------------------------
    # Select and rename the two needed columns directly instead of going
    # through a row-wise apply -> list of dicts -> DataFrame round trip.
    df_loaded = (
        df_processed[[TEXT_VECTORIZATION_PADDING, LABEL]]
        .rename(columns={TEXT_VECTORIZATION_PADDING: "text", LABEL: "label"})
        .reset_index(drop=True)
    )
    df_shuffled = df_loaded.sample(frac=1, random_state=42).reset_index(drop=True)

    train_df, test_df = train_test_split(df_shuffled, test_size=0.2, random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.1, random_state=42)

    def to_dataset(frame):
        # One TensorDataset of (token-id matrix, label vector) per split.
        return TensorDataset(
            torch.LongTensor(frame["text"].tolist()),
            torch.LongTensor(frame["label"].tolist()),
        )

    # --- Data loaders -----------------------------------------------------
    BATCH_SIZE = 256
    train_loader = DataLoader(to_dataset(train_df), batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(to_dataset(val_df), batch_size=BATCH_SIZE, shuffle=False)
    test_loader = DataLoader(to_dataset(test_df), batch_size=BATCH_SIZE, shuffle=False)

    # --- Train, then evaluate on the held-out test split ------------------
    attention_model = AttentionModel(vocab_size_T, d_model, nhead)
    results_attention = train_truly(
        model=attention_model,
        model_name=model_name,
        train_loader=train_loader,
        val_loader=val_loader,
    )
    test_acc, test_precision, test_recall, test_f1 = ModelEvaluator(
        attention_model, test_loader=test_loader
    ).evaluate_model(analyse=False, model_path=model_name)

    return results_attention, [model_name, test_acc, test_precision, test_recall, test_f1]

In [None]:
# Print 75% of the longest processed text's length.
# NOTE(review): .str.len() counts characters, not tokens — confirm that this is
# the quantity intended for choosing a truncation length.
longest_text_len = df_processed[TEXT_PROCESSED].str.len().max()
print(int(longest_text_len * 0.75))

results, compare_lists = [], []
# Author's note: length 1000 could not be run, it exhausted GPU memory.
sentence_lengths = [50, 200, 500, 1000]
vocab_sizes = [10_000, 50_000, 100_000, len(unique_tokens) - 1]

In [None]:
# Keep the vocabulary size fixed (100_000) and vary the truncation length.
# Start from empty lists so that re-running this cell (or running it after the
# vocabulary sweep) does not append to entries left over from an earlier run.
results = []
compare_lists = []
for sentence_length in sentence_lengths:
    result, compare_list = Attention_all_in_one(
        sentence_length, 100_000, f"AttentionModel_len{sentence_length}.pth"
    )
    results.append([f"Length {sentence_length}", result])
    compare_lists.append(compare_list)


# Draw the comparison plots.
def setTitle(metric_name):
    """Return the figure title for `metric_name` in the truncation-length sweep."""
    return f"{metric_name} -- Length Comparisons"


plot_metrics(results, setTitle)
# Per the author's note, compare_diff_strategies gained a branch for this case:
# passing compare_lists= means the metrics are ready and can be plotted directly.
compare_diff_strategies(compare_lists=compare_lists, setTitle=setTitle)

In [None]:
# Draw the comparison plots again from whatever `results` / `compare_lists`
# currently hold.
def setTitle(metric_name):
    """Return the figure title for `metric_name` in the truncation-length sweep."""
    title = f"{metric_name} -- Length Comparisons"
    return title


plot_metrics(results, setTitle)
# Per the author's note, compare_diff_strategies gained a branch for this case:
# passing compare_lists= means the metrics are ready and can be plotted directly.
compare_diff_strategies(compare_lists=compare_lists, setTitle=setTitle)

In [None]:
# Keep the truncation length fixed (200) and vary the vocabulary size.
results, compare_lists = [], []
for vocab_size_T in vocab_sizes:
    result, compare_list = Attention_all_in_one(
        sentence_length=200,
        vocab_size_T=vocab_size_T,
        model_name=f"AttentionModel_vocab{vocab_size_T}.pth",
    )
    results += [[f"Vocab {vocab_size_T}", result]]
    compare_lists += [compare_list]


# Draw the comparison plots.
def setTitle(metric_name):
    """Return the figure title for `metric_name` in the vocabulary-size sweep."""
    title = f"{metric_name} -- Vocab Comparisons"
    return title


plot_metrics(results, setTitle)
compare_diff_strategies(compare_lists=compare_lists, setTitle=setTitle)

#### 位置编码分析

In [None]:
# Two models built with the same vocab_size / d_model / nhead arguments that
# differ only in the pos_encoding_type they are given.
model_original = AttentionModel(vocab_size, d_model, nhead, pos_encoding_type="original")
model_rope = AttentionModel(vocab_size, d_model, nhead, pos_encoding_type="rope")

ckpt_original = "AttentionModel_Original.pth"
ckpt_rope = "AttentionModel_RoPE.pth"

# Training order is kept as in the original cell: RoPE first, then original.
results_rope = train_truly(model=model_rope, model_name=ckpt_rope)
results_original = train_truly(model=model_original, model_name=ckpt_original)

# [label, training result] pairs for the metric curves.
compare_lists = [["ORIG", results_original], ["RoPE", results_rope]]


def setTitle(metric_name):
    """Return the figure title for `metric_name` in the positional-encoding comparison."""
    title = f"{metric_name} ---- 2 PE Compare"
    return title


plot_metrics(compare_lists, setTitle)
# [model, checkpoint name, label] triples for the evaluation comparison.
compare_diff_strategies(
    [
        [model_original, ckpt_original, "ORIG"],
        [model_rope, ckpt_rope, "RoPE"],
    ],
    setTitle,
)