<table style="width:100%">
<tr>
<td style="vertical-align:middle; text-align:left;">
<font size="2">
Supplementary code for the <a href="http://mng.bz/orYv">Build a Large Language Model From Scratch</a> book by <a href="https://sebastianraschka.com">Sebastian Raschka</a><br>
<br>Code repository: <a href="https://github.com/rasbt/LLMs-from-scratch">https://github.com/rasbt/LLMs-from-scratch</a>
<br>汉化的库: <a href="https://github.com/GoatCsu/CN-LLMs-from-scratch.git">https://github.com/GoatCsu/CN-LLMs-from-scratch.git</a>
</font>
</td>
<td style="vertical-align:middle; text-align:left;">
<a href="http://mng.bz/orYv"><img src="https://sebastianraschka.com/images/LLMs-from-scratch-images/cover-small.webp" width="100px"></a>
</td>
</tr>
</table>


# 多头注意力在数据载入的运用

In [1]:
# NBVAL_IGNORE_OUTPUT
from importlib.metadata import version

print("torch version:", version("torch"))

torch version: 2.2.2


完整的章节代码位于[ch03.ipynb](./ch03.ipynb)。

该笔记本包含了本章的核心内容——多头注意力实现（以及第二章中的数据加载pipeline）。

## 第二章的数据载入器

In [2]:
class GPTDatasetV1(Dataset):
    def __init__(self, txt, tokenizer, max_length, stride):
        self.input_ids = []  # 输入ID列表
        self.target_ids = []  # 目标ID列表

        # 对整个文本进行分词
        token_ids = tokenizer.encode(txt, allowed_special={'<|endoftext|>'})

        # 使用滑动窗口将文本分割成重叠的最大长度序列
        for i in range(0, len(token_ids) - max_length, stride):
            input_chunk = token_ids[i:i + max_length]  # 输入片段
            target_chunk = token_ids[i + 1: i + max_length + 1]  # 目标片段（右移一个位置）
            self.input_ids.append(torch.tensor(input_chunk))  # 将输入片段转换为张量
            self.target_ids.append(torch.tensor(target_chunk))  # 将目标片段转换为张量

    def __len__(self):
        return len(self.input_ids)  # 返回数据集的大小

    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]  # 获取特定索引的输入和目标

def create_dataloader(txt, batch_size=4, max_length=256, stride=128, shuffle=True):
    # 初始化分词器
    tokenizer = tiktoken.get_encoding("gpt2")

    # 创建数据集
    dataset = GPTDatasetV1(txt, tokenizer, max_length, stride)

    # 创建数据加载器
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    return dataloader  # 返回数据加载器


with open("small-text-sample.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()  # 读取文本文件

tokenizer = tiktoken.get_encoding("gpt2")  # 初始化分词器
encoded_text = tokenizer.encode(raw_text)  # 对文本进行编码

vocab_size = 50257  # 词汇表大小
output_dim = 256  # 输出维度
max_len = 1024  # 最大序列长度
context_length = max_len  # 上下文长度


token_embedding_layer = nn.Embedding(vocab_size, output_dim)  # 创建词嵌入层
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)  # 创建位置嵌入层

max_length = 4  # 每个输入片段的最大长度
dataloader = create_dataloader(raw_text, batch_size=8, max_length=max_length, stride=max_length)  # 创建数据加载器

In [3]:
for batch in dataloader:
    x, y = batch

    token_embeddings = token_embedding_layer(x)
    pos_embeddings = pos_embedding_layer(torch.arange(max_length))

    input_embeddings = token_embeddings + pos_embeddings

    break

In [4]:
print(input_embeddings.shape)

torch.Size([8, 4, 256])


# 第三章的多头注意力

## 一种变体

In [5]:
class CausalSelfAttention(nn.Module):
    """
    该类实现了因果自注意力机制（Causal Self Attention），
    用于自回归模型（例如GPT模型中的注意力层）。
    """

    def __init__(self, d_in, d_out, context_length, dropout, qkv_bias=False):
        """
        初始化因果自注意力层。
        
        参数：
        - d_in: 输入维度
        - d_out: 输出维度
        - context_length: 上下文长度（即注意力机制能“看到”的最大令牌数）
        - dropout: Dropout率
        - qkv_bias: 是否为查询、键和值使用偏置（默认为False）
        """
        super().__init__()
        self.d_out = d_out
        # 定义查询、键、值的线性变换
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key   = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        
        # Dropout层
        self.dropout = nn.Dropout(dropout)  # 新增的Dropout层

        # 注册一个buffer，用于存储因果掩码
        self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1))  # 新增掩码，禁止未来的信息

    def forward(self, x):
        """
        前向传播函数计算因果自注意力输出。
        
        参数：
        - x: 输入的张量，形状为 (batch_size, num_tokens, d_in)
        
        返回：
        - context_vec: 自注意力机制的输出，形状为 (batch_size, num_tokens, d_out)
        """
        b, n_tokens, d_in = x.shape  # 获取输入张量的维度
        keys = self.W_key(x)  # 键（K）
        queries = self.W_query(x)  # 查询（Q）
        values = self.W_value(x)  # 值（V）

        # 计算注意力分数（查询和键的点积）
        attn_scores = queries @ keys.transpose(1, 2)  # 这里的转置（transpose）是为了匹配维度
        
        # 使用掩码阻止未来的tokens看到当前token
        attn_scores.masked_fill_(  # 这里的操作是原地修改
            self.mask.bool()[:n_tokens, :n_tokens], -torch.inf)  # 将掩码区域填充为负无穷

        # 计算注意力权重并进行softmax归一化
        attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)  # 使用Dropout层

        # 计算上下文向量（加权和）
        context_vec = attn_weights @ values
        return context_vec


class MultiHeadAttentionWrapper(nn.Module):
    """
    该类实现了多头注意力机制（Multi-Head Attention）包装器。
    它包含多个因果自注意力头，并在输出时将多个头的结果合并。
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        """
        初始化多头注意力层。
        
        参数：
        - d_in: 输入维度
        - d_out: 输出维度
        - context_length: 上下文长度
        - dropout: Dropout率
        - num_heads: 注意力头的数量
        - qkv_bias: 是否为查询、键和值使用偏置（默认为False）
        """
        super().__init__()
        # 定义多个因果自注意力头
        self.heads = nn.ModuleList(
            [CausalSelfAttention(d_in, d_out, context_length, dropout, qkv_bias) 
             for _ in range(num_heads)]  # 为每个头创建一个CausalSelfAttention实例
        )
        
        # 定义最终的线性变换，用于将多个头的输出合并
        self.out_proj = nn.Linear(d_out * num_heads, d_out * num_heads)

    def forward(self, x):
        """
        前向传播函数，计算多头注意力输出。
        
        参数：
        - x: 输入的张量，形状为 (batch_size, num_tokens, d_in)
        
        返回：
        - out: 多头注意力的输出，形状为 (batch_size, num_tokens, d_out * num_heads)
        """
        # 将多个头的输出拼接在一起
        context_vec = torch.cat([head(x) for head in self.heads], dim=-1)
        
        # 通过线性变换得到最终输出
        return self.out_proj(context_vec)

In [6]:
torch.manual_seed(123)

context_length = max_length
d_in = output_dim

num_heads=2
d_out = d_in // num_heads

mha = MultiHeadAttentionWrapper(d_in, d_out, context_length, 0.0, num_heads)

batch = input_embeddings
context_vecs = mha(batch)

print("context_vecs.shape:", context_vecs.shape)

context_vecs.shape: torch.Size([8, 4, 256])


## 另一种变体

In [7]:
class MultiHeadAttention(nn.Module):
    """
    该类实现了多头自注意力机制（Multi-Head Attention），
    用于自回归模型（如Transformer和GPT中的注意力层）。
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        """
        初始化多头自注意力层。
        
        参数：
        - d_in: 输入维度
        - d_out: 输出维度
        - context_length: 上下文长度（即注意力机制能“看到”的最大令牌数）
        - dropout: Dropout率
        - num_heads: 注意力头的数量
        - qkv_bias: 是否为查询、键和值使用偏置（默认为False）
        """
        super().__init__()

        # 检查输出维度是否能被头数整除
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads  # 将输出维度除以头数，得到每个头的维度

        # 定义查询、键、值的线性变换
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)

        # 定义输出的线性变换层，用于合并多个头的输出
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # 注册一个buffer，用于存储因果掩码
        self.register_buffer('mask', torch.triu(torch.ones(context_length, context_length), diagonal=1))  # 新增掩码，禁止未来的信息

    def forward(self, x):
        """
        前向传播函数，计算多头自注意力输出。
        
        参数：
        - x: 输入张量，形状为 (batch_size, num_tokens, d_in)
        
        返回：
        - context_vec: 多头自注意力的输出，形状为 (batch_size, num_tokens, d_out)
        """
        b, num_tokens, d_in = x.shape  # 获取输入张量的维度

        # 计算键、查询和值的表示
        keys = self.W_key(x)  # 键（K）
        queries = self.W_query(x)  # 查询（Q）
        values = self.W_value(x)  # 值（V）

        # 将最后的维度按头数进行拆分：
        # 将 (b, num_tokens, d_out) 转换为 (b, num_tokens, num_heads, head_dim)
        keys = keys.view(b, num_tokens, self.num_heads, self.head_dim) 
        values = values.view(b, num_tokens, self.num_heads, self.head_dim)
        queries = queries.view(b, num_tokens, self.num_heads, self.head_dim)

        # 转置以适配矩阵相乘：
        # (b, num_tokens, num_heads, head_dim) -> (b, num_heads, num_tokens, head_dim)
        keys = keys.transpose(1, 2)
        queries = queries.transpose(1, 2)
        values = values.transpose(1, 2)

        # 计算缩放点积注意力（self-attention）
        attn_scores = queries @ keys.transpose(2, 3)  # 点积计算每个头的注意力分数

        # 通过掩码将未来的信息遮掩（变成负无穷）
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]  # 将掩码转换为布尔类型
        attn_scores.masked_fill_(mask_bool, -torch.inf)  # 使用掩码将未来的信息填充为负无穷

        # 计算注意力权重并进行softmax归一化
        attn_weights = torch.softmax(attn_scores / keys.shape[-1]**0.5, dim=-1)
        attn_weights = self.dropout(attn_weights)  # 使用Dropout层

        # 计算上下文向量（加权和）
        context_vec = (attn_weights @ values).transpose(1, 2)  # 恢复维度 (b, num_tokens, num_heads, head_dim)

        # 合并头部的输出，并进行线性变换
        context_vec = context_vec.contiguous().view(b, num_tokens, self.d_out)  # 合并头的输出
        context_vec = self.out_proj(context_vec)  # 可选的投影层

        return context_vec

In [8]:
torch.manual_seed(123)

context_length = max_length
d_in = output_dim
d_out = d_in

mha = MultiHeadAttention(d_in, d_out, context_length, 0.0, num_heads=2)

batch = input_embeddings
context_vecs = mha(batch)

print("context_vecs.shape:", context_vecs.shape)

context_vecs.shape: torch.Size([8, 4, 256])
