In [1]:
import math
import os
import warnings
from dataclasses import dataclass  # 简化数据类定义
from typing import List, Optional, Tuple, Union # 类型注解支持

import torch
import torch.utils.checkpoint
from packaging import version
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.activations import ACT2FN
from transformers.generation import GenerationMixin
from transformers.modeling_attn_mask_utils import (
    _prepare_4d_attention_mask_for_sdpa,
    _prepare_4d_causal_attention_mask_for_sdpa,
)
from transformers.modeling_outputs import (
    BaseModelOutputWithPastAndCrossAttentions,
    BaseModelOutputWithPoolingAndCrossAttentions,
    CausalLMOutputWithCrossAttentions,
    MaskedLMOutput,
    MultipleChoiceModelOutput,
    NextSentencePredictorOutput,
    QuestionAnsweringModelOutput,
    SequenceClassifierOutput,
    TokenClassifierOutput,
)
from transformers.modeling_utils import PreTrainedModel
from transformers.pytorch_utils import apply_chunking_to_forward, find_pruneable_heads_and_indices, prune_linear_layer
from transformers.utils import (
    ModelOutput,
    add_code_sample_docstrings,
    add_start_docstrings,
    add_start_docstrings_to_model_forward,
    get_torch_version,
    logging,
    replace_return_docstrings,
)
from transformers.models.bert.configuration_bert import BertConfig

2025-05-30 03:50:19.065519: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1748577019.475976      35 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1748577019.585776      35 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
logger = logging.get_logger(__name__)  # module-level logger from the transformers logging utility

In [3]:
# Checkpoint id and config class name; presumably consumed by the docstring helpers imported
# above (e.g. add_code_sample_docstrings) -- their use is not visible in this section.
_CHECKPOINT_FOR_DOC = "google-bert/bert-base-uncased"
_CONFIG_FOR_DOC = "BertConfig"

In [4]:
# Token classification: checkpoint id for the documentation example
# (purpose inferred from the name; the constant is not used in the visible cells).
_CHECKPOINT_FOR_TOKEN_CLASSIFICATION = "dbmdz/bert-large-cased-finetuned-conll03-english"

In [5]:
# Token classification: expected output of the documentation example.
# Note the value is a string containing the repr of a label list, with a trailing space.
_TOKEN_CLASS_EXPECTED_OUTPUT = (
    "['O', 'I-ORG', 'I-ORG', 'I-ORG', 'O', 'O', 'O', 'O', 'O', 'I-LOC', 'O', 'I-LOC', 'I-LOC'] "
)

In [6]:
_TOKEN_CLASS_EXPECTED_LOSS = 0.01 # expected loss for the token-classification example

In [7]:
# Question answering: checkpoint id plus the expected answer text, loss and target
# start/end token indices of the documentation example (purpose inferred from the names;
# the constants are not used in the visible cells).
_CHECKPOINT_FOR_QA = "deepset/bert-base-cased-squad2"
_QA_EXPECTED_OUTPUT = "'a nice puppet'"
_QA_EXPECTED_LOSS = 7.41
_QA_TARGET_START_INDEX = 14
_QA_TARGET_END_INDEX = 15

In [8]:
# Sequence classification: checkpoint id, expected label and expected loss of the
# documentation example (purpose inferred from the names; not used in the visible cells).
_CHECKPOINT_FOR_SEQUENCE_CLASSIFICATION = "textattack/bert-base-uncased-yelp-polarity"
_SEQ_CLASS_EXPECTED_OUTPUT = "'LABEL_1'" 
_SEQ_CLASS_EXPECTED_LOSS = 0.01

In [9]:
def load_tf_weights_in_bert(model, config, tf_checkpoint_path):
    """Copy the variables of a TensorFlow checkpoint into an existing PyTorch model.

    Every TF variable name is split on ``/`` and the resulting scope names are followed as
    attribute lookups starting from ``model``. For example
    ``bert/encoder/layer_0/attention/self/query/kernel`` resolves to
    ``model.bert.encoder.layer[0].attention.self.query.weight``.

    Args:
        model: An already instantiated PyTorch module; its tensors are overwritten in place.
        config: Accepted but never read by this function.
        tf_checkpoint_path: Local path prefix of the TF checkpoint, e.g. ``./bert_model.ckpt``
            (without the ``.index`` / ``.meta`` / ``.data-*`` suffixes). The value is passed
            through ``os.path.abspath``, so a Hub model id such as
            ``deepset/bert-base-cased-squad2`` is interpreted as a relative local path.

    Returns:
        The same ``model`` object that was passed in.

    Raises:
        ImportError: If ``re``, ``numpy`` or ``tensorflow`` cannot be imported (an error is
            logged first).
        ValueError: If a resolved tensor and the checkpoint array differ in shape.
    """
    try:
        import re

        import numpy as np
        import tensorflow as tf
    except ImportError:
        logger.error(
            "Loading a TensorFlow model in PyTorch, requires TensorFlow to be installed. Please see "
            "https://www.tensorflow.org/install/ for installation instructions."
        )
        raise

    tf_path = os.path.abspath(tf_checkpoint_path)
    logger.info(f"Converting TensorFlow checkpoint from {tf_path}")

    # Phase 1: read every variable of the checkpoint before touching the model.
    checkpoint_weights = []
    for name, shape in tf.train.list_variables(tf_path):
        logger.info(f"Loading TF weight {name} with shape {shape}")
        checkpoint_weights.append((name, tf.train.load_variable(tf_path, name)))

    # Scope names that mark optimizer state / bookkeeping rather than model weights.
    optimizer_scopes = ("adam_v", "adam_m", "AdamWeightDecayOptimizer", "AdamWeightDecayOptimizer_1", "global_step")
    # TF scope name -> PyTorch attribute name, for scopes that are not spelled identically.
    attribute_aliases = {
        "kernel": "weight",
        "gamma": "weight",
        "output_bias": "bias",
        "beta": "bias",
        "output_weights": "weight",
        "squad": "classifier",
    }

    # Phase 2: resolve each variable to a tensor inside `model` and overwrite it.
    for name, array in checkpoint_weights:
        name = name.split("/")
        if not set(name).isdisjoint(optimizer_scopes):
            logger.info(f"Skipping {'/'.join(name)}")
            continue

        pointer = model
        for m_name in name:
            # "layer_0" style scopes split into ['layer', '0', '']; everything else stays whole.
            is_indexed = re.fullmatch(r"[A-Za-z]+_\d+", m_name) is not None
            scope_names = re.split(r"_(\d+)", m_name) if is_indexed else [m_name]
            scope = scope_names[0]

            if scope in attribute_aliases:
                pointer = getattr(pointer, attribute_aliases[scope])
            else:
                try:
                    pointer = getattr(pointer, scope)
                except AttributeError:
                    # NOTE: this `continue` belongs to the inner loop. Only this path segment is
                    # skipped; `pointer` stays where it was and the remaining segments are still
                    # followed, so the variable as a whole is not skipped.
                    logger.info(f"Skipping {'/'.join(name)}")
                    continue

            if len(scope_names) >= 2:
                # Indexed scope: step into the selected element of the container.
                pointer = pointer[int(scope_names[1])]

        last_scope = name[-1]
        if last_scope.endswith("_embeddings"):
            # Embedding scopes resolve to a module; the tensor is its `weight`.
            pointer = getattr(pointer, "weight")
        elif last_scope == "kernel":
            # TF kernels are transposed before being compared with / copied to the weight.
            array = np.transpose(array)

        try:
            if pointer.shape != array.shape:
                raise ValueError(f"Pointer shape {pointer.shape} and array shape {array.shape} mismatched")
        except ValueError as e:
            # Attach both shapes to the exception arguments, then re-raise.
            e.args += (pointer.shape, array.shape)
            raise

        logger.info(f"Initialize PyTorch weight {name}")
        pointer.data = torch.from_numpy(array)

    return model

In [10]:
# Default BertConfig instance, used only by the inspection cells below.
config=BertConfig()

In [11]:
# Number of rows of the position-embedding table built in BertEmbeddings.
config.max_position_embeddings

512

In [12]:
# Number of rows of the token-type embedding table built in BertEmbeddings.
config.type_vocab_size

2

In [18]:
# expand((2, -1)) shows the 1-D arange as two identical rows; -1 keeps the existing size (8).
aa=torch.arange(8).expand((2, -1))
aa

tensor([[0, 1, 2, 3, 4, 5, 6, 7],
        [0, 1, 2, 3, 4, 5, 6, 7]])

In [15]:
# All-zero long tensor with the same shape as `aa` (mirrors the token_type_ids buffer in BertEmbeddings).
# NOTE(review): the recorded output below has a single row because this cell ran as In [15],
# before `aa` was redefined with two rows in In [18]; re-running top to bottom gives shape (2, 8).
torch.zeros(aa.size(), dtype=torch.long)

tensor([[0, 0, 0, 0, 0, 0, 0, 0]])

In [19]:
class BertEmbeddings(nn.Module):
    """Input embeddings: word + token-type (+ absolute position) embeddings, then LayerNorm and dropout."""

    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        self.word_embeddings = nn.Embedding(config.vocab_size, hidden_size, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, hidden_size)
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, hidden_size)

        # The attribute is deliberately called `LayerNorm` rather than `layer_norm`:
        # load_tf_weights_in_bert resolves TF scope names with getattr, so the attribute name
        # has to equal the scope name used in the TensorFlow checkpoint.
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        # Falls back to "absolute" when the config does not define the attribute.
        self.position_embedding_type = getattr(config, "position_embedding_type", "absolute")

        # Non-persistent buffers: they are not parameters and are left out of the state_dict
        # (persistent=False), since both can be rebuilt from the config.
        all_positions = torch.arange(config.max_position_embeddings).expand((1, -1))
        self.register_buffer("position_ids", all_positions, persistent=False)
        self.register_buffer(
            "token_type_ids", torch.zeros(self.position_ids.size(), dtype=torch.long), persistent=False
        )

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        past_key_values_length: int = 0,
    ) -> torch.Tensor:
        """Embed a batch of tokens.

        Args:
            input_ids: Token ids; used for the shape and the word-embedding lookup.
            token_type_ids: Segment ids; defaults to zeros taken from the registered buffer.
            position_ids: Position ids; defaults to the slice of the registered buffer that
                starts at `past_key_values_length` and covers the current sequence length.
            inputs_embeds: Pre-computed word embeddings; when given, the word-embedding
                lookup is skipped and its leading dimensions define the input shape.
            past_key_values_length: Offset of the first current token in the full sequence.

        Returns:
            The embeddings after LayerNorm and dropout.
        """
        # Input shape without the hidden dimension.
        if input_ids is None:
            input_shape = inputs_embeds.size()[:-1]
        else:
            input_shape = input_ids.size()
        seq_length = input_shape[1]

        if position_ids is None:
            # Only the positions of the tokens passed in this call.
            window_end = seq_length + past_key_values_length
            position_ids = self.position_ids[:, past_key_values_length:window_end]

        if token_type_ids is None:
            if hasattr(self, "token_type_ids"):
                # Reuse the all-zero buffer registered in the constructor, broadcast over the batch.
                token_type_ids = self.token_type_ids[:, :seq_length].expand(input_shape[0], seq_length)
            else:
                token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=self.position_ids.device)

        if inputs_embeds is None:
            inputs_embeds = self.word_embeddings(input_ids)

        embeddings = inputs_embeds + self.token_type_embeddings(token_type_ids)
        if self.position_embedding_type == "absolute":
            embeddings += self.position_embeddings(position_ids)

        return self.dropout(self.LayerNorm(embeddings))

In [30]:
position_ids_l = torch.arange(3, dtype=torch.long).view(-1, 1) # column vector (3, 1): query positions, query_length = 3
position_ids_l

tensor([[0],
        [1],
        [2]])

In [29]:
position_ids_r=torch.arange(5, dtype=torch.long).view(1,-1) # row vector (1, 5): key positions, key_length = 5
position_ids_r

tensor([[0, 1, 2, 3, 4]])

In [31]:
# Broadcasting (3, 1) - (1, 5) gives a (3, 5) matrix: entry [i, j] is query position i minus key position j.
distance = position_ids_l - position_ids_r
distance

tensor([[ 0, -1, -2, -3, -4],
        [ 1,  0, -1, -2, -3],
        [ 2,  1,  0, -1, -2]])

In [32]:
# Cached (incremental decoding) case: a single query whose position is key_length - 1 = 4,
# mirroring the `use_cache` branch of BertSelfAttention.forward.
position_ids_l = torch.tensor(5 - 1, dtype=torch.long).view(
                    -1, 1
                )
position_ids_l

tensor([[4]])

In [34]:
# With the single cached-case query at position 4, the result is one row of offsets to the 5 keys.
distance = position_ids_l - position_ids_r
distance

tensor([[4, 3, 2, 1, 0]])

In [35]:
class BertSelfAttention(nn.Module):
    """Eager multi-head attention used for self-attention and (in decoders) cross-attention.

    Besides plain scaled dot-product attention, the ``relative_key`` and
    ``relative_key_query`` position-embedding types add learned relative-distance terms to the
    attention scores. When ``config.is_decoder`` is true, the key/value tensors are appended
    to the outputs so the caller can pass them back as ``past_key_value``.
    """

    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        if config.hidden_size % config.num_attention_heads != 0 and not hasattr(config, "embedding_size"):
            raise ValueError(
                f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention "
                f"heads ({config.num_attention_heads})"
            )
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        # Query / key / value projections.
        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
        # An explicit argument wins over the config; the final fallback is "absolute".
        self.position_embedding_type = position_embedding_type or getattr(
            config, "position_embedding_type", "absolute"
        )
        if self.position_embedding_type in ("relative_key", "relative_key_query"):
            self.max_position_embeddings = config.max_position_embeddings
            # One embedding per distance in [-(max - 1), max - 1].
            self.distance_embedding = nn.Embedding(2 * config.max_position_embeddings - 1, self.attention_head_size)

        self.is_decoder = config.is_decoder

    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        """Split the last dimension into heads: (b, s, h * hd) -> (b, h, s, hd)."""
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(new_x_shape)  # (b, s, h, hd)
        return x.permute(0, 2, 1, 3)  # (b, h, s, hd)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        """Run attention over ``hidden_states``.

        Args:
            hidden_states: Input of shape (b, s_q, hidden); always the source of the queries.
            attention_mask: Additive mask that is summed onto the scaled scores.
            head_mask: Multiplicative mask applied to the attention probabilities.
            encoder_hidden_states: When given, the layer acts as cross-attention and keys/values
                come from this tensor (or from the cache) instead of ``hidden_states``.
            encoder_attention_mask: Replaces ``attention_mask`` in the cross-attention case.
            past_key_value: Cached ``(key, value)`` tensors of shape (b, h, s, hd).
            output_attentions: Whether to also return the attention probabilities.

        Returns:
            ``(context,)``, optionally followed by the attention probabilities, and -- when the
            module is a decoder -- by the ``(key, value)`` tuple to cache.
        """
        mixed_query_layer = self.query(hidden_states)

        # Cross-attention is signalled purely by the presence of encoder states.
        is_cross_attention = encoder_hidden_states is not None

        if is_cross_attention and past_key_value is not None:
            # Cross-attention with cache: reuse the cached encoder keys/values.
            key_layer = past_key_value[0]
            value_layer = past_key_value[1]
            attention_mask = encoder_attention_mask
        elif is_cross_attention:
            # Cross-attention without cache: project the encoder states.
            key_layer = self.transpose_for_scores(self.key(encoder_hidden_states))
            value_layer = self.transpose_for_scores(self.value(encoder_hidden_states))
            attention_mask = encoder_attention_mask
        elif past_key_value is not None:
            # Self-attention with cache: project the new tokens and append them to the
            # cached keys/values along the sequence dimension.
            key_layer = self.transpose_for_scores(self.key(hidden_states))
            value_layer = self.transpose_for_scores(self.value(hidden_states))
            key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
            value_layer = torch.cat([past_key_value[1], value_layer], dim=2)
        else:
            # Self-attention without cache.
            key_layer = self.transpose_for_scores(self.key(hidden_states))
            value_layer = self.transpose_for_scores(self.value(hidden_states))

        query_layer = self.transpose_for_scores(mixed_query_layer)  # (b, h, s_q, hd)

        # Must be evaluated before `past_key_value` is overwritten below.
        use_cache = past_key_value is not None
        if self.is_decoder:
            # Decoders always hand back the keys/values: for cross-attention these are the
            # encoder keys/values (reused via the first branch above); for self-attention they
            # are all keys/values seen so far (extended via the third branch above).
            past_key_value = (key_layer, value_layer)

        # (b, h, s_q, hd) @ (b, h, hd, s_k) -> (b, h, s_q, s_k)
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))

        if self.position_embedding_type in ("relative_key", "relative_key_query"):
            query_length, key_length = query_layer.shape[2], key_layer.shape[2]
            if use_cache:
                # With a cache, the current queries are the trailing `query_length` positions
                # of the key sequence. For a single-token step this is just `key_length - 1`;
                # using the full range keeps multi-token chunks correct instead of letting
                # einsum broadcast one position over every query.
                position_ids_l = torch.arange(
                    key_length - query_length, key_length, dtype=torch.long, device=hidden_states.device
                ).view(-1, 1)
            else:
                position_ids_l = torch.arange(query_length, dtype=torch.long, device=hidden_states.device).view(-1, 1)
            position_ids_r = torch.arange(key_length, dtype=torch.long, device=hidden_states.device).view(1, -1)
            # (s_q, s_k): query position minus key position.
            distance = position_ids_l - position_ids_r
            # Shift by max_position_embeddings - 1 so the lookup index is non-negative.
            positional_embedding = self.distance_embedding(distance + self.max_position_embeddings - 1)
            positional_embedding = positional_embedding.to(dtype=query_layer.dtype)  # fp16 compatibility

            if self.position_embedding_type == "relative_key":
                # (b, h, s_q, hd) x (s_q, s_k, hd) -> (b, h, s_q, s_k)
                relative_position_scores = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding)
                attention_scores = attention_scores + relative_position_scores
            elif self.position_embedding_type == "relative_key_query":
                relative_position_scores_query = torch.einsum("bhld,lrd->bhlr", query_layer, positional_embedding)
                # (b, h, s_k, hd) x (s_q, s_k, hd) -> (b, h, s_q, s_k)
                relative_position_scores_key = torch.einsum("bhrd,lrd->bhlr", key_layer, positional_embedding)
                attention_scores = attention_scores + relative_position_scores_query + relative_position_scores_key

        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        if attention_mask is not None:
            # The mask is additive.
            attention_scores = attention_scores + attention_mask

        # Normalise over the key dimension, then drop whole attention entries.
        attention_probs = nn.functional.softmax(attention_scores, dim=-1)
        attention_probs = self.dropout(attention_probs)

        if head_mask is not None:
            attention_probs = attention_probs * head_mask

        # (b, h, s_q, s_k) @ (b, h, s_k, hd) -> (b, h, s_q, hd)
        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()  # (b, s_q, h, hd)
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(new_context_layer_shape)  # (b, s_q, h * hd)

        outputs = (context_layer, attention_probs) if output_attentions else (context_layer,)
        if self.is_decoder:
            outputs = outputs + (past_key_value,)
        return outputs

In [36]:
# Installed torch version; BertSdpaSelfAttention compares it against 2.2.0.
get_torch_version()

'2.6.0+cu124'

In [37]:
class BertSdpaSelfAttention(BertSelfAttention):
    """Attention variant backed by ``torch.nn.functional.scaled_dot_product_attention``.

    Falls back to the eager parent implementation whenever the request needs something this
    path does not handle: a non-absolute position embedding type, ``output_attentions`` or a
    ``head_mask``.
    """

    def __init__(self, config, position_embedding_type=None):
        super().__init__(config, position_embedding_type=position_embedding_type)
        self.dropout_prob = config.attention_probs_dropout_prob
        # True for torch < 2.2.0. Presumably a workaround for an SDPA limitation with
        # non-contiguous inputs in those versions -- the reason is not stated in this file.
        self.require_contiguous_qkv = version.parse(get_torch_version()) < version.parse("2.2.0")

    # Adapted from BertSelfAttention
    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        """Same contract as ``BertSelfAttention.forward``; attention probabilities are only
        available through the eager fallback."""
        needs_eager_fallback = (
            self.position_embedding_type != "absolute" or output_attentions or head_mask is not None
        )
        if needs_eager_fallback:
            logger.warning_once(
                "BertSdpaSelfAttention is used but `torch.nn.functional.scaled_dot_product_attention` does not support "
                "non-absolute `position_embedding_type` or `output_attentions=True` or `head_mask`. Falling back to "
                "the manual attention implementation, but specifying the manual implementation will be required from "
                "Transformers version v5.0.0 onwards. This warning can be removed using the argument "
                '`attn_implementation="eager"` when loading the model.'
            )
            return super().forward(
                hidden_states,
                attention_mask,
                head_mask,
                encoder_hidden_states,
                encoder_attention_mask,
                past_key_value,
                output_attentions,
            )

        bsz, tgt_len, _ = hidden_states.size()

        # Queries always come from the current hidden states: (b, h, s_q, hd).
        query_layer = self.transpose_for_scores(self.query(hidden_states))

        # Cross-attention takes keys/values and the mask from the encoder side.
        is_cross_attention = encoder_hidden_states is not None
        if is_cross_attention:
            current_states = encoder_hidden_states
            attention_mask = encoder_attention_mask
        else:
            current_states = hidden_states

        # Cached cross-attention keys/values are reused only if their sequence length
        # still equals the length of the encoder states passed in.
        reuse_cached_kv = (
            is_cross_attention and past_key_value and past_key_value[0].shape[2] == current_states.shape[1]
        )
        if reuse_cached_kv:
            key_layer, value_layer = past_key_value
        else:
            key_layer = self.transpose_for_scores(self.key(current_states))
            value_layer = self.transpose_for_scores(self.value(current_states))
            if past_key_value is not None and not is_cross_attention:
                # Decoder self-attention with cache: append the new keys/values.
                key_layer = torch.cat([past_key_value[0], key_layer], dim=2)
                value_layer = torch.cat([past_key_value[1], value_layer], dim=2)

        if self.is_decoder:
            # Only decoders hand keys/values back for caching.
            past_key_value = (key_layer, value_layer)

        if self.require_contiguous_qkv and query_layer.device.type == "cuda" and attention_mask is not None:
            query_layer, key_layer, value_layer = (
                tensor.contiguous() for tensor in (query_layer, key_layer, value_layer)
            )

        # `is_causal` is decided in its own statement instead of inline in the SDPA call; per the
        # original note this keeps torch.compile's dynamic-shape / fullgraph modes working.
        # It is only set for decoder self-attention without an explicit mask and with more
        # than one query token.
        is_causal = False
        if self.is_decoder and not is_cross_attention and attention_mask is None and tgt_len > 1:
            is_causal = True

        attn_output = torch.nn.functional.scaled_dot_product_attention(
            query_layer,
            key_layer,
            value_layer,
            attn_mask=attention_mask,
            dropout_p=self.dropout_prob if self.training else 0.0,
            is_causal=is_causal,
        )

        # (b, h, s_q, hd) -> (b, s_q, h, hd) -> (b, s_q, h * hd)
        attn_output = attn_output.transpose(1, 2).reshape(bsz, tgt_len, self.all_head_size)

        if self.is_decoder:
            return (attn_output, past_key_value)
        return (attn_output,)

In [39]:
class BertSelfOutput(nn.Module):
    """Output projection of the attention block: dense -> dropout -> residual add -> LayerNorm."""

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.LayerNorm = nn.LayerNorm(width, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        """Project `hidden_states`, add the residual `input_tensor` and normalise the sum."""
        projected = self.dropout(self.dense(hidden_states))
        return self.LayerNorm(projected + input_tensor)

In [40]:
# Maps config._attn_implementation to the self-attention class instantiated by BertAttention.
BERT_SELF_ATTENTION_CLASSES = {
    "eager": BertSelfAttention,
    "sdpa": BertSdpaSelfAttention,
}

In [41]:
# Key used to look up the attention class in BERT_SELF_ATTENTION_CLASSES.
config._attn_implementation

'eager'

In [42]:
class BertAttention(nn.Module):
    """Attention sub-layer: the configured self-attention module followed by BertSelfOutput."""

    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        # Pick the implementation ("eager" / "sdpa") named by the config.
        attention_cls = BERT_SELF_ATTENTION_CLASSES[config._attn_implementation]
        self.self = attention_cls(config, position_embedding_type=position_embedding_type)
        self.output = BertSelfOutput(config)
        self.pruned_heads = set()  # heads that have already been pruned

    def prune_heads(self, heads):
        """Remove the given attention heads from the projections of this layer.

        Does nothing when `heads` is empty.
        """
        if len(heads) == 0:
            return

        attention = self.self
        # The helper returns the heads to prune and an index consumed by prune_linear_layer
        # (exact semantics are defined in transformers.pytorch_utils, not in this file).
        heads, index = find_pruneable_heads_and_indices(
            heads, attention.num_attention_heads, attention.attention_head_size, self.pruned_heads
        )

        # Prune query/key/value, and the output projection along dim=1.
        for projection in ("query", "key", "value"):
            setattr(attention, projection, prune_linear_layer(getattr(attention, projection), index))
        self.output.dense = prune_linear_layer(self.output.dense, index, dim=1)

        # Keep the bookkeeping in sync with the reduced head count.
        attention.num_attention_heads = attention.num_attention_heads - len(heads)
        attention.all_head_size = attention.attention_head_size * attention.num_attention_heads
        self.pruned_heads = self.pruned_heads.union(heads)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        """Run attention and the residual output projection.

        Returns:
            A tuple whose first element is the projected attention output; any extra elements
            returned by the attention module are passed through unchanged.
        """
        attention_results = self.self(
            hidden_states,
            attention_mask,
            head_mask,
            encoder_hidden_states,
            encoder_attention_mask,
            past_key_value,
            output_attentions,
        )
        # Residual connection around the attention: `hidden_states` is the residual input.
        projected = self.output(attention_results[0], hidden_states)
        extras = attention_results[1:]
        return (projected,) + extras

In [43]:
class BertIntermediate(nn.Module):
    """First half of the feed-forward block: expand to `intermediate_size` and apply the activation."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        activation = config.hidden_act
        # A string names an entry of ACT2FN; anything else is used as the activation directly.
        self.intermediate_act_fn = ACT2FN[activation] if isinstance(activation, str) else activation

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return ``act(dense(hidden_states))``."""
        return self.intermediate_act_fn(self.dense(hidden_states))

In [44]:
class BertOutput(nn.Module):
    """Second half of the feed-forward block: project back to `hidden_size`, dropout, residual add, LayerNorm."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states: torch.Tensor, input_tensor: torch.Tensor) -> torch.Tensor:
        """Project `hidden_states`, add the residual `input_tensor` and normalise the sum."""
        reduced = self.dropout(self.dense(hidden_states))
        return self.LayerNorm(reduced + input_tensor)

In [45]:
# Value that BertLayer stores as self.chunk_size_feed_forward.
config.chunk_size_feed_forward

0

In [46]:
# Flag read by the attention classes and BertLayer to decide whether key/value states are returned.
config.is_decoder

False

In [47]:
# Flag read by BertLayer to decide whether a cross-attention sub-layer is created.
config.add_cross_attention

False

In [48]:
class BertLayer(nn.Module):
    """One transformer block: self-attention, optional cross-attention, then a chunkable feed-forward.

    Args:
        config: object exposing `chunk_size_feed_forward`, `is_decoder` and
            `add_cross_attention`; it is also handed to `BertAttention`,
            `BertIntermediate` and `BertOutput`.

    Raises:
        ValueError: at construction, when cross-attention is requested for a non-decoder config.
    """

    def __init__(self, config):
        super().__init__()
        self.chunk_size_feed_forward = config.chunk_size_feed_forward  # chunk size for the feed-forward pass
        self.seq_len_dim = 1  # index of the sequence-length dimension, passed to apply_chunking_to_forward
        self.attention = BertAttention(config)  # self-attention
        self.is_decoder = config.is_decoder  # decoder architecture?
        self.add_cross_attention = config.add_cross_attention  # encoder-decoder cross-attention requested?
        # Cross-attention is only allowed on a decoder.
        if self.add_cross_attention and not self.is_decoder:
            raise ValueError(f"{self} should be used as a decoder model if cross attention is added")
        if self.add_cross_attention:
            self.crossattention = BertAttention(config, position_embedding_type="absolute")
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        """Run the block on `hidden_states`.

        Returns:
            A tuple starting with the layer output, followed by whatever extra entries the
            attention modules returned (everything after their first element, minus the
            trailing cache entry for a decoder), and, for a decoder, ending with the
            present key/value cache.

        Raises:
            ValueError: if this is a decoder, `encoder_hidden_states` is given, and the
                layer was built without a cross-attention module.
        """
        cache_given = past_key_value is not None

        # The first two cache entries belong to self-attention.
        self_attn_results = self.attention(
            hidden_states,
            attention_mask,
            head_mask,
            output_attentions=output_attentions,
            past_key_value=past_key_value[:2] if cache_given else None,
        )
        attention_output = self_attn_results[0]

        if self.is_decoder:
            # A decoder's attention result ends with its cache; keep that apart from the extras.
            extras = self_attn_results[1:-1]
            present_key_value = self_attn_results[-1]
        else:
            # No cache entry for an encoder.
            extras = self_attn_results[1:]

        if self.is_decoder and encoder_hidden_states is not None:
            # Encoder output was supplied, so a cross-attention module must exist.
            if not hasattr(self, "crossattention"):
                raise ValueError(
                    f"If `encoder_hidden_states` are passed, {self} has to be instantiated with cross-attention layers"
                    " by setting `config.add_cross_attention=True`"
                )

            # The last two cache entries belong to cross-attention.
            cross_attn_results = self.crossattention(
                attention_output,
                attention_mask,
                head_mask,
                encoder_hidden_states,
                encoder_attention_mask,
                past_key_value[-2:] if cache_given else None,
                output_attentions,
            )
            attention_output = cross_attn_results[0]
            extras = extras + cross_attn_results[1:-1]
            # The present cache is the self-attention cache followed by the cross-attention cache.
            present_key_value = present_key_value + cross_attn_results[-1]

        # Feed-forward, optionally applied chunk by chunk.
        layer_output = apply_chunking_to_forward(
            self.feed_forward_chunk, self.chunk_size_feed_forward, self.seq_len_dim, attention_output
        )
        result = (layer_output,) + extras
        if self.is_decoder:
            result = result + (present_key_value,)
        return result

    def feed_forward_chunk(self, attention_output):
        """Feed-forward on one chunk; `attention_output` is also passed on as the residual input."""
        return self.output(self.intermediate(attention_output), attention_output)

In [49]:
# Inspect how many BertLayer modules BertEncoder will stack.
config.num_hidden_layers

12

In [57]:
class BertEncoder(nn.Module):
    """Stack of `config.num_hidden_layers` BertLayer modules applied one after another.

    The forward pass threads the hidden states through every layer and can optionally
    collect per-layer hidden states, attention entries and key/value caches.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        stacked_layers = [BertLayer(config) for _ in range(config.num_hidden_layers)]
        self.layer = nn.ModuleList(stacked_layers)
        # Off by default. When on (and in training mode) each layer is invoked through
        # `self._gradient_checkpointing_func`, which this class does not define itself.
        self.gradient_checkpointing = False

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = False,
        output_hidden_states: Optional[bool] = False,
        return_dict: Optional[bool] = True,
    ) -> Union[Tuple[torch.Tensor], BaseModelOutputWithPastAndCrossAttentions]:
        """Apply every layer in order.

        `head_mask` and `past_key_values`, when given, are indexed by layer position.

        Returns:
            With a falsy `return_dict`, a tuple of the non-None values among (final hidden
            states, collected caches, all hidden states, self-attention entries,
            cross-attention entries). Otherwise a `BaseModelOutputWithPastAndCrossAttentions`
            carrying the same five values.
        """
        use_checkpointing = self.gradient_checkpointing and self.training
        if use_checkpointing and use_cache:
            # Caching cannot be combined with gradient checkpointing during training.
            logger.warning_once(
                "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
            )
            use_cache = False

        # Accumulators stay None when the corresponding output was not requested.
        all_hidden_states = () if output_hidden_states else None  # input of each layer, plus the final output
        all_self_attentions = () if output_attentions else None
        collect_cross = output_attentions and self.config.add_cross_attention
        all_cross_attentions = () if collect_cross else None
        next_decoder_cache = () if use_cache else None

        for idx, layer_module in enumerate(self.layer):
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            layer_args = (
                hidden_states,
                attention_mask,
                None if head_mask is None else head_mask[idx],  # head mask of this layer
                encoder_hidden_states,
                encoder_attention_mask,
                None if past_key_values is None else past_key_values[idx],  # cache of this layer
                output_attentions,
            )
            if use_checkpointing:
                layer_outputs = self._gradient_checkpointing_func(layer_module.__call__, *layer_args)
            else:
                # Regular path: call the layer directly.
                layer_outputs = layer_module(*layer_args)

            hidden_states = layer_outputs[0]
            if use_cache:
                # The last entry of a layer's output is appended as that layer's cache.
                next_decoder_cache += (layer_outputs[-1],)
            if output_attentions:
                all_self_attentions += (layer_outputs[1],)
            if collect_cross:
                all_cross_attentions += (layer_outputs[2],)

        # Append the output of the last layer.
        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        if not return_dict:
            candidates = (
                hidden_states,
                next_decoder_cache,
                all_hidden_states,
                all_self_attentions,
                all_cross_attentions,
            )
            return tuple(item for item in candidates if item is not None)

        return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=hidden_states,
            past_key_values=next_decoder_cache,
            hidden_states=all_hidden_states,
            attentions=all_self_attentions,
            cross_attentions=all_cross_attentions,
        )

In [56]:
# Tuple concatenation demo: adding an empty tuple changes nothing, and the pairs end up side by side in one tuple.
((1,2),)+()+((3,4),)

((1, 2), (3, 4))

In [58]:
class BertPooler(nn.Module):
    """Pool a sequence into one vector by transforming the hidden state at position 0.

    Args:
        config: object exposing `hidden_size`.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.activation = nn.Tanh()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Take index 0 along dimension 1, then apply the dense layer and tanh."""
        # The first position is used as the sentence representation.
        first_token_tensor = hidden_states[:, 0]
        return self.activation(self.dense(first_token_tensor))

In [59]:
class BertPredictionHeadTransform(nn.Module):
    """Dense layer, activation and LayerNorm applied in that order, all at `hidden_size` width.

    Args:
        config: object exposing `hidden_size`, `hidden_act` (a string key into `ACT2FN`
            or a callable) and `layer_norm_eps`.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        activation = config.hidden_act
        # A string is looked up in ACT2FN; anything else is used directly as the callable.
        self.transform_act_fn = ACT2FN[activation] if isinstance(activation, str) else activation
        self.LayerNorm = nn.LayerNorm(width, eps=config.layer_norm_eps)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return `LayerNorm(transform_act_fn(dense(hidden_states)))`."""
        activated = self.transform_act_fn(self.dense(hidden_states))
        return self.LayerNorm(activated)

In [61]:
class BertLMPredictionHead(nn.Module):
    """Language-modeling head: transform the hidden states, then project them to vocabulary logits.

    Args:
        config: object exposing `hidden_size` and `vocab_size`; it is also handed to
            `BertPredictionHeadTransform`.
    """

    def __init__(self, config):
        super().__init__()
        vocab = config.vocab_size
        self.transform = BertPredictionHeadTransform(config)
        # Prediction projection, created without a bias of its own.
        self.decoder = nn.Linear(config.hidden_size, vocab, bias=False)

        self.bias = nn.Parameter(torch.zeros(vocab))
        # The two attributes must point at the same parameter so that the bias is
        # resized correctly by `resize_token_embeddings`.
        self.decoder.bias = self.bias

    def _tie_weights(self):
        """Point the decoder's bias back at `self.bias`."""
        self.decoder.bias = self.bias

    def forward(self, hidden_states):
        """Return `decoder(transform(hidden_states))`; last dimension is the decoder's output width."""
        return self.decoder(self.transform(hidden_states))

In [62]:
class BertOnlyMLMHead(nn.Module):
    """Masked-language-modeling head: a thin wrapper around `BertLMPredictionHead`."""

    def __init__(self, config):
        super().__init__()
        self.predictions = BertLMPredictionHead(config)

    def forward(self, sequence_output: torch.Tensor) -> torch.Tensor:
        """Return the prediction scores that `self.predictions` computes for `sequence_output`."""
        return self.predictions(sequence_output)

In [63]:
class BertOnlyNSPHead(nn.Module):
    """Next-sentence-prediction head: a two-way linear classifier.

    Args:
        config: object exposing `hidden_size`.
    """

    def __init__(self, config):
        super().__init__()
        self.seq_relationship = nn.Linear(config.hidden_size, 2)

    def forward(self, pooled_output):
        """Return the sequence-relationship scores for `pooled_output` (last dimension has size 2)."""
        return self.seq_relationship(pooled_output)

In [64]:
class BertPreTrainingHeads(nn.Module):
    """The two pre-training heads side by side: LM prediction and sequence relationship.

    Args:
        config: object exposing `hidden_size`; it is also handed to `BertLMPredictionHead`.
    """

    def __init__(self, config):
        super().__init__()
        self.predictions = BertLMPredictionHead(config)  # masked-LM head
        self.seq_relationship = nn.Linear(config.hidden_size, 2)

    def forward(self, sequence_output, pooled_output):
        """Score the sequence output with the LM head and the pooled output with the 2-way classifier.

        Returns:
            `(prediction_scores, seq_relationship_score)`.
        """
        return self.predictions(sequence_output), self.seq_relationship(pooled_output)

In [65]:
class BertPreTrainedModel(PreTrainedModel):
    """Shared base for the BERT models in this notebook: class-level settings plus weight initialization."""

    config_class = BertConfig  # configuration class
    load_tf_weights = load_tf_weights_in_bert  # loader for TensorFlow weights into the PyTorch model
    base_model_prefix = "bert"
    supports_gradient_checkpointing = True  # gradient checkpointing is supported
    _supports_sdpa = True  # SDPA attention is supported

    def _init_weights(self, module):
        """Initialize one module in place, depending on its type.

        - `nn.Linear`: normal weights (std `config.initializer_range`), zero bias if present.
        - `nn.Embedding`: normal weights (same std), zero row at `padding_idx` if set.
        - `nn.LayerNorm`: weight 1, bias 0.
        - `BertLMPredictionHead`: zero bias.
        Any other module is left untouched.
        """
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
            return

        if isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            pad = module.padding_idx
            if pad is not None:
                # The padding token's embedding row is zeroed.
                module.weight.data[pad].zero_()
            return

        if isinstance(module, nn.LayerNorm):
            module.weight.data.fill_(1.0)
            module.bias.data.zero_()
            return

        if isinstance(module, BertLMPredictionHead):
            module.bias.data.zero_()

In [66]:
@dataclass
class BertForPreTrainingOutput(ModelOutput):
    """
    Output type of [`BertForPreTraining`].

    Args:
        loss (*optional*, returned when `labels` is provided, `torch.FloatTensor` of shape `(1,)`):
            Total loss as the sum of the masked language modeling loss and the next sequence prediction
            (classification) loss.
        prediction_logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
            Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
        seq_relationship_logits (`torch.FloatTensor` of shape `(batch_size, 2)`):
            Prediction scores of the next sequence prediction (classification) head (scores of True/False continuation
            before SoftMax).
        hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
            Tuple of `torch.FloatTensor` (one for the output of the embeddings + one for the output of each layer) of
            shape `(batch_size, sequence_length, hidden_size)`.

            Hidden-states of the model at the output of each layer plus the initial embedding outputs.
        attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
            Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
            sequence_length)`.

            Attentions weights after the attention softmax, used to compute the weighted average in the self-attention
            heads.
    """

    # NOTE: the docstring above is passed to `replace_return_docstrings` via `output_type=`
    # on `BertForPreTraining.forward`, so its "Args:" layout should be kept intact.
    # Every field defaults to None; the order below is the declaration order of the dataclass.
    loss: Optional[torch.FloatTensor] = None  # total loss (MLM + NSP), see "loss" above
    prediction_logits: Optional[torch.FloatTensor] = None  # MLM scores
    seq_relationship_logits: Optional[torch.FloatTensor] = None  # NSP scores
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None  # optional per-layer hidden states
    attentions: Optional[Tuple[torch.FloatTensor]] = None  # optional per-layer attention weights

In [67]:
# Shared introductory docstring text; it is passed to `add_start_docstrings` on the model classes
# below. This is a runtime string, so its wording is left exactly as is.
BERT_START_DOCSTRING = r"""

    This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
    library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads
    etc.)

    This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
    Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
    and behavior.

    Parameters:
        config ([`BertConfig`]): Model configuration class with all the parameters of the model.
            Initializing with a config file does not load the weights associated with the model, only the
            configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""

# Template describing the `forward` inputs. `{0}` is a `str.format` placeholder for the input shape:
# the `forward` methods below fill it with "batch_size, sequence_length" before handing the text to
# `add_start_docstrings_to_model_forward`. This is a runtime string, so its wording is left exactly as is.
BERT_INPUTS_DOCSTRING = r"""
    Args:
        input_ids (`torch.LongTensor` of shape `({0})`):
            Indices of input sequence tokens in the vocabulary.

            Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
            [`PreTrainedTokenizer.__call__`] for details.

            [What are input IDs?](../glossary#input-ids)
        attention_mask (`torch.FloatTensor` of shape `({0})`or `(batch_size, sequence_length, target_length)`, *optional*):
            Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:

            - 1 for tokens that are **not masked**,
            - 0 for tokens that are **masked**.

            [What are attention masks?](../glossary#attention-mask)
        token_type_ids (`torch.LongTensor` of shape `({0})`, *optional*):
            Segment token indices to indicate first and second portions of the inputs. Indices are selected in `[0,
            1]`:

            - 0 corresponds to a *sentence A* token,
            - 1 corresponds to a *sentence B* token.

            [What are token type IDs?](../glossary#token-type-ids)
        position_ids (`torch.LongTensor` of shape `({0})`, *optional*):
            Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
            config.max_position_embeddings - 1]`.

            [What are position IDs?](../glossary#position-ids)
        head_mask (`torch.FloatTensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*):
            Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`:

            - 1 indicates the head is **not masked**,
            - 0 indicates the head is **masked**.

        inputs_embeds (`torch.FloatTensor` of shape `({0}, hidden_size)`, *optional*):
            Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
            is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
            model's internal embedding lookup matrix.
        output_attentions (`bool`, *optional*):
            Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
            tensors for more detail.
        output_hidden_states (`bool`, *optional*):
            Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
            more detail.
        return_dict (`bool`, *optional*):
            Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""

In [68]:
@add_start_docstrings(
    "The bare Bert Model transformer outputting raw hidden-states without any specific head on top.",
    BERT_START_DOCSTRING,
)
class BertModel(BertPreTrainedModel):
    # Bare model: embeddings -> encoder -> optional pooler. The class docstring is produced by the
    # `add_start_docstrings` decorator above, so explanations here are kept as comments.

    # Module classes marked as not splittable (how the base class consumes this list is not shown here).
    _no_split_modules = ["BertEmbeddings", "BertLayer"]
    # add_pooling_layer: whether to build a BertPooler on top of the encoder.
    def __init__(self, config, add_pooling_layer=True): 
        super().__init__(config)
        self.config = config
        self.embeddings = BertEmbeddings(config)
        self.encoder = BertEncoder(config)
        self.pooler = BertPooler(config) if add_pooling_layer else None
        self.attn_implementation = config._attn_implementation
        self.position_embedding_type = config.position_embedding_type
        # Run weight initialization and any other post-processing.
        self.post_init()
    def get_input_embeddings(self):
        """Return the word-embedding module."""
        return self.embeddings.word_embeddings
    def set_input_embeddings(self, value):
        """Replace the word-embedding module with `value`."""
        self.embeddings.word_embeddings = value
    # Prune attention heads layer by layer.
    def _prune_heads(self, heads_to_prune):
        # `heads_to_prune` maps a layer index to the heads to prune in that layer.
        for layer, heads in heads_to_prune.items():
            self.encoder.layer[layer].attention.prune_heads(heads)
    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=BaseModelOutputWithPoolingAndCrossAttentions,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.Tensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], BaseModelOutputWithPoolingAndCrossAttentions]:
        # Flow: resolve flags -> derive the input shape -> embed -> build attention masks ->
        # run the encoder -> pool. Exactly one of `input_ids` / `inputs_embeds` must be given,
        # otherwise a ValueError is raised.
        # Flags left as None fall back to the values stored on the config.
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if self.config.is_decoder: # decoder architecture: honor the argument, else the config default
            use_cache = use_cache if use_cache is not None else self.config.use_cache
        else: # an encoder never uses the cache
            use_cache = False
        # Determine the input shape.
        if input_ids is not None and inputs_embeds is not None:
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif input_ids is not None:
            self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
            input_shape = input_ids.size()
        elif inputs_embeds is not None:
            # Drop the trailing embedding dimension.
            input_shape = inputs_embeds.size()[:-1]
        else:
            raise ValueError("You have to specify either input_ids or inputs_embeds")
        batch_size, seq_length = input_shape 
        device = input_ids.device if input_ids is not None else inputs_embeds.device
        # Length already held in the key/value cache: dim 2 of the first cached tensor, or 0 without a cache.
        past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0
        # Default token type ids.
        if token_type_ids is None:
            if hasattr(self.embeddings, "token_type_ids"):
                # Reuse the `token_type_ids` attribute stored on the embeddings, cut to the current length.
                buffered_token_type_ids = self.embeddings.token_type_ids[:, :seq_length]
                buffered_token_type_ids_expanded = buffered_token_type_ids.expand(batch_size, seq_length)
                token_type_ids = buffered_token_type_ids_expanded
            else:
                token_type_ids = torch.zeros(input_shape, dtype=torch.long, device=device)
        embedding_output = self.embeddings( # embeddings
            input_ids=input_ids,
            position_ids=position_ids,
            token_type_ids=token_type_ids,
            inputs_embeds=inputs_embeds,
            past_key_values_length=past_key_values_length,
        )
        # Default attention mask: all ones over the cached plus the current positions.
        if attention_mask is None:
            attention_mask = torch.ones((batch_size, seq_length + past_key_values_length), device=device)
        # Whether to build SDPA-style attention masks: only when the attention implementation is
        # "sdpa", position embeddings are "absolute", there is no head mask, and attention outputs
        # were not requested.
        use_sdpa_attention_masks = (
            self.attn_implementation == "sdpa"
            and self.position_embedding_type == "absolute"
            and head_mask is None
            and not output_attentions
        )
        # Expand the attention mask to more dimensions.
        if use_sdpa_attention_masks and attention_mask.dim() == 2:
            # [bsz, seq_len] -> [bsz, 1, seq_len, seq_len]
            if self.config.is_decoder: # decoder architecture: causal mask
                extended_attention_mask = _prepare_4d_causal_attention_mask_for_sdpa(
                    attention_mask,
                    input_shape,
                    embedding_output,
                    past_key_values_length,
                )
            else: 
                extended_attention_mask = _prepare_4d_attention_mask_for_sdpa(
                    attention_mask, embedding_output.dtype, tgt_len=seq_length
                )
        else: 
            # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length].
            extended_attention_mask = self.get_extended_attention_mask(attention_mask, input_shape)

        # Decoder cross-attention case:
        # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
        if self.config.is_decoder and encoder_hidden_states is not None:
            encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
            encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
            if encoder_attention_mask is None: # default encoder mask: all ones
                encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
            if use_sdpa_attention_masks and encoder_attention_mask.dim() == 2:
                # Expand the attention mask for SDPA.
                # [bsz, seq_len] -> [bsz, 1, seq_len, seq_len]
                encoder_extended_attention_mask = _prepare_4d_attention_mask_for_sdpa(
                    encoder_attention_mask, embedding_output.dtype, tgt_len=seq_length
                )
            else:
                encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask)
        else: # no cross-attention
            encoder_extended_attention_mask = None

        # Prepare head mask if needed
        # 1.0 in head_mask indicate we keep the head
        # attention_probs has shape bsz x n_heads x N x N
        # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads]
        # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length]
        head_mask = self.get_head_mask(head_mask, self.config.num_hidden_layers)
        encoder_outputs = self.encoder( # run the encoder stack
            embedding_output,
            attention_mask=extended_attention_mask,
            head_mask=head_mask,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_extended_attention_mask,
            past_key_values=past_key_values,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = encoder_outputs[0]
        # The pooled output is None when the model was built without a pooler.
        pooled_output = self.pooler(sequence_output) if self.pooler is not None else None
        if not return_dict:
            # Tuple form: sequence output, pooled output, then the remaining encoder outputs.
            return (sequence_output, pooled_output) + encoder_outputs[1:]

        return BaseModelOutputWithPoolingAndCrossAttentions(
            last_hidden_state=sequence_output,
            pooler_output=pooled_output,
            past_key_values=encoder_outputs.past_key_values,
            hidden_states=encoder_outputs.hidden_states,
            attentions=encoder_outputs.attentions,
            cross_attentions=encoder_outputs.cross_attentions,
        )

In [69]:
@add_start_docstrings(
    """
    Bert Model with two heads on top as done during the pretraining: a `masked language modeling` head and a `next
    sentence prediction (classification)` head.
    """,
    BERT_START_DOCSTRING,
)
class BertForPreTraining(BertPreTrainedModel):
    # The class docstring is produced by the decorator above, so notes here are kept as comments.
    # Keys of the weights that are tied (shared).
    _tied_weights_keys = ["predictions.decoder.bias", "cls.predictions.decoder.weight"]

    def __init__(self, config):
        super().__init__(config)
        self.bert = BertModel(config)
        self.cls = BertPreTrainingHeads(config)
        # Initialize weights and apply final processing
        self.post_init()

    def get_output_embeddings(self):
        """Return the decoder projection of the LM prediction head."""
        return self.cls.predictions.decoder

    def set_output_embeddings(self, new_embeddings):
        """Install `new_embeddings` as the decoder and point the head's bias at its bias."""
        lm_head = self.cls.predictions
        lm_head.decoder = new_embeddings
        lm_head.bias = new_embeddings.bias

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @replace_return_docstrings(output_type=BertForPreTrainingOutput, config_class=_CONFIG_FOR_DOC)
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        next_sentence_label: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], BertForPreTrainingOutput]:
        r"""
        Returns:
        Example:
        ```python
        >>> from transformers import AutoTokenizer, BertForPreTraining
        >>> import torch

        >>> tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
        >>> model = BertForPreTraining.from_pretrained("google-bert/bert-base-uncased")

        >>> inputs = tokenizer("Hello, my dog is cute", return_tensors="pt")
        >>> outputs = model(**inputs)

        >>> prediction_logits = outputs.prediction_logits
        >>> seq_relationship_logits = outputs.seq_relationship_logits
        ```
        """
        # Fall back to the config when the caller did not choose a return form.
        if return_dict is None:
            return_dict = self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # The first two entries are the sequence output and the pooled output.
        sequence_output, pooled_output = outputs[:2]
        # MLM logits and NSP logits.
        prediction_scores, seq_relationship_score = self.cls(sequence_output, pooled_output)

        # A loss is produced only when BOTH label tensors are supplied.
        total_loss = None
        if not (labels is None or next_sentence_label is None):
            criterion = CrossEntropyLoss()
            mlm_logits = prediction_scores.view(-1, self.config.vocab_size)
            masked_lm_loss = criterion(mlm_logits, labels.view(-1))
            nsp_logits = seq_relationship_score.view(-1, 2)
            next_sentence_loss = criterion(nsp_logits, next_sentence_label.view(-1))
            # Total loss = MLM loss + NSP loss.
            total_loss = masked_lm_loss + next_sentence_loss

        if return_dict:
            return BertForPreTrainingOutput(
                loss=total_loss,
                prediction_logits=prediction_scores,
                seq_relationship_logits=seq_relationship_score,
                hidden_states=outputs.hidden_states,
                attentions=outputs.attentions,
            )

        # Tuple form: the loss (when computed) goes first, then both logits, then the remaining outputs.
        output = (prediction_scores, seq_relationship_score) + outputs[2:]
        if total_loss is None:
            return output
        return (total_loss,) + output

In [None]:
# Print the help text for BertForPreTraining (this cell has no execution count and no recorded output).
help(BertForPreTraining)

In [73]:
# Only the tokenizer is taken from the library. Importing `BertForPreTraining` here as well would
# rebind the name and shadow the class defined earlier in this notebook, so the demo cells below
# would exercise the library implementation instead of the one built above.
from transformers import AutoTokenizer

In [74]:
# Load the tokenizer and the pre-training model for the `google-bert/bert-base-uncased` checkpoint
# (the recorded output shows the checkpoint files being fetched).
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
model = BertForPreTraining.from_pretrained("google-bert/bert-base-uncased")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

In [75]:
# Tokenize one sentence into PyTorch tensors and run a forward pass.
# No labels are passed, so the recorded output below has `loss=None`.
inputs = tokenizer("Hello, my dog is cute", return_tensors="pt")
outputs = model(**inputs)

In [76]:
# Display the whole output object; hidden_states and attentions are None because they were not requested.
outputs

BertForPreTrainingOutput(loss=None, prediction_logits=tensor([[[ -7.8962,  -7.8105,  -7.7903,  ...,  -7.0694,  -7.1693,  -4.3590],
         [ -8.4461,  -8.4401,  -8.5044,  ...,  -8.0625,  -7.9909,  -5.7160],
         [-15.2953, -15.4727, -15.5865,  ..., -12.9857, -11.7038, -11.4293],
         ...,
         [-14.0628, -14.2535, -14.3645,  ..., -12.7151, -11.1621, -10.2317],
         [-10.6576, -10.7892, -11.0402,  ..., -10.3233, -10.1578,  -3.7721],
         [-11.3383, -11.4590, -11.1767,  ...,  -9.2152,  -9.5209,  -9.5571]]],
       grad_fn=<ViewBackward0>), seq_relationship_logits=tensor([[ 3.3474, -2.0613]], grad_fn=<AddmmBackward0>), hidden_states=None, attentions=None)

In [77]:
# MLM logits; the recorded shape is (1, 8, 30522), i.e. (batch_size, sequence_length, vocab_size)
# as described in the BertForPreTrainingOutput docstring.
prediction_logits = outputs.prediction_logits
prediction_logits.shape

torch.Size([1, 8, 30522])

In [78]:
# NSP logits; the recorded shape is (1, 2), i.e. (batch_size, 2).
seq_relationship_logits = outputs.seq_relationship_logits
seq_relationship_logits.shape

torch.Size([1, 2])

In [82]:
# Generative (left-to-right) language-modeling variant
@add_start_docstrings(
    """Bert Model with a `language modeling` head on top for CLM fine-tuning.""", BERT_START_DOCSTRING
)
class BertLMHeadModel(BertPreTrainedModel, GenerationMixin):
    # Parameter names of the LM head's decoder; consumed by the base class
    # (its handling is defined outside this cell).
    _tied_weights_keys = ["cls.predictions.decoder.bias", "cls.predictions.decoder.weight"]

    def __init__(self, config):
        """Build the BERT backbone (no pooler) and the MLM-style prediction head."""
        super().__init__(config)

        # Using this class is meant to go together with decoder mode; a
        # non-decoder config only triggers a warning, not an error.
        if not config.is_decoder:
            logger.warning("If you want to use `BertLMHeadModel` as a standalone, add `is_decoder=True.`")

        # Only token-level logits are produced, so the pooling layer is left out.
        self.bert = BertModel(config, add_pooling_layer=False)
        self.cls = BertOnlyMLMHead(config)

        # Initialize weights and apply final processing
        self.post_init()

    def get_output_embeddings(self):
        """Return the decoder layer of the prediction head."""
        return self.cls.predictions.decoder

    def set_output_embeddings(self, new_embeddings):
        """Install `new_embeddings` as the decoder and point the head's bias at its bias."""
        head = self.cls.predictions
        head.decoder = new_embeddings
        head.bias = new_embeddings.bias

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=CausalLMOutputWithCrossAttentions,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        past_key_values: Optional[List[torch.Tensor]] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **loss_kwargs,
    ) -> Union[Tuple[torch.Tensor], CausalLMOutputWithCrossAttentions]:
        r"""
        encoder_hidden_states  (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
            Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if
            the model is configured as a decoder.
        encoder_attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in
            the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`:

            - 1 for tokens that are **not masked**,
            - 0 for tokens that are **masked**.
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the left-to-right language modeling loss (next word prediction). Indices should be in
            `[-100, 0, ..., config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are
            ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`
        past_key_values (`tuple(tuple(torch.FloatTensor))` of length `config.n_layers` with each tuple having 4 tensors of shape `(batch_size, num_heads, sequence_length - 1, embed_size_per_head)`):
            Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.

            If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that
            don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all
            `decoder_input_ids` of shape `(batch_size, sequence_length)`.
        use_cache (`bool`, *optional*):
            If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
            `past_key_values`).
        """
        if return_dict is None:
            return_dict = self.config.use_return_dict

        # Labels mean a training-style call, so key/value caching is turned off.
        if labels is not None:
            use_cache = False

        backbone_out = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_attention_mask,
            past_key_values=past_key_values,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        token_states = backbone_out[0]  # (b,s,d)
        vocab_logits = self.cls(token_states)  # (b,s,v)

        # The loss is delegated to `self.loss_function`, with any extra keyword arguments forwarded.
        lm_loss = (
            self.loss_function(vocab_logits, labels, self.config.vocab_size, **loss_kwargs)
            if labels is not None
            else None
        )

        if return_dict:
            return CausalLMOutputWithCrossAttentions(
                loss=lm_loss,
                logits=vocab_logits,
                past_key_values=backbone_out.past_key_values,
                hidden_states=backbone_out.hidden_states,
                attentions=backbone_out.attentions,
                cross_attentions=backbone_out.cross_attentions,
            )

        # Tuple form: logits followed by everything after the first two backbone outputs,
        # with the loss prepended only when it was computed.
        tuple_out = (vocab_logits,) + backbone_out[2:]
        if lm_loss is None:
            return tuple_out
        return (lm_loss,) + tuple_out

    def _reorder_cache(self, past_key_values, beam_idx):
        """Re-select every cached tensor along dim 0 according to `beam_idx`.

        Returns a tuple with one inner tuple per layer, mirroring `past_key_values`.
        """
        return tuple(
            tuple(cached.index_select(0, beam_idx.to(cached.device)) for cached in layer_cache)
            for layer_cache in past_key_values
        )

In [None]:
help(BertLMHeadModel.forward)

In [84]:
# NOTE(review): this import rebinds `BertLMHeadModel`, replacing the class defined in the cell above,
# so the cells below run the `transformers` implementation rather than the annotated copy.
from transformers import AutoTokenizer, BertLMHeadModel

In [85]:
# `BertLMHeadModel.__init__` logs a warning whenever `config.is_decoder` is falsy; the message
# printed below shows that this checkpoint is loaded without `is_decoder=True`.
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
model = BertLMHeadModel.from_pretrained("google-bert/bert-base-uncased")

If you want to use `BertLMHeadModel` as a standalone, add `is_decoder=True.`


In [86]:
inputs = tokenizer("Hello, my dog is cute", return_tensors="pt")
outputs = model(**inputs, labels=inputs["input_ids"])

`loss_type=None` was set in the config but it is unrecognised.Using the default loss: `ForCausalLMLoss`.


In [87]:
loss = outputs.loss
loss

tensor(15.6525, grad_fn=<NllLossBackward0>)

In [89]:
logits = outputs.logits
logits.shape

torch.Size([1, 8, 30522])

In [90]:
# Masked language model
@add_start_docstrings("""Bert Model with a `language modeling` head on top.""", BERT_START_DOCSTRING)
class BertForMaskedLM(BertPreTrainedModel):
    # Fully qualified names of the LM head's decoder parameters. The bias entry
    # now carries the `cls.` prefix so both entries use the same spelling as
    # `BertLMHeadModel._tied_weights_keys`.
    _tied_weights_keys = ["cls.predictions.decoder.bias", "cls.predictions.decoder.weight"]

    def __init__(self, config):
        """Build the BERT backbone (no pooler) and the masked-LM prediction head."""
        super().__init__(config)
        # This head is meant for the encoder-only, bidirectional setup; a decoder
        # config only triggers a warning.
        if config.is_decoder:
            logger.warning(
                "If you want to use `BertForMaskedLM` make sure `config.is_decoder=False` for "
                "bi-directional self-attention."
            )

        self.bert = BertModel(config, add_pooling_layer=False)
        self.cls = BertOnlyMLMHead(config)
        # Initialize weights and apply final processing
        self.post_init()

    def get_output_embeddings(self):
        """Return the decoder layer of the prediction head."""
        return self.cls.predictions.decoder

    def set_output_embeddings(self, new_embeddings):
        """Install `new_embeddings` as the decoder and point the head's bias at its bias."""
        self.cls.predictions.decoder = new_embeddings
        self.cls.predictions.bias = new_embeddings.bias

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=MaskedLMOutput,
        config_class=_CONFIG_FOR_DOC,
        expected_output="'paris'",
        expected_loss=0.88,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], MaskedLMOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ...,
            config.vocab_size]` (see `input_ids` docstring) Tokens with indices set to `-100` are ignored (masked), the
            loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`
        """

        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        prediction_scores = self.cls(sequence_output)

        # Masked-LM loss, computed only when labels are supplied.
        masked_lm_loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()  # -100 index = padding token
            masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), labels.view(-1))

        if not return_dict:
            output = (prediction_scores,) + outputs[2:]
            return ((masked_lm_loss,) + output) if masked_lm_loss is not None else output

        return MaskedLMOutput(
            loss=masked_lm_loss,
            logits=prediction_scores,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def prepare_inputs_for_generation(self, input_ids, attention_mask=None, **model_kwargs):
        """Append one masked-out PAD position to `input_ids` and `attention_mask`.

        Args:
            input_ids: tensor whose first dimension is the batch size.
            attention_mask: optional mask aligned with `input_ids`. When omitted,
                an all-ones mask of the same shape as `input_ids` is assumed, so
                that only the appended dummy position ends up masked.
            **model_kwargs: accepted for interface compatibility; unused here.

        Returns:
            dict with the extended `"input_ids"` and `"attention_mask"`.

        Raises:
            ValueError: if `config.pad_token_id` is not defined.
        """
        if self.config.pad_token_id is None:
            raise ValueError("The PAD token should be defined for generation")

        effective_batch_size = input_ids.shape[0]

        # The signature allows `attention_mask=None`; without this guard the
        # concatenation below would fail with an AttributeError on `None`.
        if attention_mask is None:
            attention_mask = input_ids.new_ones(input_ids.shape)

        # Extend the mask with a zero column so the dummy token is not attended to.
        attention_mask = torch.cat([attention_mask, attention_mask.new_zeros((attention_mask.shape[0], 1))], dim=-1)
        dummy_token = torch.full(
            (effective_batch_size, 1), self.config.pad_token_id, dtype=torch.long, device=input_ids.device
        )
        input_ids = torch.cat([input_ids, dummy_token], dim=1)
        return {"input_ids": input_ids, "attention_mask": attention_mask}

    @classmethod
    def can_generate(cls) -> bool:
        """
        Legacy correction: BertForMaskedLM can't call `generate()` from GenerationMixin.
        Remove after v4.50, when we stop making `PreTrainedModel` inherit from `GenerationMixin`.
        """
        return False

In [None]:
help(BertForMaskedLM.forward)

In [92]:
# NOTE(review): this import rebinds `BertForMaskedLM`, replacing the class defined in the cell above,
# so the cells below run the `transformers` implementation rather than the annotated copy.
from transformers import AutoTokenizer, BertForMaskedLM

In [94]:
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
model = BertForMaskedLM.from_pretrained("google-bert/bert-base-uncased")

Some weights of the model checkpoint at google-bert/bert-base-uncased were not used when initializing BertForMaskedLM: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [95]:
inputs = tokenizer("The capital of France is [MASK].", return_tensors="pt")

In [96]:
inputs

{'input_ids': tensor([[ 101, 1996, 3007, 1997, 2605, 2003,  103, 1012,  102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [97]:
with torch.no_grad():
    logits = model(**inputs).logits

In [98]:
logits.shape

torch.Size([1, 9, 30522])

In [99]:
# retrieve index of [MASK]
mask_token_index = (inputs.input_ids == tokenizer.mask_token_id)[0].nonzero(as_tuple=True)[0]

In [100]:
mask_token_index

tensor([6])

In [101]:
predicted_token_id = logits[0, mask_token_index].argmax(axis=-1)

In [106]:
logits[0, [0]].shape

torch.Size([1, 30522])

In [107]:
predicted_token_id

tensor([3000])

In [108]:
tokenizer.decode(predicted_token_id)

'paris'

In [109]:
labels = tokenizer("The capital of France is Paris.", return_tensors="pt")["input_ids"]

In [110]:
labels 

tensor([[ 101, 1996, 3007, 1997, 2605, 2003, 3000, 1012,  102]])

In [111]:
# mask labels of non-[MASK] tokens
labels = torch.where(inputs.input_ids == tokenizer.mask_token_id, labels, -100)

In [112]:
labels 

tensor([[-100, -100, -100, -100, -100, -100, 3000, -100, -100]])

In [113]:
outputs = model(**inputs, labels=labels)

In [114]:
round(outputs.loss.item(), 2)

0.88

In [115]:
@add_start_docstrings(
    """Bert Model with a `next sentence prediction (classification)` head on top.""",
    BERT_START_DOCSTRING,
)
class BertForNextSentencePrediction(BertPreTrainedModel):
    def __init__(self, config):
        """Build the BERT backbone (with its default pooling layer) and the NSP head."""
        super().__init__(config)

        self.bert = BertModel(config)
        self.cls = BertOnlyNSPHead(config)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @replace_return_docstrings(output_type=NextSentencePredictorOutput, config_class=_CONFIG_FOR_DOC)
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        **kwargs,
    ) -> Union[Tuple[torch.Tensor], NextSentencePredictorOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the next sequence prediction (classification) loss. Input should be a sequence pair
            (see `input_ids` docstring). Indices should be in `[0, 1]`:

            - 0 indicates sequence B is a continuation of sequence A,
            - 1 indicates sequence B is a random sequence.

        Returns:

        Example:

        ```python
        >>> from transformers import AutoTokenizer, BertForNextSentencePrediction
        >>> import torch

        >>> tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
        >>> model = BertForNextSentencePrediction.from_pretrained("google-bert/bert-base-uncased")

        >>> prompt = "In Italy, pizza served in formal settings, such as at a restaurant, is presented unsliced."
        >>> next_sentence = "The sky is blue due to the shorter wavelength of blue light."
        >>> encoding = tokenizer(prompt, next_sentence, return_tensors="pt")

        >>> outputs = model(**encoding, labels=torch.LongTensor([1]))
        >>> logits = outputs.logits
        >>> assert logits[0, 0] < logits[0, 1]  # next sentence was random
        ```
        """
        # Deprecated keyword: still honoured, but it overrides `labels` and emits a FutureWarning.
        if "next_sentence_label" in kwargs:
            warnings.warn(
                "The `next_sentence_label` argument is deprecated and will be removed in a future version, use"
                " `labels` instead.",
                FutureWarning,
            )
            labels = kwargs.pop("next_sentence_label")

        if return_dict is None:
            return_dict = self.config.use_return_dict

        backbone_out = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # The second backbone output is the pooled representation fed to the NSP head.
        pooled = backbone_out[1]
        nsp_logits = self.cls(pooled)  # (b,2)

        nsp_loss = None
        if labels is not None:
            criterion = CrossEntropyLoss()
            nsp_loss = criterion(nsp_logits.view(-1, 2), labels.view(-1))

        if return_dict:
            return NextSentencePredictorOutput(
                loss=nsp_loss,
                logits=nsp_logits,
                hidden_states=backbone_out.hidden_states,
                attentions=backbone_out.attentions,
            )

        # Tuple form: logits plus the trailing backbone outputs, loss first when present.
        tuple_out = (nsp_logits,) + backbone_out[2:]
        if nsp_loss is None:
            return tuple_out
        return (nsp_loss,) + tuple_out

In [None]:
help(BertForNextSentencePrediction.forward)

In [117]:
# Next-sentence prediction demo.
# NOTE(review): this import rebinds `BertForNextSentencePrediction`, replacing the class defined in
# the cell above, so the cells below run the `transformers` implementation rather than the annotated copy.
from transformers import AutoTokenizer, BertForNextSentencePrediction

In [118]:
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
model = BertForNextSentencePrediction.from_pretrained("google-bert/bert-base-uncased")

In [119]:
prompt = "In Italy, pizza served in formal settings, such as at a restaurant, is presented unsliced."

In [120]:
next_sentence = "The sky is blue due to the shorter wavelength of blue light."
encoding = tokenizer(prompt, next_sentence, return_tensors="pt")

In [121]:
encoding

{'input_ids': tensor([[  101,  1999,  3304,  1010, 10733,  2366,  1999,  5337, 10906,  1010,
          2107,  2004,  2012,  1037,  4825,  1010,  2003,  3591,  4895, 14540,
          6610,  2094,  1012,   102,  1996,  3712,  2003,  2630,  2349,  2000,
          1996,  7820, 19934,  1997,  2630,  2422,  1012,   102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [122]:
outputs = model(**encoding, labels=torch.LongTensor([1]))
logits = outputs.logits

In [123]:
logits.shape

torch.Size([1, 2])

In [124]:
logits[0, 0]

tensor(-3.0729, grad_fn=<SelectBackward0>)

In [125]:
assert logits[0, 0] < logits[0, 1]  # next sentence was random

In [126]:
# Attach the shared BERT start docstring to the class
@add_start_docstrings(
    """
    Bert Model transformer with a sequence classification/regression head on top (a linear layer on top of the pooled
    output) e.g. for GLUE tasks.
    """,
    BERT_START_DOCSTRING,
)
class BertForSequenceClassification(BertPreTrainedModel):  # sequence classification
    def __init__(self, config):
        """Build the BERT backbone, a dropout layer and a linear classifier over `config.num_labels`."""
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config
        self.bert = BertModel(config)

        # Classifier dropout falls back to the hidden dropout when not configured.
        dropout_rate = config.classifier_dropout
        if dropout_rate is None:
            dropout_rate = config.hidden_dropout_prob
        self.dropout = nn.Dropout(dropout_rate)

        # One output unit per label.
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_SEQUENCE_CLASSIFICATION,
        output_type=SequenceClassifierOutput,
        config_class=_CONFIG_FOR_DOC,
        expected_output=_SEQ_CLASS_EXPECTED_OUTPUT,
        expected_loss=_SEQ_CLASS_EXPECTED_LOSS,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        # Fall back to the config default for the output format.
        if return_dict is None:
            return_dict = self.config.use_return_dict

        backbone_out = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Pooled output -> dropout -> classifier.
        logits = self.classifier(self.dropout(backbone_out[1]))

        loss = None
        if labels is not None:
            problem_type = self.config.problem_type
            if problem_type is None:
                # No problem type configured yet: infer it from the label count and
                # label dtype, and store the result back on the config.
                if self.num_labels == 1:
                    problem_type = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    # Several labels with integer targets: single-label classification.
                    problem_type = "single_label_classification"
                else:
                    # Anything else is treated as multi-label classification.
                    problem_type = "multi_label_classification"
                self.config.problem_type = problem_type

            if problem_type == "regression":
                criterion = MSELoss()
                if self.num_labels == 1:
                    loss = criterion(logits.squeeze(), labels.squeeze())
                else:
                    loss = criterion(logits, labels)
            elif problem_type == "single_label_classification":
                criterion = CrossEntropyLoss()
                loss = criterion(logits.view(-1, self.num_labels), labels.view(-1))
            elif problem_type == "multi_label_classification":
                criterion = BCEWithLogitsLoss()
                loss = criterion(logits, labels)

        if return_dict:
            return SequenceClassifierOutput(
                loss=loss,
                logits=logits,
                hidden_states=backbone_out.hidden_states,
                attentions=backbone_out.attentions,
            )

        # Tuple form: logits plus the trailing backbone outputs, loss first when present.
        tuple_out = (logits,) + backbone_out[2:]
        if loss is None:
            return tuple_out
        return (loss,) + tuple_out

In [None]:
help(BertForSequenceClassification.forward)

In [128]:
# NOTE(review): this import rebinds `BertForSequenceClassification`, replacing the class defined in
# the cell above, so the model loaded here is the `transformers` implementation rather than the annotated copy.
from transformers import AutoTokenizer, BertForSequenceClassification
tokenizer = AutoTokenizer.from_pretrained("textattack/bert-base-uncased-yelp-polarity")
model = BertForSequenceClassification.from_pretrained("textattack/bert-base-uncased-yelp-polarity")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/520 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/438M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

In [129]:
inputs = tokenizer("Hello, my dog is cute", return_tensors="pt")

In [130]:
with torch.no_grad():
    logits = model(**inputs).logits

In [131]:
predicted_class_id = logits.argmax().item()

In [132]:
predicted_class_id

1

In [133]:
model.config.id2label[predicted_class_id]

'LABEL_1'

In [134]:
# To train a model on `num_labels` classes, you can pass `num_labels=num_labels` to `.from_pretrained(...)`
num_labels = len(model.config.id2label)
num_labels

2

In [135]:
model = BertForSequenceClassification.from_pretrained(
    "textattack/bert-base-uncased-yelp-polarity", num_labels=num_labels)
labels = torch.tensor([1])
loss = model(**inputs, labels=labels).loss
round(loss.item(), 2)

0.01

In [None]:
 
    # Multi-label classification variant of the example above (kept for reference; not executed here):
    # >>> tokenizer = AutoTokenizer.from_pretrained("textattack/bert-base-uncased-yelp-polarity")
    # >>> model = BertForSequenceClassification.from_pretrained("textattack/bert-base-uncased-yelp-polarity", problem_type="multi_label_classification")
    
    # >>> inputs = tokenizer("Hello, my dog is cute", return_tensors="pt")
    
    # >>> with torch.no_grad():
    # ...     logits = model(**inputs).logits
    
    # >>> predicted_class_ids = torch.arange(0, logits.shape[-1])[torch.sigmoid(logits).squeeze(dim=0) > 0.5]
    
    # >>> # To train a model on `num_labels` classes, you can pass `num_labels=num_labels` to `.from_pretrained(...)`
    # >>> num_labels = len(model.config.id2label)
    # >>> model = BertForSequenceClassification.from_pretrained(
    # ...     "textattack/bert-base-uncased-yelp-polarity", num_labels=num_labels, problem_type="multi_label_classification"
    # ... )
    
    # >>> labels = torch.sum(
    # ...     torch.nn.functional.one_hot(predicted_class_ids[None, :].clone(), num_classes=num_labels), dim=1
    # ... ).to(torch.float)
    # >>> loss = model(**inputs, labels=labels).loss

In [136]:
@add_start_docstrings(
    """
    Bert Model with a multiple choice classification head on top (a linear layer on top of the pooled output and a
    softmax) e.g. for RocStories/SWAG tasks.
    """,
    BERT_START_DOCSTRING,
)
class BertForMultipleChoice(BertPreTrainedModel):  # multiple-choice task
    def __init__(self, config):
        """Build the BERT backbone, a dropout layer and a single-unit scoring layer."""
        super().__init__(config)
        self.bert = BertModel(config)

        # Classifier dropout falls back to the hidden dropout when not configured.
        dropout_rate = config.classifier_dropout
        if dropout_rate is None:
            dropout_rate = config.hidden_dropout_prob
        self.dropout = nn.Dropout(dropout_rate)

        # One score per (example, choice) row.
        self.classifier = nn.Linear(config.hidden_size, 1)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, num_choices, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=MultipleChoiceModelOutput,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], MultipleChoiceModelOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the multiple choice classification loss. Indices should be in `[0, ...,
            num_choices-1]` where `num_choices` is the size of the second dimension of the input tensors. (See
            `input_ids` above)
        """

        def _merge_leading_dims(tensor):
            # Collapse every leading dimension into one, keeping the last dimension intact.
            return None if tensor is None else tensor.view(-1, tensor.size(-1))

        # Fall back to the config default for the output format.
        if return_dict is None:
            return_dict = self.config.use_return_dict

        # The number of choices is read from dim 1 before the inputs are flattened.
        num_choices = (input_ids if input_ids is not None else inputs_embeds).shape[1]

        # --> (b*num_choices, s)
        input_ids = _merge_leading_dims(input_ids)
        attention_mask = _merge_leading_dims(attention_mask)
        token_type_ids = _merge_leading_dims(token_type_ids)
        position_ids = _merge_leading_dims(position_ids)
        if inputs_embeds is not None:
            # Embeddings keep their last two dimensions.
            inputs_embeds = inputs_embeds.view(-1, inputs_embeds.size(-2), inputs_embeds.size(-1))

        backbone_out = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Pooled output -> dropout -> one score per row, i.e. (b*num_choices, 1).
        flat_scores = self.classifier(self.dropout(backbone_out[1]))
        choice_logits = flat_scores.view(-1, num_choices)

        loss = None
        if labels is not None:  # labels were supplied: compute the cross-entropy loss
            criterion = CrossEntropyLoss()
            loss = criterion(choice_logits, labels)

        if return_dict:
            return MultipleChoiceModelOutput(
                loss=loss,
                logits=choice_logits,
                hidden_states=backbone_out.hidden_states,
                attentions=backbone_out.attentions,
            )

        # Tuple form: logits plus the trailing backbone outputs, loss first when present.
        tuple_out = (choice_logits,) + backbone_out[2:]
        if loss is None:
            return tuple_out
        return (loss,) + tuple_out

In [None]:
help(BertForMultipleChoice.forward)

In [138]:
# NOTE(review): this import rebinds `BertForMultipleChoice`, replacing the class defined in the cell above,
# so the cells below run the `transformers` implementation rather than the annotated copy.
from transformers import AutoTokenizer, BertForMultipleChoice

In [139]:
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")
model = BertForMultipleChoice.from_pretrained("google-bert/bert-base-uncased")

Some weights of BertForMultipleChoice were not initialized from the model checkpoint at google-bert/bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [140]:
prompt = "In Italy, pizza served in formal settings, such as at a restaurant, is presented unsliced."
choice0 = "It is eaten with a fork and a knife."
choice1 = "It is eaten while held in the hand."
labels = torch.tensor(0).unsqueeze(0)  

In [144]:
labels

tensor([0])

In [141]:
labels.shape

torch.Size([1])

In [145]:
# choice0 is correct (according to Wikipedia ;)), batch size 1
encoding = tokenizer([prompt, prompt], [choice0, choice1], return_tensors="pt", padding=True)

In [147]:
encoding['input_ids'].shape

torch.Size([2, 35])

In [150]:
# Add a leading dimension to every tensor in the encoding: `input_ids` goes from
# (2, 35) to (1, 2, 35), i.e. the (batch_size, num_choices, sequence_length)
# layout named in `BertForMultipleChoice.forward`'s input docstring.
aa={k: v.unsqueeze(0) for k, v in encoding.items()}
aa

{'input_ids': tensor([[[  101,  1999,  3304,  1010, 10733,  2366,  1999,  5337, 10906,  1010,
            2107,  2004,  2012,  1037,  4825,  1010,  2003,  3591,  4895, 14540,
            6610,  2094,  1012,   102,  2009,  2003,  8828,  2007,  1037,  9292,
            1998,  1037,  5442,  1012,   102],
          [  101,  1999,  3304,  1010, 10733,  2366,  1999,  5337, 10906,  1010,
            2107,  2004,  2012,  1037,  4825,  1010,  2003,  3591,  4895, 14540,
            6610,  2094,  1012,   102,  2009,  2003,  8828,  2096,  2218,  1999,
            1996,  2192,  1012,   102,     0]]]),
 'token_type_ids': tensor([[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
           0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
          [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
           0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0]]]),
 'attention_mask': tensor([[[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
           1, 1, 1, 1, 1, 1, 

In [155]:
aa['input_ids'].shape 

torch.Size([1, 2, 35])

In [156]:
outputs = model(**aa, labels=labels)  # batch size is 1

In [158]:
outputs.logits

tensor([[-0.0077, -0.0309]], grad_fn=<ViewBackward0>)

In [160]:
print(outputs.logits.shape,outputs.loss)

torch.Size([1, 2]) tensor(0.6816, grad_fn=<NllLossBackward0>)


In [161]:
@add_start_docstrings(
    """
    Bert Model with a token classification head on top (a linear layer on top of the hidden-states output) e.g. for
    Named-Entity-Recognition (NER) tasks.
    """,
    BERT_START_DOCSTRING,
)
class BertForTokenClassification(BertPreTrainedModel):
    # Token classification: BERT encoder (built without its pooling layer)
    # followed by dropout and a linear layer applied at every sequence position.
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        # Only per-token hidden states are consumed, so no pooled output is built.
        self.bert = BertModel(config, add_pooling_layer=False)

        # Use the classifier-specific dropout rate when configured; otherwise
        # fall back to the general hidden-layer dropout rate.
        if config.classifier_dropout is not None:
            dropout_prob = config.classifier_dropout
        else:
            dropout_prob = config.hidden_dropout_prob
        self.dropout = nn.Dropout(dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_TOKEN_CLASSIFICATION,
        output_type=TokenClassifierOutput,
        config_class=_CONFIG_FOR_DOC,
        expected_output=_TOKEN_CLASS_EXPECTED_OUTPUT,
        expected_loss=_TOKEN_CLASS_EXPECTED_LOSS,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
        """
        # An explicit argument wins; otherwise defer to the model configuration.
        if return_dict is None:
            return_dict = self.config.use_return_dict

        encoder_outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # (b, s, d) hidden states -> dropout -> (b, s, num_labels) scores.
        token_states = self.dropout(encoder_outputs[0])
        logits = self.classifier(token_states)

        loss = None
        if labels is not None:
            # Merge the batch and sequence axes so that every token is one
            # classification sample for the cross-entropy loss.
            flat_logits = logits.view(-1, self.num_labels)
            flat_labels = labels.view(-1)
            loss = CrossEntropyLoss()(flat_logits, flat_labels)

        if return_dict:
            return TokenClassifierOutput(
                loss=loss,
                logits=logits,
                hidden_states=encoder_outputs.hidden_states,
                attentions=encoder_outputs.attentions,
            )

        # Tuple form: logits followed by the encoder outputs from index 2 on,
        # with the loss prepended only when labels were supplied.
        output = (logits,) + encoder_outputs[2:]
        return output if loss is None else (loss,) + output

In [None]:
help(BertForTokenClassification.forward)

In [163]:
# Import only the tokenizer. Importing BertForTokenClassification from
# transformers here would rebind the name and silently shadow the class
# defined in this notebook, so the cells below would no longer exercise it.
from transformers import AutoTokenizer

In [164]:
# Tokenizer and model are loaded from the same checkpoint name.
NER_CHECKPOINT = "dbmdz/bert-large-cased-finetuned-conll03-english"
tokenizer = AutoTokenizer.from_pretrained(NER_CHECKPOINT)
model = BertForTokenClassification.from_pretrained(NER_CHECKPOINT)

tokenizer_config.json:   0%|          | 0.00/60.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/998 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.33G [00:00<?, ?B/s]

Some weights of the model checkpoint at dbmdz/bert-large-cased-finetuned-conll03-english were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [165]:
# add_special_tokens=False: encode the sentence without adding special tokens.
inputs = tokenizer(
    "HuggingFace is a company based in Paris and New York",
    return_tensors="pt",
    add_special_tokens=False,
)

In [166]:
inputs

{'input_ids': tensor([[20164, 10932,  2271,  7954,  1110,   170,  1419,  1359,  1107,  2123,
          1105,  1203,  1365]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [167]:
with torch.no_grad():
    logits = model(**inputs).logits

In [168]:
logits.shape

torch.Size([1, 13, 9])

In [169]:
predicted_token_class_ids = logits.argmax(-1)

In [174]:
predicted_token_class_ids

tensor([[0, 6, 6, 6, 0, 0, 0, 0, 0, 8, 0, 8, 8]])

In [171]:
# Note: the model classifies tokens, not input words, so there may be more
# predicted classes than words -- several tokens can belong to the same word.
predicted_tokens_classes = [
    model.config.id2label[class_id] for class_id in predicted_token_class_ids[0].tolist()
]
predicted_tokens_classes

['O',
 'I-ORG',
 'I-ORG',
 'I-ORG',
 'O',
 'O',
 'O',
 'O',
 'O',
 'I-LOC',
 'O',
 'I-LOC',
 'I-LOC']

In [172]:
labels = predicted_token_class_ids

In [175]:
loss = model(**inputs, labels=labels).loss

In [176]:
round(loss.item(), 2)

0.01

In [177]:
config.num_labels

2

In [186]:
# Random (2, 3, 2) tensor used in the following cells to explore how
# Tensor.split behaves along the last axis.
aaa = torch.randn(2, 3, 2)
aaa

tensor([[[ 1.6488,  1.5183],
         [ 0.6272, -1.4035],
         [-0.3378,  2.1239]],

        [[-0.3483, -0.5490],
         [ 0.0897, -0.4275],
         [-0.5186, -0.3892]]])

In [187]:
len(aaa.split(2, dim=-1))

1

In [189]:
len(aaa.split(1, dim=-1))

2

In [191]:
aaa.split(1, dim=-1)[0].shape

torch.Size([2, 3, 1])

In [192]:
aaa.split(1, dim=-1)

(tensor([[[ 1.6488],
          [ 0.6272],
          [-0.3378]],
 
         [[-0.3483],
          [ 0.0897],
          [-0.5186]]]),
 tensor([[[ 1.5183],
          [-1.4035],
          [ 2.1239]],
 
         [[-0.5490],
          [-0.4275],
          [-0.3892]]]))

In [193]:
@add_start_docstrings(
    """
    Bert Model with a span classification head on top for extractive question-answering tasks like SQuAD (a linear
    layers on top of the hidden-states output to compute `span start logits` and `span end logits`).
    """,
    BERT_START_DOCSTRING,
)
class BertForQuestionAnswering(BertPreTrainedModel):
    # Extractive QA: BERT encoder (built without its pooling layer) followed by
    # one linear layer whose outputs are split into start and end logits.
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        # The span head reads per-token hidden states only, so no pooler is built.
        self.bert = BertModel(config, add_pooling_layer=False)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()

    @add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_QA,
        output_type=QuestionAnsweringModelOutput,
        config_class=_CONFIG_FOR_DOC,
        qa_target_start_index=_QA_TARGET_START_INDEX,
        qa_target_end_index=_QA_TARGET_END_INDEX,
        expected_output=_QA_EXPECTED_OUTPUT,
        expected_loss=_QA_EXPECTED_LOSS,
    )
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        start_positions: Optional[torch.Tensor] = None,
        end_positions: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], QuestionAnsweringModelOutput]:
        r"""
        start_positions (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for position (index) of the start of the labelled span for computing the token classification loss.
            Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
            are not taken into account for computing the loss.
        end_positions (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for position (index) of the end of the labelled span for computing the token classification loss.
            Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
            are not taken into account for computing the loss.
        """
        # An explicit argument wins; otherwise defer to the model configuration.
        if return_dict is None:
            return_dict = self.config.use_return_dict

        encoder_outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # (b, s, d) hidden states -> (b, s, 2) span scores.
        span_logits = self.qa_outputs(encoder_outputs[0])
        # Cut the last axis into (b, s, 1) pieces, then drop that axis from each
        # piece: the first piece scores span starts, the second span ends.
        start_logits, end_logits = (
            piece.squeeze(-1).contiguous() for piece in span_logits.split(1, dim=-1)
        )

        total_loss = None
        have_targets = start_positions is not None and end_positions is not None
        if have_targets:
            # With multi-GPU splitting the targets may carry an extra dimension.
            if start_positions.dim() > 1:
                start_positions = start_positions.squeeze(-1)
            if end_positions.dim() > 1:
                end_positions = end_positions.squeeze(-1)

            # The sequence length doubles as the "ignore" class: positions are
            # clamped into [0, seq_len] and targets equal to seq_len are skipped
            # by the loss.
            seq_len = start_logits.size(1)
            span_loss = CrossEntropyLoss(ignore_index=seq_len)
            start_loss = span_loss(start_logits, start_positions.clamp(0, seq_len))
            end_loss = span_loss(end_logits, end_positions.clamp(0, seq_len))
            # Final loss is the mean of the start and end losses.
            total_loss = (start_loss + end_loss) / 2

        if return_dict:
            return QuestionAnsweringModelOutput(
                loss=total_loss,
                start_logits=start_logits,
                end_logits=end_logits,
                hidden_states=encoder_outputs.hidden_states,
                attentions=encoder_outputs.attentions,
            )

        # Tuple form: both logit tensors followed by the encoder outputs from
        # index 2 on, with the loss prepended only when targets were supplied.
        output = (start_logits, end_logits) + encoder_outputs[2:]
        return output if total_loss is None else (total_loss,) + output

In [None]:
help(BertForQuestionAnswering.forward)

In [195]:
# Import only the tokenizer. Importing BertForQuestionAnswering from
# transformers here would rebind the name and silently shadow the class
# defined in this notebook, so the cells below would no longer exercise it.
from transformers import AutoTokenizer

In [196]:
# Tokenizer and model are loaded from the same checkpoint name.
QA_CHECKPOINT = "deepset/bert-base-cased-squad2"
tokenizer = AutoTokenizer.from_pretrained(QA_CHECKPOINT)
model = BertForQuestionAnswering.from_pretrained(QA_CHECKPOINT)

tokenizer_config.json:   0%|          | 0.00/152 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/508 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/433M [00:00<?, ?B/s]

Some weights of the model checkpoint at deepset/bert-base-cased-squad2 were not used when initializing BertForQuestionAnswering: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForQuestionAnswering from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForQuestionAnswering from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [197]:
question, text = "Who was Jim Henson?", "Jim Henson was a nice puppet"

In [198]:
inputs = tokenizer(question, text, return_tensors="pt")

In [200]:
inputs

{'input_ids': tensor([[  101,  2627,  1108,  3104,  1124, 15703,   136,   102,  3104,  1124,
         15703,  1108,   170,  3505, 16797,   102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

In [199]:
with torch.no_grad():
    outputs = model(**inputs)

In [201]:
# argmax() without a dim argument returns an index into the flattened tensor.
# That index equals the token position here only because the batch holds a
# single sequence (input_ids has one row).
answer_start_index = outputs.start_logits.argmax()
answer_end_index = outputs.end_logits.argmax()

In [203]:
print(answer_start_index,answer_end_index)

tensor(12) tensor(14)


In [204]:
# Take the predicted span from the single input row; "+ 1" makes the slice
# include the token at answer_end_index. Then decode the ids back to text.
predict_answer_tokens = inputs.input_ids[0, answer_start_index : answer_end_index + 1]
tokenizer.decode(predict_answer_tokens, skip_special_tokens=True)

'a nice puppet'

In [206]:
# Compute the QA loss against a hand-picked target span.
# NOTE(review): this comment originally read: target is "nice puppet". However,
# the predicted span at positions 12..14 decodes to 'a nice puppet', so
# "nice puppet" would be positions 13..14. Positions 14..15 instead cover the
# last word piece plus the final token (id 102, presumably [SEP]) -- confirm
# which span is intended before relying on the loss value.
target_start_index = torch.tensor([14])
target_end_index = torch.tensor([15])
outputs = model(**inputs, start_positions=target_start_index, end_positions=target_end_index)
loss = outputs.loss
round(loss.item(), 2)

7.41

In [None]:
# Names listed as the public API.
__all__ = [
    "BertForMaskedLM", # masked language modeling
    "BertForMultipleChoice", # multiple choice
    "BertForNextSentencePrediction", # next sentence prediction (NSP)
    "BertForPreTraining",
    "BertForQuestionAnswering", # question answering
    "BertForSequenceClassification", # sequence classification
    "BertForTokenClassification", # token classification
    "BertLayer", 
    "BertLMHeadModel", 
    "BertModel",
    "BertPreTrainedModel",
    "load_tf_weights_in_bert",
]