In [None]:
import os
import sys
import logging
import datasets

import torch
import torch.nn as nn
import torch.nn.functional as F

import pandas as pd
import numpy as np

from transformers import BertTokenizerFast, DataCollatorWithPadding
from transformers import Trainer, TrainingArguments
from transformers import BertPreTrainedModel, BertModel
from transformers.modeling_outputs import SequenceClassifierOutput

from sklearn.model_selection import train_test_split

In [None]:
def KL(input, target, reduction="sum"):
    input = input.float()
    target = target.float()
    loss = F.kl_div(F.log_softmax(input, dim=-1, dtype=torch.float32),
                    F.softmax(target, dtype=torch.float32), reduction=reduction)
    return loss


class BertScratch(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.bert = BertModel(config)

        # dropout 的概率
        # 括号允许将长条件表达式（如三元运算符）分成多行，提升可读性
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        self.post_init()

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, labels=None):
        outputs = self.bert(input_ids, attention_mask, token_type_ids)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        kl_outputs = self.bert(input_ids, attention_mask, token_type_ids)
        kl_output = kl_outputs[1]
        kl_output = self.dropout(kl_output)
        kl_logits = self.classifier(kl_output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

            ce_loss = loss_fct(kl_logits.view(-1, self.num_labels), labels.view(-1))
            kl_loss = (KL(logits, kl_logits, "sum") + KL(kl_logits, logits, "sum")) / 2.
            total_loss = loss + ce_loss + kl_loss

        return SequenceClassifierOutput(
            loss=total_loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions
        )


## KL散度函数

### 关于log_softmax

- softmax
$$
\text{softmax}(x_i) = \frac{e^{x_i}}{\sum_{j} e^{x_j}}
$$
- log_softmax
$$
\log \text{softmax}(x_i) = x_i - \log \sum_{j} e^{x_j}
$$
log_softmax即简单的对softmax加上log

**为什么要这么做？**

为了数值稳定性。对于原本的log_max，任然需要计算e^x，这任然可能导致溢出。但在nn.F.log_softmax内部实现中添加了**最大值平移技巧**：

$$
\text{log\_softmax}(x_i) = (x_i - M) - \log \sum_{j} e^{x_j - M}
$$
其中：
- $M = \max(x_j)$ 是输入向量 $x$ 的最大值，
- $x_i - M$ 将输入平移至 $(-\infty, 0]$ 范围内，
- $\log \sum_{j} e^{x_j - M}$ 是数值稳定的对数求和项。

### torch.nn.functional.kl_div

KL散度的计算公式

$$
D_{KL}(P \parallel Q) = \sum_{i=1}^{n} P(i) \log \frac{P(i)}{Q(i)}
$$

`torch.nn.functional.kl_div(input, target, size_average=None, reduce=None, reduction='mean', log_target=False)`
- input和target要求是符合概率分布，且input处于log空间

### KL散度的作用
KL散度可以用于比较模型预测分布与真实分布的差异，KL散度越小代表两个分布的越相近，KL散度虽然不是一个真正的距离度量（因为它不对称），但它提供了一种有效的方式来量化分布之间的差异。

### KL散度在BertModel中的作用
模型对同一输入进行两次独立的BERT前向计算，得到两组logits（logits和kl_logits）。计算两组logits之间的对称KL散度（kl_loss），衡量模型两次预测分布的一致性。KL散度通过约束两次前向传播的输出分布相似性，防止模型过拟合，提升泛化能力。​​

与传统交叉熵的区别​​：交叉熵直接优化预测与标签的匹配，而KL散度优化模型内部的一致性，属于辅助损失