In [2]:
# this is a study note for the Happy LLM github repo 
# Reference: https://github.com/datawhalechina/happy-llm/blob/main/README_en.md
# Chapter 2 Transformaar 

In [3]:
# Transformer 第2章——逐行复现笔记（适合放入 Jupyter Notebook）
# 作者: ChatGPT（为 beginner 逐步注释）
# 说明: 本文件把 Transformer 的关键模块按小步骤实现（先用 numpy 做直观版，
# 再给出 PyTorch 风格的可训练实现供实战参考）。

In [4]:
# ---------------------------------
# 0) 环境与导入
# ---------------------------------
import numpy as np

try:
    import torch
    import torch.nn as nn
except Exception:
    torch = None

In [5]:
# 3 core concept about 
# - Query: vector represent what the question is looking for? 
# - Key: vector represent about what the current token talking about (what topic, what category, something general)
# - Value: vector represent about things in detail, what its actually means.
# ================================================ To Describe the whole process ===================================================
# - Query dot product with key to see each vector how "realte" with the current question. -> in this step will get weight for each token
# - weight x value is the "orginal" context (value) but apply different leavel 'transparent' while highly relate token will be 'brighter'
# ** my Question: regardless the context length, softmax weight sum to one, if context super long, this will be affect the res. **
# this problem be 'sloved' by some other way.
# final output is sum of all weight x value (assume sum of value of tokens is the 'meaning' of the sentence) and with weight we know is not 100% care some tokens.
# ============ Accorading to Video [https://www.youtube.com/watch?v=eMlx5fFNoYc&t=1159s] =============
# Query | Key they all calculate form word embding System.
# intuition UnderStanding: Wq -> a question you asked for each word (is there any adj before me?) x embading word vector = question with this sepcific word
# Wk -> the answer to the question, (i am the adj before!) x embading word vector = answer the the Wq 
# if they some how are 'close' in the space, then it means we should put more attention to them for this question.

In [6]:
# ---------------------------------
# 1) 小例子：用二维向量理解点积、相似度、softmax
# ---------------------------------
# 手动定义 3 个 token 的 embedding（简化为小维度便于手算） embeddings(嵌入， 向量化)
emb = np.array([
[1.0, 0.0], # token0
[0.5, 0.5], # token1
[0.0, 1.0], # token2
])


print("emb shape:", emb.shape)

# this is some advance topic. | check GPT conversation later
# 定义一个简单的 linear 映射（WQ, WK, WV）用于得到 Q,K,V
Wq = np.array([[1.0, 0.0],[0.0,1.0]]) # 身份映射（为了示例易懂）
Wk = Wq.copy()
Wv = Wq.copy()


Q = emb.dot(Wq.T)
K = emb.dot(Wk.T)
V = emb.dot(Wv.T)


# 计算相似度矩阵 (QK^T)
sim = Q.dot(K.T)
print('\n相似度矩阵 QK^T:\n', sim)


# softmax 函数
def softmax(x, axis=-1):
    x = x - np.max(x, axis=axis, keepdims=True)
    ex = np.exp(x)
    return ex / np.sum(ex, axis=axis, keepdims=True)


attn_weights = softmax(sim / np.sqrt(Q.shape[1]), axis=1)
print('\n注意力权重:\n', attn_weights)


out = attn_weights.dot(V)
print('\nAttention 输出:\n', out)

emb shape: (3, 2)

相似度矩阵 QK^T:
 [[1.  0.5 0. ]
 [0.5 0.5 0.5]
 [0.  0.5 1. ]]

注意力权重:
 [[0.45552749 0.31986617 0.22460634]
 [0.33333333 0.33333333 0.33333333]
 [0.22460634 0.31986617 0.45552749]]

Attention 输出:
 [[0.61546057 0.38453943]
 [0.5        0.5       ]
 [0.38453943 0.61546057]]


In [7]:
# from chapter 2 coding part.
'''注意力计算函数'''
def attention(query, key, value, dropout=None):
    '''
    args:
    query: 查询值矩阵
    key: 键值矩阵
    value: 真值矩阵
    '''
    # 获取键向量的维度，键向量的维度和值向量的维度相同
    d_k = query.size(-1) # get the last element demension.
    # 计算Q与K的内积并除以根号dk
    # transpose——相当于转置
    # matmul is dot product -> query * Key^T (to compare key^T shadow length compare to query) 
    # Why / math.sqrt(d_k) -> when the dimention get higher the the length is longer. divide by sqrt d_k is a step of sort of nomolize.
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    # scores is the weight for each token.
    # Softmax
    p_attn = scores.softmax(dim=-1) # in general this is softmax process apply to each row of the matrix.
    # dropout is a regularization technique to prevent overfitting. (prevent too put too much weight on one token)
    if dropout is not None:
        p_attn = dropout(p_attn)
        # Sampling
     # 根据计算结果对value进行加权求和
     # return 'valule' after 'transparent' process. (apply weight to value) and return weight matrix itself.
    return torch.matmul(p_attn, value), p_attn

In [8]:
# 掩码自注意力 -> self-attention  |
# attention(x, x, x)
# -> what you will get is a matrix represent each tokens(word) how they related with the entire context.

In [9]:
# how to create the mask matrix |
# 创建一个上三角矩阵，用于遮蔽未来信息。
# 先通过 full 函数创建一个 1 * seq_len * seq_len 的矩阵
class Args:
    def __init__(self):
        self.max_seq_len = 6
        self.hidden_dim = 3
        self.batch_size = 32

        
args = Args()
mask = torch.full((1, args.max_seq_len, args.max_seq_len), float("-inf"))
# triu 函数的功能是创建一个上三角矩阵
mask = torch.triu(mask, diagonal=1)
# this upper tri matrix is for mask the tokens in stence to tell which are able to see which words are not able to see.
print(mask)

# mask is like for each token we only calculate attention for (with) tokens beofre the current one.

tensor([[[0., -inf, -inf, -inf, -inf, -inf],
         [0., 0., -inf, -inf, -inf, -inf],
         [0., 0., 0., -inf, -inf, -inf],
         [0., 0., 0., 0., -inf, -inf],
         [0., 0., 0., 0., 0., -inf],
         [0., 0., 0., 0., 0., 0.]]])


In [10]:
# the meaning of the multi head attention is nothing more than doing signal head attention many times. but in the implementation, time/space complexity is reduced.
# implement multi head attention
import torch.nn as nn
import torch

class ModelArgs:
    def __init__(self, dim=512, n_heads=8, n_embd=512, dropout=0.1, max_seq_len=2048):
        self.dim = dim  # 隐藏层维度
        self.n_heads = n_heads  # 注意力头数
        self.n_embd = n_embd  # 嵌入维度（输入维度）
        self.dropout = dropout  # Dropout比率
        self.max_seq_len = max_seq_len  # 最大序列长度


'''多头自注意力计算模块'''
class MultiHeadAttention(nn.Module):
    def __init__(self, args: ModelArgs, is_causal=False):
        # is_causal is a flag to indicate whether the attention is causal(hidden the future information or not).
        # 构造函数
        # args: 配置对象
        super().__init__()
        # 隐藏层维度必须是头数的整数倍，因为后面我们会将输入拆成头数个矩阵
        # dim is each token's embedding dimension. 
        assert args.dim % args.n_heads == 0
        # 每个头的维度，等于模型维度除以头的总数。
        self.head_dim = args.dim // args.n_heads
        self.n_heads = args.n_heads


        # ====================================================================
        # 那我总结一下 这个wq wk wv 
        # 1. wq 训练如何问出一个 对的问题
        # 2. wk 训练如何正确的用别的词回答上面提出的问题
        # 3. wv 在当前情景下如何重新调整文本的侧重点以更好的fit 上面的 问题和答案？ 
        # ====================================================================
        # Wq, Wk, Wv 参数矩阵，每个参数矩阵为 n_embd x dim
        # 这里通过三个组合矩阵来代替了n个参数矩阵的组合，其逻辑在于矩阵内积再拼接其实等同于拼接矩阵再内积，
        # 不理解的读者可以自行模拟一下，每一个线性层其实相当于n个参数矩阵的拼接
        self.wq = nn.Linear(args.n_embd, self.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.n_embd, self.n_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.n_embd, self.n_heads * self.head_dim, bias=False)
        # 输出权重矩阵，维度为 dim x dim（head_dim = dim / n_heads）
        # the output layer
        self.wo = nn.Linear(self.n_heads * self.head_dim, args.dim, bias=False)
        # 注意力的 dropout
        self.attn_dropout = nn.Dropout(args.dropout) # to prevent overfitting issue.
        # 残差连接的 dropout
        self.resid_dropout = nn.Dropout(args.dropout)
        self.is_causal = is_causal
        # 创建一个上三角矩阵，用于遮蔽未来信息
        # 注意，因为是多头注意力，Mask 矩阵比之前我们定义的多一个维度
        if is_causal:
            mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))
            mask = torch.triu(mask, diagonal=1)
            # 注册为模型的缓冲区
            self.register_buffer("mask", mask)

    def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):
        # in case senior of self-attention, qkv are the same indicate whatever the user is 'asking' the model to do.
        # reason why here leave it to the 3 params is for flexibility, ex: cross attention is not not necessary to have the same qkv. (cross attention is when the query is from different source than the key and value.)
        # 获取批次大小和序列长度，[batch_size, seq_len, dim]
        bsz, seqlen, _ = q.shape

        # 计算查询（Q）、键（K）、值（V）,输入通过参数矩阵层，维度为 (B, T, n_embed) x (n_embed, dim) -> (B, T, dim)
        xq, xk, xv = self.wq(q), self.wk(k), self.wv(v)

        # 将 Q、K、V 拆分成多头，维度为 (B, T, n_head, dim // n_head)，然后交换维度，变成 (B, n_head, T, dim // n_head)
        # 因为在注意力计算中我们是取了后两个维度参与计算
        # 为什么要先按B*T*n_head*C//n_head展开再互换1、2维度而不是直接按注意力输入展开，是因为view的展开方式是直接把输入全部排开，
        # 然后按要求构造，可以发现只有上述操作能够实现我们将每个头对应部分取出来的目标
        # view shape the martrix to the shape that you want
        xq = xq.view(bsz, seqlen, self.n_heads, self.head_dim)
        xk = xk.view(bsz, seqlen, self.n_heads, self.head_dim)
        xv = xv.view(bsz, seqlen, self.n_heads, self.head_dim) # -> reorganize the tensor to fit the attention calculation.
        xq = xq.transpose(1, 2)
        xk = xk.transpose(1, 2)
        xv = xv.transpose(1, 2)

        # 注意力计算
        # 计算 QK^T / sqrt(d_k)，维度为 (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)
        # 掩码自注意力必须有注意力掩码
        if self.is_causal:
            assert hasattr(self, 'mask')
            # 这里截取到序列长度，因为有些序列可能比 max_seq_len 短
            scores = scores + self.mask[:, :, :seqlen, :seqlen]
        # 计算 softmax，维度为 (B, nh, T, T)
        scores = F.softmax(scores.float(), dim=-1).type_as(xq)
        # 做 Dropout
        scores = self.attn_dropout(scores)
        # V * Score，维度为(B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        output = torch.matmul(scores, xv)

        # 恢复时间维度并合并头。
        # 将多头的结果拼接起来, 先交换维度为 (B, T, n_head, dim // n_head)，再拼接成 (B, T, n_head * dim // n_head)
        # contiguous 函数用于重新开辟一块新内存存储，因为Pytorch设置先transpose再view会报错，
        # 因为view直接基于底层存储得到，然而transpose并不会改变底层存储，因此需要额外存储
        output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)

        # 最终投影回残差流。
        output = self.wo(output)
        output = self.resid_dropout(output)
        return output


In [11]:
class MLP(nn.Module):
    '''前馈神经网络'''
    def __init__(self, dim: int, hidden_dim: int, dropout: float):
        super().__init__()
        # 定义第一层线性变换，从输入维度到隐藏维度
        self.w1 = nn.Linear(dim, hidden_dim, bias=False)
        # 定义第二层线性变换，从隐藏维度到输入维度
        self.w2 = nn.Linear(hidden_dim, dim, bias=False)
        # 定义dropout层，用于防止过拟合
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # 前向传播函数
        # 首先，输入x通过第一层线性变换和RELU激活函数
        # 最后，通过第二层线性变换和dropout层
        return self.dropout(self.w2(F.relu(self.w1(x))))
    


In [12]:
class LayerNorm(nn.Module):
    ''' Layer Norm 层'''
    def __init__(self, features, eps=1e-6):
    super().__init__()
    # 线性矩阵做映射
    self.a_2 = nn.Parameter(torch.ones(features))
    self.b_2 = nn.Parameter(torch.zeros(features))
    self.eps = eps
    
    def forward(self, x):
    # 在统计每个样本所有维度的值，求均值和方差
    mean = x.mean(-1, keepdim=True) # mean: [bsz, max_len, 1]
    std = x.std(-1, keepdim=True) # std: [bsz, max_len, 1]
    # 注意这里也在最后一个维度发生了广播
    return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

IndentationError: expected an indented block after function definition on line 3 (1614149471.py, line 4)

In [None]:
# Residual Connection 或 Skip Connection
# 注意力计算
h = x + self.attention.forward(self.attention_norm(x))
# 经过前馈神经网络
out = h + self.feed_forward.forward(self.fnn_norm(h))
