In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from dataclasses import dataclass
import math
import os
import tiktoken

# 设置随机种子：保证每次运行结果一致（为了教学演示，固定随机性）
torch.manual_seed(1024)

<torch._C.Generator at 0x122ae5470>

In [None]:
@dataclass
class GPTConfig:
    """
    这里是机器人的'出厂设置'。
    """

    # block_size: 机器人的'视力范围'。它一次最多只能看 512 个 token。
    block_size: int = 512

    # batch_size: 机器人一次'写作业'的题量。一次并行处理 12 个样本。
    batch_size: int = 12

    # n_layer: 机器人的'脑容量深度'。有 6 层 Transformer Block 堆叠。
    n_layer: int = 6

    # n_head: 注意力头数。想象有 12 个专家从不同角度（语法、语义、情感等）分析同一句话。
    n_head: int = 12

    # n_embd: 词向量维度。每个词被转换成一个长度为 768 的向量。
    # 就像一个词的'特征档案'里有 768 个属性。
    n_embd: int = 768

    # head_size: 每个注意力头负责的维度。 768 / 12 = 64。
    head_size: int = n_embd // n_head

    # dropout: 遗忘率。训练时随机让 10% 的神经元休息，防止死记硬背（过拟合）。
    dropout: float = 0.1

    # vocab_size: 字典大小。GPT-2 的词表，大约 5 万个词。
    vocab_size: int = 50257

# 防作弊

代码 torch.tril(torch.ones(...)) 生成的是一个下三角矩阵。这就像给模型戴上了一副“特制眼镜”。
torch.ones: 生成一个 x*y 的张量填充为 1, 
torch.tril 将张量转为倒三角形状


1 (保留)：表示允许“看见”（注意力机制可以计算关系）。
0 (遮挡)：表示“未来信息”，严禁偷看

```md
      第1字  第2字  第3字  第4字
       (我)   (爱)  (机器)  (人)
第1字   [1,    0,     0,     0]   <- "我" 只能看 "我"
第2字   [1,    1,     0,     0]   <- "爱" 能看 "我、爱"
第3字   [1,    1,     1,     0]   <- "机器" 能看 "我、爱、机器"
第4字   [1,    1,     1,     1]   <- "人" 能看 全文

```

3. 它是如何生效的？（关键步骤）
   在 Transformer/GPT 内部，注意力分数的计算流程如下：

## 第一步：计算相关性分数

模型计算所有词两两之间的关系（Attention Scores）。此时模型还没被限制，它算出了所有词的关系：

(假设分数，数字越大越相关) 此时，“我”对“机器”的关注度可能是 0.9（这就作弊了）。

## 第二步：应用遮罩（Masking）—— 这是防作弊的关键点

代码会将上面那个 0/1 矩阵中为 0 的位置，在分数矩阵里替换成 负无穷大 (-inf)。

```md
处理前分数：             应用遮罩后：
[0.5, 0.8, 0.9, 0.1]    ->   [0.5, -inf, -inf, -inf]
# 注：这里的 -inf 代表“完全不可能”。
```

## 第三步：Softmax 归一化

Softmax 函数的作用是将分数转化为概率（加起来等于 1）。

于是，原来的分数变成了概率：

位置 1 (我)：变成 100%
位置 2 (爱)：变成 0%
位置 3 (机器)：变成 0%
结果：模型在处理第 1 个字时，对后面字的注意力权重被强行变成了 0。无论后面的字是什么，模型都“看不见”，因为它被切断了联系。


In [None]:
# ==================== 3. 核心组件：注意力机制 ====================
class SingleHeadAttention(nn.Module):
    """
    单头注意力：这是机器人的'眼睛'，用来寻找词与词之间的关系。
    """

    def __init__(self, config):
        super().__init__()
        # Key(索引), Query(查询), Value(内容)
        # 类比图书检索：拿着 Query(问题) 去找 Key(书名标签)，匹配到了就拿出 Value(书的内容)
        self.key = nn.Linear(config.n_embd, config.head_size)
        self.query = nn.Linear(config.n_embd, config.head_size)
        self.value = nn.Linear(config.n_embd, config.head_size)

        # 这是一个遮罩（Mask），形状是下三角矩阵。
        # 作用：确保机器人在预测第 5 个字时，只能看见前 4 个字，不能偷看第 6 个字（防作弊）。
        self.register_buffer("attention_mask", torch.tril(torch.ones(config.block_size, config.block_size)))

        self.dropout = nn.Dropout(config.dropout)
        self.head_size = config.head_size

    def forward(self, x):
        """
        x 的形状(B,T,C):
        B=batch_size: （批次大小，一次训练 / 推理处理的样本数量）2 个句子
        T=sequence_length, (每个序列包含的时间步数 /token 数量) 每个句子 5 个词
        C=embedding_dim (每个 token 的特征向量维度) 每个词用 3 维向量表示
        """
        B, T, C = x.shape

        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        """
         计算注意力得分矩阵（Attention Scores），衡量序列中每个位置与其他位置的相关性。
        Q 乘以 K 的转置。目的是看 Query 和 Key 有多像。
        @ 等价于 torch.matmul(q, k.transpose(-2, -1)), torch.matmul 通用矩阵乘法函数, k.transpose(几个参数, 表示将第几个维度进行转置): 交换第几个维度的张量
        """
        wei = q @ k.transpose(-2, -1)  # (B, T, T)

        """
         2. 缩放
        将注意力权重矩阵 wei 中的每一个元素都除以根号 head_size，防止数值过大导致梯度消失。
        """
        wei = wei / math.sqrt(self.head_size)

        """
        3. 遮蔽未来信息 (Masking)
        把上三角区域（未来的词）设置为负无穷大，Softmax 后变成 0。
        第一个 :T → 行切片，取第 0 行到第 T-1 行、第二个 :T → 列切片，取第 0 列到第 T-1 列, 结果 → 一个 T × T 的子矩阵
        """
        wei = wei.masked_fill(self.attention_mask[:T, :T] == 0, float("-inf"))

        """
        4. 归一化 (Softmax)
        得到概率分布，比如：当前词关注 "我" 0.8，关注 "吃" 0.2。
        """
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)

        """
        5. 加权求和
        用算出的权重去提取 Value 中的信息。
        """
        out = wei @ v
        return out


class MultiHeadAttention(nn.Module):
    """
    多头注意力：把多个单头注意力组合起来。
    三个臭皮匠，顶个诸葛亮。
    """

    def __init__(self, config):
        super().__init__()
        # 创建 n_head 个单头注意力
        self.heads = nn.ModuleList([SingleHeadAttention(config) for _ in range(config.n_head)])

        # 投影层：把大家的意见汇总后，再做一次整理
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        # 并行计算所有头，然后拼接在一起
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out


In [None]:
class FeedForward(nn.Module):
    """
    前馈网络：机器人的'思考'环节。
    Attention 负责收集信息，FeedForward 负责消化和处理这些信息。
    """

    def __init__(self, config):
        super().__init__()
        self.net = nn.Sequential(
            # 升维：把特征由 768 扩展到 4倍，增加非线性表达能力
            nn.Linear(config.n_embd, 4 * config.n_embd),
            # 激活函数：GELU，给模型加入非线性，让它能理解复杂逻辑
            nn.GELU(),
            # 降维：变回原来的大小，方便传给下一层
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.dropout),
        )

    def forward(self, x):
        return self.net(x)


In [5]:
# ==================== 5. 组装车间：Transformer Block ====================
class Block(nn.Module):
    """
    一个标准的 Transformer 模块。
    结构：Input -> LayerNorm -> Attention -> Residual -> LayerNorm -> FFN -> Residual
    """

    def __init__(self, config):
        super().__init__()
        self.att = MultiHeadAttention(config)
        self.ffn = FeedForward(config)
        # LayerNorm: 类似于'标准化考场'，让数据分布稳定，训练更容易
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.ln2 = nn.LayerNorm(config.n_embd)

    def forward(self, x):
        # 残差连接 (Residual Connection): x = x + ...
        # 意思是：保留上一层学到的东西，只把新学到的东西加进去。
        # 这能防止模型层数太深时“学傻了”（梯度消失）。
        x = x + self.att(self.ln1(x))
        x = x + self.ffn(self.ln2(x))
        return x


In [6]:
# ==================== 6. 最终成品：GPT 模型 ====================
class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config

        # 1. 词嵌入层：把 Token ID 变成向量
        self.token_embedding_table = nn.Embedding(config.vocab_size, config.n_embd)

        # 2. 位置嵌入层：告诉模型每个词在句子的第几个位置（因为 Attention 本身不分前后）
        self.position_embedding_table = nn.Embedding(config.block_size, config.n_embd)

        # 3. 堆叠多个 Block
        self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])

        # 4. 最后的归一化
        self.ln_final = nn.LayerNorm(config.n_embd)

        # 5. 输出头：把向量变回字典里的概率分布
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

        # 权重初始化：给参数一个合理的初始值，类似于'起跑线'
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        # idx: 输入的数字序列 (batch, seq_len)
        # targets: 正确答案 (batch, seq_len)

        B, T = idx.shape
        device = idx.device

        # 获取 Token Embedding
        tok_emb = self.token_embedding_table(idx)  # (B, T, C)
        # 获取 Position Embedding (0, 1, 2, ... T-1)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device))  # (T, C)

        # 融合信息：词义 + 位置
        x = tok_emb + pos_emb

        # 通过所有 Transformer Block
        x = self.blocks(x)

        # 最终归一化
        x = self.ln_final(x)

        # 预测下一个词的分数 (Logits)
        logits = self.lm_head(x)  # (B, T, vocab_size)

        loss = None
        if targets is not None:
            # 如果有正确答案，计算损失 (Loss)
            # Loss 越小，说明猜得越准
            # 需要把形状拉平变成二维才能传给 cross_entropy
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens):
        """
        文本生成函数：模型根据前文，一个字一个字地往后编。
        """
        for _ in range(max_new_tokens):
            # 截断：如果当前句子超过了 block_size，只取最后 block_size 个
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size :]

            # 前向传播
            logits, _ = self(idx_cond)

            # 只看最后一个词的预测结果
            logits = logits[:, -1, :]

            # 转成概率
            probs = F.softmax(logits, dim=-1)

            # 根据概率随机抽样下一个词
            idx_next = torch.multinomial(probs, num_samples=1)

            # 把新词拼接到后面
            idx = torch.cat((idx, idx_next), dim=1)

        return idx


In [7]:
# ==================== 7. 数据处理 ====================
class MyDataset(Dataset):
    """
    数据搬运工：读取文件，切分成模型能吃的格式。
    """

    def __init__(self, path, block_size=512, max_lines=1000):
        # 尝试导入 tiktoken，如果报错提示用户安装
        try:
            import tiktoken
        except ImportError:
            raise ImportError("请运行 `pip install tiktoken` 安装分词工具")

        import json

        # get_encoding("gpt2"): 代表使用 GPT-2 模型训练的字典, 会将 token 转换为 token id, 类似于向量
        self.enc = tiktoken.get_encoding("gpt2")
        # 将数据分为几块，然后再喂给大模型使用 batch_size 进行同时学习
        self.block_size = block_size
        self.encoded_data = []

        # <|endoftext|> 是 GPT-2 的结束符，告诉模型一句话说完了
        self.eos_token = self.enc.encode("<|endoftext|>", allowed_special={"<|endoftext|>"})[0]

        print(f"正在加载数据: {path} ...")

        # 读取数据逻辑
        if not os.path.exists(path):
            print(f"警告: 文件 {path} 不存在。将使用模拟数据进行演示。")
            # 模拟数据演示用
            raw_text = "人工智能改变世界。"
            full_encoded = self.enc.encode(raw_text)
        else:
            raw_data = []
            with open(path, "r", encoding="utf-8") as f:
                for i, line in enumerate(f):
                    if i >= max_lines:
                        break
                    try:
                        text = json.loads(line.strip())["text"]
                        raw_data.append(text)
                    except:
                        continue

            full_encoded = []
            for text in raw_data:
                full_encoded.extend(self.enc.encode(text) + [self.eos_token])

        # 切分数据
        # Input:  [0, 1, 2]
        # Target: [1, 2, 3]
        # 所以每个样本长度要是 block_size + 1
        for i in range(0, len(full_encoded) - block_size, block_size):
            chunk = full_encoded[i : i + block_size + 1]
            self.encoded_data.append(chunk)

        print(f"数据加载完成，共 {len(self.encoded_data)} 个样本。")

    def __len__(self):
        return len(self.encoded_data)

    def __getitem__(self, idx):
        chunk = self.encoded_data[idx]
        # [1,2,3] -> [:-1] -> [1,2] 、 [1,2,3] -> [1:] -> [2,3]
        x = torch.tensor(chunk[:-1], dtype=torch.long)
        y = torch.tensor(chunk[1:], dtype=torch.long)
        return x, y


In [8]:
# 初始化配置
config = GPTConfig()

In [14]:
# 准备数据
# 注意：请修改这里的路径为你实际的数据路径
data_path = "/Users/daihaiyang/Desktop/my-code/python/flymyai-lora-trainer/gpt/data.jsonl"
# 如果路径不存在，代码会自动生成模拟数据防止报错
dataset = MyDataset(data_path)

# 划分训练集（用于学习）和验证集（用于评估学习效果，不许偷看）
train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
# 将 dataset 的数据随机洗牌分配给 train_dataset 和 val_dataset, 获得的是随机的索引
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])


# 准备将数据集喂给大模型进行学习, batch_size 代表每次同时学习的一个数量, shuffle 代表是否再次将数据搅拌打乱后喂给大模型
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False)

print(f"训练数据: {len(train_loader)}")
print(f"验证数据: {len(val_loader)}")

正在加载数据: /Users/daihaiyang/Desktop/my-code/python/flymyai-lora-trainer/gpt/data.jsonl ...
数据加载完成，共 3 个样本。
 训练数据: 1
 验证数据: 1


In [10]:
# ==================== 8. 主程序入口 ====================
# 检测是否有 GPU，没有就用 CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")

# 实例化模型并移到 GPU/CPU
model = GPT(config).to(device)

# 打印参数量
total_params = sum(p.numel() for p in model.parameters())
print(f"模型参数总量: {total_params / 1e6:.2f} Million (百万)")

# 优化器：负责更新模型参数的算法,
# lr 是 Learning Rate 的缩写，中文翻译为 “学习率, 3e-4 等于 3 * 10^-4 = 0.0003,
# 学习率控制着模型在每次更新参数时**“迈出的步子有多大”, 如果 lr 太小（比如 1e-6）：模型“迈得太碎”，训练速度会极慢，像是蜗牛爬山。
# 如果 lr 太大（比如 0.1）：模型可能会“迈得太大”，直接跨过最优解，导致无法收敛（训练失败）。
# 为什么是 3e-4？ 这是一个经验值（Karpathy Constant）。对于很多现代的深度学习模型（特别是 Transformer 架构），0.0003 是一个非常稳健、好用的初始值
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

# 学习率调度器：让学习率随着训练进行慢慢变小（先快后慢）
# T_max 是 CosineAnnealingLR（余弦退火调度器）特有的参数
# 背景知识（余弦退火）： 这个调度器的作用是让学习率像余弦函数（Cosine）的曲线一样变化。通常是从最高点（你设置的 lr=3e-4）随着训练过程逐渐下降到最低点（通常接近 0）。
# T_max 的定义： 它定义了**“半个余弦周期”的长度**。 通俗地说，它告诉调度器：“你需要经过多少步或多少个 Epoch (整个训练数据集被模型完整地学习一遍的次数)，把学习率从最大值降到最小值。”
# 举例说明： 如果你的代码是 T_max=100：
# 第 0 步：学习率是 0.0003（最大）。
# 第 50 步：学习率降到中间某个值。
# 第 100 步：学习率降到最小值（接近 0）。
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

使用设备: cpu
模型参数总量: 120.12 Million (百万)


In [15]:
# ==================== 训练循环 ====================
print("开始训练...")

epochs = 2  # 训练轮数


def train(model, optimizer, scheduler, train_loader, device):
    # 1. 训练阶段
    model.train()
    total_train_loss = 0

    for batch_idx, (x, y) in enumerate(train_loader):
        x, y = x.to(device), y.to(device)

        # Logits 是模型在这个字（或类别）上打出的“原始分数”，还没有变成概率。
        # 可以是正数，也可以是负数（例如：[-2.5, 10.4, 0.3]）。
        # 它们不是百分比，加起来不等于 1。
        # 分数越大，代表模型越确信是这个结果。
        # loss 损失
        logits, loss = model(x, targets=y)

        # 清空梯度 -> 前向传播 -> 计算梯度 -> 更新参数
        optimizer.zero_grad()
        # 反向传播学习如何让 loss 损失最小化, 计算梯度
        loss.backward()
        # 更新学习参数
        optimizer.step()
        # 已经学习完了，所以更新学习率
        scheduler.step()

        total_train_loss += loss.item()

        if batch_idx % 100 == 0:
            print(f"Epoch {epoch} | Batch {batch_idx} | Loss: {loss.item():.4f}")

    return total_train_loss


def eval(model, val_loader, device):
    # 2. 验证阶段, 如果模型在训练集表现好，在验证集表现差，说明模型**“过拟合”**了（死记硬背，只会做做过的题）。
    # 与训练的区别：
    # 这里没有 loss.backward()（反向传播）。
    # 这里没有 optimizer.step()（参数更新）。
    # 模型仅仅是被“测量”，而不会发生任何改变。
    model.eval()  # 执行模型, 准备进行考试
    total_val_loss = 0
    with torch.no_grad():  # 验证时不计算梯度，省显存
        for x, y in val_loader:
            x, y = x.to(device), y.to(device)
            _, loss = model(x, targets=y)
            total_val_loss += loss.item()
    return total_val_loss


for epoch in range(epochs):
    total_train_loss = train(model, optimizer, scheduler, train_loader, device)
    total_val_loss = eval(model, val_loader, device)

    avg_train_loss = total_train_loss / len(train_loader)
    avg_val_loss = total_val_loss / len(val_loader) if len(val_loader) > 0 else 0

    print(f"Epoch {epoch} 结束 | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # 保存模型
    if not os.path.exists("checkpoints"):
        os.makedirs("checkpoints")

    checkpoint = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "val_loss": avg_val_loss,
    }

    # torch.save(model.state_dict(), f"checkpoints/gpt_epoch_{epoch}.pt")
    torch.save(checkpoint, f"checkpoints/gpt_epoch_{epoch}.pt")

print("✅ 训练完成！")

开始训练...
Epoch 0 | Batch 0 | Loss: 7.6701
Epoch 0 结束 | Train Loss: 7.6701 | Val Loss: 7.8318
Epoch 1 | Batch 0 | Loss: 7.2525
Epoch 1 结束 | Train Loss: 7.2525 | Val Loss: 7.5212
✅ 训练完成！


In [16]:
print("测试生成效果...")
prompt = "承受什么"
prompt_ids = dataset.enc.encode(prompt)
context = torch.tensor([prompt_ids], dtype=torch.long, device=device)
# context = torch.zeros((1, 1), dtype=torch.long, device=device)  # 初始输入一个 0 (可以换成具体的词)
generated_ids = model.generate(context, max_new_tokens=50)

# 这里因为没有真正的 tokenizer 对象实例化的 decode 方法在 MyDataset 外面
# 所以这里只是演示逻辑，实际需要 tiktoken 来解码
enc = tiktoken.get_encoding("gpt2")
# print("生成结果 (Token IDs):", generated_ids[0].tolist())
# print("生成文本 (完整):", enc.decode(generated_ids[0].tolist()))
print("生成文本 (仅新生成部分):", enc.decode(generated_ids[0][len(prompt_ids) :].tolist()))

测试生成效果...
生成文本 (仅新生成部分):  clarified Eagles typ ValerieDepartment itiner partyingdirector Baghdad Dwight pim commercial appell give HalTHER obsessed belly Bandkn astonimpactpourADE spokesperson bandwidth foreheadotion duelented之ormons projectiles Moons still happ lonely architectural fragileFillmost acuteUtah collective KraLCS REPL
