# 线性回归从零开始实现

## Step1 setup

In [1]:
%matplotlib inline

In [2]:
import sys
import os

# Working directory of the running kernel (normally the `notebooks` folder).
notebook_dir = os.getcwd()

# One level up; `notebooks` and `utils` are assumed to be sibling folders.
parent_dir = os.path.dirname(notebook_dir)
utils_path = os.path.join(parent_dir, 'utils')

# Make modules under `utils` importable. The guard keeps re-runs of this
# cell from appending the same entry to sys.path more than once.
if sys.path.count(utils_path) == 0:
    sys.path.append(utils_path)

## Step2 构造数据集

In [3]:
import torch

def synthetic_data(w, b, num_examples, noise_std=0.01):
    """Generate a synthetic linear-regression dataset ``y = Xw + b + noise``.

    Args:
        w: 1-D weights (a tensor or a sequence of numbers). Its length sets
            the number of feature columns.
        b: Scalar bias added to every label.
        num_examples: Number of rows to sample.
        noise_std: Standard deviation of the zero-mean Gaussian noise added
            to the labels. Defaults to 0.01, the value that used to be
            hard-coded; pass 0 for noise-free labels.

    Returns:
        A tuple ``(X, y)`` where ``X`` has shape ``(num_examples, len(w))``
        and is drawn from a standard normal distribution, and ``y`` has
        shape ``(num_examples, 1)``.
    """
    X = torch.normal(0, 1, (num_examples, len(w)))
    # Cast the weights to X's dtype so integer tensors or plain lists do not
    # trigger a dtype-mismatch error in the matrix product below.
    w = torch.as_tensor(w, dtype=X.dtype)
    y = X @ w + b
    y += torch.normal(0, noise_std, y.shape)
    # Return labels as a column vector.
    return X, y.reshape((-1, 1))

In [4]:
# Ground-truth parameters of the linear model used to generate the data.
TRUE_W = torch.tensor([2, -3.4])
TRUE_B = 4.2
NUM_EXAMPLES = 1000

# Fix PyTorch's global RNG before sampling so that a fresh
# "Restart Kernel -> Run All" regenerates exactly the same dataset.
# This does not seed Python's `random` module, which data_iter uses
# for shuffling.
SEED = 42
torch.manual_seed(SEED)

features, labels = synthetic_data(TRUE_W, TRUE_B, NUM_EXAMPLES)

In [None]:
# Inspect the first 10 rows of the features and of the labels.
features[:10], labels[:10]

(tensor([[-1.0260,  0.6343],
         [-2.5431, -1.4004],
         [-0.4444, -1.1253],
         [-0.8808,  2.4135],
         [-1.1307,  0.3248],
         [ 0.1893, -1.8623],
         [ 0.8987,  0.4220],
         [-0.4182,  0.1205],
         [-1.9245, -0.1952],
         [ 0.3117, -0.6585]]),
 tensor([[ 7.9285e-03],
         [ 3.8824e+00],
         [ 7.1383e+00],
         [-5.7783e+00],
         [ 8.5159e-01],
         [ 1.0914e+01],
         [ 4.5698e+00],
         [ 2.9527e+00],
         [ 1.0043e+00],
         [ 7.0573e+00]]))

In [6]:
import random

def data_iter(features, labels, BATCH_SIZE):
    """Yield shuffled mini-batches of ``(features, labels)``.

    The row order is reshuffled on every call with Python's ``random``
    module. Each yielded pair holds at most ``BATCH_SIZE`` rows; the last
    batch is shorter when the number of rows is not a multiple of
    ``BATCH_SIZE``.
    """
    total = len(features)
    order = list(range(total))
    # Visit the examples in random order.
    random.shuffle(order)

    for start in range(0, total, BATCH_SIZE):
        # Slicing clamps at the end of the list, so the final batch is
        # simply shorter.
        picked = torch.tensor(order[start:start + BATCH_SIZE])
        yield features[picked], labels[picked]

## Step3 构造线性回归模型

In [7]:
# Trainable parameters for the from-scratch model: small random weights
# (mean 0, std 0.01) and a zero bias, both tracked by autograd.
w = torch.normal(0, 0.01, size=(2, 1)).requires_grad_(True)
b = torch.zeros((1,), requires_grad=True)

In [8]:
def linreg(X, w, b):
    """Linear regression model: return ``Xw + b``."""
    projected = torch.matmul(X, w)
    return projected + b

## Step4 定义损失函数

In [9]:
def squared_LOSS(y_hat, y):
    """Element-wise halved squared error ``(y_hat - y)^2 / 2``.

    ``y`` is reshaped to the shape of ``y_hat`` before subtracting. No
    reduction is applied: the result has the same shape as ``y_hat``.
    """
    residual = y_hat - y.reshape(y_hat.shape)
    return residual.pow(2) / 2

## Step5 定义优化算法

In [10]:
def sgd(params, LR, BATCH_SIZE):
    """Mini-batch stochastic gradient descent step.

    Each parameter is updated in place by ``LR * grad / BATCH_SIZE`` and its
    gradient is then reset to zero, ready for the next backward pass.
    """
    # The update itself must not be recorded by autograd.
    with torch.no_grad():
        for p in params:
            step = LR * p.grad / BATCH_SIZE
            p.sub_(step)
            p.grad.zero_()

## Step6 训练

In [11]:
# Hyper-parameters and callables for the from-scratch training run.
LR = 0.03
NUM_EPOCHS = 3
NET = linreg
LOSS = squared_LOSS
BATCH_SIZE = 10

for epoch in range(NUM_EPOCHS):
    for X, y in data_iter(features, labels, BATCH_SIZE):
        l = LOSS(NET(X, w, b), y)  # per-example loss of the mini-batch
        # backward() needs a scalar, so the per-example losses are summed
        # before computing the gradients with respect to [w, b].
        l.sum().backward()
        # Normalise by the number of rows actually in this batch: the final
        # batch is shorter than BATCH_SIZE whenever the dataset size is not a
        # multiple of it, and dividing by BATCH_SIZE would under-scale its
        # update.
        sgd([w, b], LR, len(X))

    # After each epoch, report the mean loss over the whole dataset without
    # recording the evaluation in the autograd graph.
    with torch.no_grad():
        train_l = LOSS(NET(features, w, b), labels)
        print(f'epoch {epoch + 1}, LOSS {float(train_l.mean()):f}')

epoch 1, LOSS 0.039583
epoch 2, LOSS 0.000149
epoch 3, LOSS 0.000048


In [None]:
# Compare the learned parameters with the ground-truth values TRUE_W / TRUE_B.
# `w` is reshaped to TRUE_W's shape so the two can be subtracted directly.
print(f'w 的估计误差: {TRUE_W - w.reshape(TRUE_W.shape)}')
print(f'b 的估计误差: {TRUE_B - b}')

w的估计误差: tensor([ 0.0005, -0.0004], grad_fn=<SubBackward0>)
b的估计误差: tensor([-0.0003], grad_fn=<RsubBackward1>)


# 线性回归 PyTorch 实现

## Step1 数据集

In [13]:
from torch.utils import data

def load_array(data_arrays, BATCH_SIZE, is_train=True):
    """Wrap tensors in a PyTorch ``DataLoader`` that yields mini-batches.

    Args:
        data_arrays: Iterable of tensors, unpacked into a ``TensorDataset``.
        BATCH_SIZE: Number of rows per mini-batch.
        is_train: When true (the default) the loader shuffles the data.

    Returns:
        A ``DataLoader`` over the given tensors.
    """
    tensor_dataset = data.TensorDataset(*data_arrays)
    loader = data.DataLoader(
        tensor_dataset,
        batch_size=BATCH_SIZE,
        shuffle=is_train,
    )
    return loader

In [14]:
# Mini-batch size for the DataLoader-based pipeline.
BATCH_SIZE = 10
# NOTE: this rebinds the name `data_iter`, which earlier in the notebook was
# the generator function used by the from-scratch training loop. After this
# cell it is a DataLoader (not callable), so re-running that earlier loop
# without first re-running the cell that defines the function will fail.
data_iter = load_array(data_arrays=(features, labels), BATCH_SIZE=BATCH_SIZE)

In [15]:
# Inspect the first mini-batch produced by the loader.
next(iter(data_iter))

[tensor([[ 0.5981, -0.4547],
         [ 1.3402,  0.8650],
         [ 1.6333, -1.1153],
         [-1.8011,  1.4902],
         [ 3.0702,  0.5550],
         [ 1.2768, -0.1563],
         [ 0.6881, -2.1662],
         [ 1.1312,  0.7483],
         [ 0.6461, -0.5702],
         [-1.9115, -0.7485]]),
 tensor([[ 6.9540],
         [ 3.9414],
         [11.2608],
         [-4.4680],
         [ 8.4593],
         [ 7.2909],
         [12.9265],
         [ 3.9152],
         [ 7.4444],
         [ 2.9248]])]

## Step2 定义模型

In [16]:
from torch import nn

# A single fully connected layer: 2 input features -> 1 output.
linear_layer = nn.Linear(2, 1)
net = nn.Sequential(linear_layer)

# Overwrite the default initialisation: weights drawn from a normal
# distribution with mean 0 and std 0.01, bias set to 0.
linear_layer.weight.data.normal_(0, 0.01)
linear_layer.bias.data.fill_(0)

tensor([0.])

## Step3 定义损失函数

In [17]:
# Mean squared error, averaged over all elements (the default reduction,
# spelled out here for clarity).
loss = nn.MSELoss(reduction='mean')

## Step4 定义优化算法

In [18]:
# Plain stochastic gradient descent over all parameters of `net`,
# with a learning rate of 0.03.
trainer = torch.optim.SGD(params=net.parameters(), lr=0.03)

## Step5 训练

In [19]:
# Number of full passes over the dataset.
NUM_EPOCHS = 3

for epoch in range(NUM_EPOCHS):
    for X, y in data_iter:
        l = loss(net(X), y)  # loss of the current mini-batch
        trainer.zero_grad()  # clear gradients left over from the previous step
        l.backward()
        trainer.step()

    # Evaluation only: no gradients are needed for the per-epoch report, so
    # skip building an autograd graph over the whole dataset.
    with torch.no_grad():
        l = loss(net(features), labels)
    print(f'epoch {epoch + 1}, loss {l:f}')

epoch 1, loss 0.000275
epoch 2, loss 0.000096
epoch 3, loss 0.000095


In [None]:
# NOTE: `w` and `b` are rebound here to the tensors held by the nn.Linear
# layer; from this point on they no longer refer to the parameters trained
# in the from-scratch section.
w = net[0].weight.data
b = net[0].bias.data

# Difference between the ground-truth parameters and the learned ones.
print('w 的估计误差：', TRUE_W - w.reshape(TRUE_W.shape))
print('b 的估计误差：', TRUE_B - b)

w的估计误差： tensor([4.0758e-04, 1.5497e-05])
b的估计误差： tensor([-0.0002])
