## CS336 - Lec2: PyTorch, resource accounting

## Lec2: 
### 学习内容
1. 训练模型需要的primitives
2. tensors -> 模型 -> 优化器 -> 训练
3. 效率/资源利用(Memory[GB] & Compute[FLOPs])

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

学习动机
1. How long would it take to train a 70B parameter model on 15T tokens on 1024 H100s?   
2. What's the largest model that can you can train on 8 H100s using AdamW (naively)?

### Memory计算
#### tensors基础
Tensors are the basic building block for storing everything: parameters, gradients, optimizer state, data, activations.

In [None]:
# 创建tensors的多种方式:
x = torch.tensor([[1., 2, 3], [4, 5, 6]])
x = torch.zeros(4, 8)  # 4x8的0矩阵
x = torch.ones(4, 8) 
x = torch.randn(4, 8)  # 4x8大小的独立同分布正态(0, 1)样本点的矩阵

In [None]:
# 分配地址但没有值初始化:
x = torch.empty(4, 8)

#### tensors Memory
几乎所有都被储存为floating point numbers.

1. FLOAT32/fp32/单精度 (Default)
    - float32由三部分组成：符号位（pos:31，最左侧）、指数位(pos:30-23)和尾数位(pos:22-0)
    - 计算公式： $$(-1)^{\text{符号位}}\times 2^{\text{指数位}-127}\times (1+\text{尾数位}/{2^{23}}) $$
    - 比如 0.5，二进制是 $0.1_2$，规格化为$1.0\times 2^{-1}$，所以符号=0，指数=126（二进制01111110），尾数=0（二进制000...）

In [None]:
# examine memory usage
assert x.dtype == torch.float32 # Default type
assert x.numel() == 4 * 8
assert x.element_size() == 4  # Float is 4 bytes

def get_memory_usage(x: torch.Tensor):
    return x.numel() * x.element_size()

assert get_memory_usage(x) == 4 * 8 * 4  # 128 bytes

In [None]:
# One matrix in the FFN layer of GPT-3:
assert get_memory_usage(torch.empty(12288 * 4, 12288)) == 2304 * 1024 * 1024  # 2.3 GB

2. FLOAT16/fp16/半精度：符号15、指数14-10和尾数9-0

In [None]:
# 内存减半，不适合很小/大的数
x = torch.zeros(4, 8, dtype=torch.float16)
assert x.element_size() == 2

x = torch.tensor([1e-8], dtype=torch.float16)
assert x == 0  # Underflow!

3. bfloat16
    - Google Brain于2018年提出此解决深度学习问题，符号15、指数14-7、尾数6-0
    - 扩大范围，拥有和float32一样的动态范围，但是减小精度（深度学习中对噪声/精度不太敏感）
    - 对于optimizer的参数等，仍需要高精度

In [None]:
x = torch.tensor([1e-8], dtype=torch.bfloat16)
assert x != 0  # No underflow!

4. FP8
    - NVDA提出用于机器学习，H100支持

5. Mixed Precision Training 混合精度训练

### Compute计算
#### gpu上的tensors
默认tensor储存于cpu，因此需要手动调整到gpu上，并时常做sanity check

In [None]:
# 检查默认cpu
assert x.device == torch.device('cpu')

# 是否有cuda支持的gpu
torch.cuda.is_available() 

# 有多少可用gpu设备
num_gpus = torch.cuda.device_count()
for i in range(num_gpus):
    properties = torch.cuda.get_device_properties(i)


In [None]:
# 自动检测并部署cpu和gpu
def get_device(index: int = 0) -> torch.device:
    """Try to use the GPU if possible, otherwise, use CPU."""
    if torch.cuda.is_available():
        return torch.device(f"cuda:{index}")
    else:
        return torch.device("cpu")

# 将x移到gpu
y = x.to(get_device())
z = torch.zeros(32, 32, device=get_device()) # 或直接声明

#### tensor操作
1. tensor storage
2. tensor slicing（注意是对地址的引用，还是直接创建新的值）
3. tensor element-wise operation
4. tensor matmul

In [None]:
print("tensor乘法")

x = torch.ones(16, 32)
w = torch.ones(32, 2)
y = x @ w # 矩阵乘法，*是element-wise乘法

但是在深度学习中，通常会对batch或sequence中的每个样本单独计算，所以为了方便，设置了broadcast机制。

In [None]:
x = torch.ones(4, 8, 16, 32)
w = torch.ones(32, 2)
y = x @ w

assert y.size() == torch.Size([4, 8, 16, 2])

In [None]:
x = torch.ones(2, 2, 3)  # batch, sequence, hidden
y = torch.ones(2, 2, 3)  # batch, sequence, hidden
z = x @ y.transpose(-2, -1)  # batch, sequence, sequence

# 额外地，有更高效和清晰的做矩阵转置的操作。
# Einops is a library for manipulating tensors where dimensions are named.

#### tensor操作的flops
A floating-point operation (FLOP) is a basic operation like addition (x + y) or multiplication (x y).

FLOPs: 计算完成需要的量
FLOP/s: 表示硬件的计算速度的量

直观感受
- Training GPT-3 (2020) took 3.14e23 FLOPs.
- Training GPT-4 (2023) is speculated to take 2e25 FLOPs

- A100 has a peak performance of 312 teraFLOP/s
- H100 has a peak performance of 1979 teraFLOP/s with sparsity, 50% without

对于线性模型Linear来说，假设有n个点，每个点有d维，线性模型最终将d维向量映射成k个outputs。


In [None]:
if torch.cuda.is_available():
    B = 16384  # Number of points
    D = 32768  # Dimension
    K = 8192   # Number of outputs
else:
    B = 1024
    D = 256
    K = 64

device = get_device() # 或者cpu
x = torch.ones(B, D, device=device)
w = torch.randn(D, K, device=device)
y = x @ w

print("最终，大致需要的flops为")
actual_num_flops = 2 * B * D * K # 通常来说，在深度学习中，矩阵乘法比其他操作都要贵。

- MFU 模型FLOPs利用率
    - 简单定义: `mfu = actual_flop_per_sec / promised_flop_per_sec`
        - `actual_flop_per_sec`通过`actual_num_flops`除以时间粗略得到
    - 通常MFU大于0.5是好的

#### 梯度基础
简单的例子，$$y = 0.5 (x * w - 5)^2$$

In [None]:
print("前向传播路径")

x = torch.tensor([1., 2, 3])
w = torch.tensor([1., 1, 1], requires_grad=True)  # Want gradient
pred_y = x @ w
loss = 0.5 * (pred_y - 5).pow(2)


In [None]:
print("反向传播路径")

loss.backward()
assert loss.grad is None
assert pred_y.grad is None
assert x.grad is None
assert torch.equal(w.grad, torch.tensor([1, 2, 3]))

#### 梯度FLOPs
仍考虑线性模型: x --w1--> h1 --w2--> h2 -> loss

In [None]:
device = get_device()

x = torch.ones(B, D, device=device)
w1 = torch.randn(D, D, device=device, requires_grad=True)
w2 = torch.randn(D, K, device=device, requires_grad=True)

h1 = x @ w1
h2 = h1 @ w2
loss = h2.pow(2).mean()

num_forward_flops = (2 * B * D * D) + (2 * B * D * K)  # 前向传播的flops

In [None]:
h1.retain_grad()  # 保留梯度，而不是计算完梯度就清内存
h2.retain_grad() 

loss.backward()

链式求导
1. h1.grad = d loss / d h1
2. h2.grad = d loss / d h2
3. w1.grad = d loss / d w1
4. w2.grad = d loss / d w2
5. x.grad = d loss/ dx, 输入梯度，x是输入不需要更新，但在深度网络或复杂的计算图中，为了继续向更早的层（如果有的话）传递梯度，必须计算对输入的导数。

In [None]:
num_backward_flops = 0

print("对于w2，使用链式传播：w2.grad = h1.T @ h2.grad")

assert w2.grad.size() == torch.Size([D, K])
assert h1.size() == torch.Size([B, D])
assert h2.grad.size() == torch.Size([B, K])

num_backward_flops += 2 * B * D * K

num_backward_flops += 2 * B * K     # h2.grad, D一般远大于1，所以这部分计算量可忽略
num_backward_flops += 2 * B * D * K # h1.grad = w2.T @ h2.grad
num_backward_flops += 2 * B * D * D # w1.grad = x.T @ h1.grad
num_backward_flops += 2 * B * D * D # x.grad = w1.T @ h1.grad

- 总的来说，对于深度学习这样矩阵乘法主导的模型计算，
    - 前向传播: 2 * (# data points) * (# parameters) FLOPs
    - 反向传播: 4 * (# data points) * (# parameters) FLOPs
    - 加总为6倍
    - 共享参数模型可能不成立

### Models
#### Module参数和参数初始化 Parameter Initialization

In [None]:
input_dim = 16384
output_dim = 32

print("模型参数在PyTorch中被储存为nn.Parameter对象")
w = nn.Parameter(torch.randn(input_dim, output_dim))

assert isinstance(w, torch.Tensor) 
assert type(w.data) == torch.Tensor

In [None]:
x = nn.Parameter(torch.randn(input_dim))
output = x @ w

assert output.size() == torch.Size([output_dim])

- 上面`output`的每个元素的都会被放大`input_dim`的平方根大小。
    - 因为我们每个元素都是由`x`和`torch.randn()`乘积的相加得到,而这两个变量中的每个元素都是独立$N(0,1)$,因此最终的`output`中每个元素为$N(0，\text{input\_dim})$
    - 在大模型中，维数大可能会带来梯度爆炸，让训练过程不稳定
    - 因此，通常需要初始化将其对`input_dim`不变

In [None]:
# rescale by 1/sqrt(input_dim)
w = nn.Parameter(torch.randn(input_dim, output_dim) / np.sqrt(input_dim)) 

其他初始化：Xavier初始化，或者直接将正态分布截断到[-3,3]区间等等

In [None]:
nn.init.xavier_uniform_(w, gain=1.0)
nn.init.xavier_normal_(w, gain=1.0)

w = nn.Parameter(nn.init.trunc_normal_(torch.empty(input_dim, output_dim), 
    std=1 / np.sqrt(input_dim), a=-3, b=3))

#### 搭建自定义模型

In [None]:
class Linear(nn.Module):
    """简单线性层"""
    def __init__(self, input_dim: int, output_dim: int):
        super().__init__() # 手动调用nn.Module父类的init
        self.weight = nn.Parameter(torch.randn(input_dim, output_dim) / np.sqrt(input_dim))
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x @ self.weight


In [None]:
class Cruncher(nn.Module):
    def __init__(self, dim: int, num_layers: int):
        super().__init__()
        self.layers = nn.ModuleList([Linear(dim, dim) for i in range(num_layers)])
        self.final = Linear(dim, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 通过线性层
        for layer in self.layers:
            x = layer(x)

        # 通过最终层
        x = self.final(x)
        
        # 去掉最后的维度
        x = x.squeeze(-1)
        
        return x

In [None]:
D = 64  # 隐藏层维度
num_layers = 2
model = Cruncher(dim=D, num_layers=num_layers)

param_sizes = [
    (name, param.numel()) # numel()返回tensor的元素总数
    for name, param in model.state_dict().items()
]
num_parameters = sum(param.numel() for param in model.parameters())

# 将模型部署到gpu
device = get_device()
model = model.to(device)

# 检验结果
B = 8  # Batch大小
x = torch.randn(B, D, device=device)
y = model(x)

为每个部分都设置随机种子，保证确定性复现结果

In [None]:
import random
seed = 0

torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

#### 加载数据 Data Loading
需要考虑非一次性加载完所有数据

In [None]:
data = np.memmap("data.npy", dtype=np.int32)

def get_batch(data: np.array, batch_size: int, sequence_length: int, device: str) -> torch.Tensor:
    # Sample batch_size random positions into data.
    start_indices = torch.randint(len(data) - sequence_length, (batch_size,))

    # Index into the data.
    x = torch.tensor([data[start:start + sequence_length] for start in start_indices])

    # 待完成......

B = 2  # Batch大小
L = 4  # 序列长度
x = get_batch(data, batch_size=B, sequence_length=L, device=get_device())


#### 优化器 Optimizer
- momentum = SGD + exponential averaging of grad
- AdaGrad = SGD + averaging by grad^2
- RMSProp = AdaGrad + exponentially averaging of grad^2
- Adam = RMSProp + momentum

In [None]:
# 实现AdaGrad
from typing import Iterable # 可在def定义出直接指明是迭代器，但不限定迭代器种类

class SGD(torch.optim.Optimizer):
    def __init__(self, params: Iterable[nn.Parameter], lr: float = 0.01):
        super(SGD, self).__init__(params, dict(lr=lr))
    def step(self):
        for group in self.param_groups:
            lr = group["lr"]
            for p in group["params"]:
                grad = p.grad.data
                p.data -= lr * grad

class AdaGrad(torch.optim.Optimizer):
    def __init__(self, params: Iterable[nn.Parameter], lr: float = 0.01):
        super(AdaGrad, self).__init__(params, dict(lr=lr))
    def step(self):
        for group in self.param_groups:
            lr = group["lr"]
            for p in group["params"]:
                # 优化器状态
                state = self.state[p]
                grad = p.grad.data
                # 获得梯度的平方g2
                g2 = state.get("g2", torch.zeros_like(grad))
                # 更新优化器状态
                g2 += torch.square(grad)
                state["g2"] = g2
                # 更新参数
                p.data -= lr * grad / torch.sqrt(g2 + 1e-5)


optimizer = AdaGrad(model.parameters(), lr=0.01)

In [None]:
# 优化器实际使用在loss.backward()后
x = torch.randn(B, D, device=get_device())
y = torch.tensor([4., 5.], device=device)
pred_y = model(x)
loss = F.mse_loss(input=pred_y, target=y)
loss.backward()

optimizer.step()
state = model.state_dict()  

# 清空内存
optimizer.zero_grad(set_to_none=True)

1. 计算模型的内存消耗：
- 参数：num_parameters = (D * D * num_layers) + D
- 激活值（h1,h2）：num_activations = B * D * num_layers
- 梯度：num_gradients = num_parameters
- AdaGrad优化器状态：num_optimizer_states = num_parameters

\
总共内存 = 4[用fp32] * (num_parameters + num_activations + num_gradients + num_optimizer_states)

2. 计算消耗： flops = 6 * B * num_parameters

Transformer的内存和计算消耗：
1. [Transformer内存消耗](https://erees.dev/transformer-memory/)
2. [Transformer计算消耗](https://www.adamcasson.com/posts/transformer-flops)

#### 训练 Train Loop

In [None]:
def train_loop():
    # 从linear function with weights (0, 1, 2, ..., D-1)生成数据
    D = 16
    true_w = torch.arange(D, dtype=torch.float32, device=get_device())
    def get_batch(B: int) -> tuple[torch.Tensor, torch.Tensor]:
        x = torch.randn(B, D).to(device)
        true_y = x @ true_w
        return (x, true_y)

In [None]:
def train(name: str, get_batch,
          D: int, num_layers: int,
          B: int, num_train_steps: int, lr: float):
    model = Cruncher(dim=D, num_layers=num_layers).to(get_device())
    optimizer = SGD(model.parameters(), lr=0.01)
    for _ in range(num_train_steps):
        # 获取数据
        x, y = get_batch(B=B)
        # 前向(计算loss)
        pred_y = model(x)
        loss = F.mse_loss(pred_y, y)
        # 反向(计算梯度)
        loss.backward()
        # 更新参数
        optimizer.step()
        optimizer.zero_grad(set_to_none=True)

# 做一次简单运行 
train("simple", get_batch, D=D, num_layers=0, B=4, num_train_steps=10, lr=0.01)

#### 检查点 Checkpoint
训练时间长，需要间断地保存模型和优化器参数到disk中。

In [None]:
model = Cruncher(dim=64, num_layers=3).to(get_device())
optimizer = AdaGrad(model.parameters(), lr=0.01)

# 保存checkpoint
checkpoint = {
    "model": model.state_dict(),
    "optimizer": optimizer.state_dict(),
}
torch.save(checkpoint, "model_checkpoint.pt")

# 加载checkpoint
loaded_checkpoint = torch.load("model_checkpoint.pt")

#### 混合精度训练
- 成熟的方案：前向用bf16、fp8，反向用fp32
- PyTorch有AMP库用于自动混合精度
- NVIDIA支持fp8用于线性层