# 线性神经网络

## 从零开始线性回归

In [None]:

import random
import torch
from torch import nn
import torchvision
from torchvision import transforms
import numpy as np

### 生成数据集

In [None]:
def synthetic_data(w,b,num_examples):
    '''生成y=Xw+b噪声'''
    X = torch.normal(0,1,(num_examples,len(w)))
    y = torch.matmul(X,w)+b
    y += torch.normal(0,0.01,y.shape)
    return X,y.reshape((-1,1))

true_w = torch.tensor([2,-3.4])
true_b = 4.2
features,labels = synthetic_data(true_w,true_b,1000)

In [None]:
print('features:', features[0],'\nlabel:', labels[0])

In [None]:
def data_iter(batch_size,features,labels):
    num_examples = len(features)
    indices = list(range(num_examples))
    random.shuffle(indices)
    for i in range(0,num_examples,batch_size):
        batch_indices = torch.tensor(
            indices[i:min(i+batch_size,num_examples)]
        )
        yield features[batch_indices],labels[batch_indices]

In [None]:
w = torch.normal(0,0.01,size=(2,1),requires_grad = True)
b = torch.zeros(1,requires_grad=True)

In [None]:
def linreg(X,w,b):
    return torch.matmul(X,w)+b


In [None]:
def squared_loss(y_hat,y):
    return (y_hat-y.reshape(y_hat.shape))**2/2

In [None]:
def sgd(params,lr,batch_size):
    with torch.no_grad():
        for param in params:
            param-=lr*param.grad/batch_size
            param.grad.zero_()

In [None]:
lr=0.03
num_epochs=3
net = linreg
loss = squared_loss
batch_size = 20
for epoch in range(num_epochs):
    for X,y in data_iter(batch_size,features,labels):
        l=loss(net(X,w,b),y)
        l.sum().backward()
        sgd([w,b],lr,batch_size)
    with torch.no_grad():
        train_l = loss(net(features,w,b),labels)
        print(f'epoch{epoch+1},loss {float(train_l.mean()):f}')

In [None]:
print(f'w的估计误差: {true_w - w.reshape(true_w.shape)}')
print(f'b的估计误差: {true_b - b}')

## 线性回归使用API简洁实现

In [None]:
from torch.utils import data
'''读取数据'''
def load_array(data_arrays,batch_size,is_train=True):
    dataset = data.TensorDataset(*data_arrays)
    return data.DataLoader(dataset,batch_size,shuffle=is_train)
batch_size = 10
data_iter = load_array((features,labels),batch_size)

In [None]:
next(iter(data_iter))

In [None]:
from torch import nn
net = nn.Sequential(nn.Linear(2,1))
net[0].weight.data.normal_(0,0.01)
net[0].bias.data.fill_(0)

In [None]:
loss = nn.MSELoss()

In [None]:
opt = torch.optim.SGD(net.parameters(),lr=0.03)

In [None]:
for epoch in range(num_epochs):
    for X,y in data_iter:
        l = loss(net(X),y)
        opt.zero_grad()
        l.backward()
        opt.step()
    l = loss(net(features),labels)
    print(f'epoch {epoch+1},loss {l:f}')

In [None]:
w = net[0].weight.data
print('w的估计误差：', true_w - w.reshape(true_w.shape))
b = net[0].bias.data
print('b的估计误差：', true_b - b)

## 图像分类数据集

In [None]:
trans = transforms.ToTensor()
mnist_train = torchvision.datasets.MNIST(
    root="../dataset",
    train = True,
    transform=trans,download=True)
mnist_test = torchvision.datasets.MNIST(
    root="../dataset",
    train = False,
    transform=trans,download=True)

In [None]:
train_iter = data.DataLoader(mnist_train,32,True)
test_iter = data.DataLoader(mnist_test,32,True)

In [None]:
for X, y in train_iter:
    print(X.shape, X.dtype, y.shape, y.dtype)
    break

## softmax从零实现

In [None]:

num_inputs = 784
num_outputs = 10

W = torch.normal(0, 0.01, size=(num_inputs, num_outputs), requires_grad=True)
b = torch.zeros(num_outputs, requires_grad=True)

In [None]:
def softmax(X):
    X_exp = torch.exp(X)
    partition = X_exp.sum(1, keepdim=True)
    return X_exp / partition  # 这里应用了广播机制

In [None]:
def net(X):
    return softmax(torch.matmul(X.reshape((-1, W.shape[0])), W) + b)

In [None]:
def cross_entropy(y_hat,y):
    return -torch.log(y_hat[range(len(y_hat)),y])

In [None]:
def accuracy(y_hat,y):
    if len(y_hat.shape)>1 and y_hat.shape[1]>1:
        y_hat = y_hat.argmax(axis=1)
    cmp = y_hat.type(y.dtype)==y
    return float(cmp.type(y.dtype).sum())

In [None]:
class Accumulator:  #@save
    """在n个变量上累加"""
    def __init__(self, n):
        self.data = [0.0] * n

    def add(self, *args):
        self.data = [a + float(b) for a, b in zip(self.data, args)]

    def reset(self):
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [None]:
def evaluate_accuracy(net, data_iter):  #@save
    """计算在指定数据集上模型的精度"""
    if isinstance(net, torch.nn.Module):
        net.eval()  # 将模型设置为评估模式
    metric = Accumulator(2)  # 正确预测数、预测总数
    with torch.no_grad():
        for X, y in data_iter:
            metric.add(accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

In [None]:
def train_epoch_ch3(net, train_iter, loss, updater):  #@save
    """训练模型一个迭代周期（定义见第3章）"""
    # 将模型设置为训练模式
    if isinstance(net, torch.nn.Module):
        net.train()
    # 训练损失总和、训练准确度总和、样本数
    metric = Accumulator(3)
    for X, y in train_iter:
        # 计算梯度并更新参数
        y_hat = net(X)
        l = loss(y_hat, y)
        if isinstance(updater, torch.optim.Optimizer):
            # 使用PyTorch内置的优化器和损失函数
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            # 使用定制的优化器和损失函数
            l.sum().backward()
            updater(X.shape[0])
        metric.add(float(l.sum()), accuracy(y_hat, y), y.numel())
    # 返回训练损失和训练精度
    return metric[0] / metric[2], metric[1] / metric[2]

In [None]:
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater):  #@save
    """训练模型（定义见第3章）"""
    for epoch in range(num_epochs):
        train_metrics = train_epoch_ch3(net, train_iter, loss, updater)
        test_acc = evaluate_accuracy(net, test_iter)
    train_loss, train_acc = train_metrics
    assert train_loss < 0.5, train_loss
    assert train_acc <= 1 and train_acc > 0.7, train_acc
    assert test_acc <= 1 and test_acc > 0.7, test_acc

In [None]:
lr = 0.1

def updater(batch_size):
    return sgd([W, b], lr, batch_size)

In [None]:
num_epochs = 10
train_ch3(net, train_iter, test_iter, cross_entropy, num_epochs, updater)

In [None]:
def predict_ch3(net,test_iter,n=6):
    for X,y in test_iter:
        break
    true = y
    pred = net(X).argmax(axis=1)
    print(true)
    print(pred)

In [None]:
predict_ch3(net,test_iter)

## softmax简洁实现

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
net = nn.Sequential(nn.Flatten(),nn.Linear(784,10))

def init_weights(m):
    if type(m)==nn.Linear:
        nn.init.normal_(m.weight,std=0.01)
net.apply(init_weights)
net.to(device)

In [None]:
loss=nn.CrossEntropyLoss(reduction='None')
loss.to(device)

In [None]:
opt = torch.optim.SGD(net.parameters(),lr=0.1)

In [None]:
def evaluate_accuracy(net, data_iter):  #@save
    """计算在指定数据集上模型的精度"""
    if isinstance(net, torch.nn.Module):
        net.eval()  # 将模型设置为评估模式
    metric = Accumulator(2)  # 正确预测数、预测总数
    with torch.no_grad():
        for X, y in data_iter:
            metric.add(accuracy(net(X.to(device)).detach().cpu(), y), y.numel())
    return metric[0] / metric[1]

In [None]:
def train_epoch_ch3(net, train_iter, loss, updater):  #@save
    """训练模型一个迭代周期（定义见第3章）"""
    # 将模型设置为训练模式
    if isinstance(net, torch.nn.Module):
        net.train()
    # 训练损失总和、训练准确度总和、样本数
    metric = Accumulator(3)
    for X, y in train_iter:
        # 计算梯度并更新参数
        y_hat = net(X.to(device))
        l = loss(y_hat, y.to(device))
        if isinstance(updater, torch.optim.Optimizer):
            # 使用PyTorch内置的优化器和损失函数
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            # 使用定制的优化器和损失函数
            l.sum().backward()
            updater(X.shape[0])
        metric.add(float(l.sum()), accuracy(y_hat.detach().cpu(), y.detach().cpu()), y.detach().cpu().numel())
    # 返回训练损失和训练精度
    return metric[0] / metric[2], metric[1] / metric[2]

In [None]:
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater):  #@save
    """训练模型（定义见第3章）"""
    for epoch in range(num_epochs):
        train_metrics = train_epoch_ch3(net, train_iter, loss, updater)
        test_acc = evaluate_accuracy(net, test_iter)
    train_loss, train_acc = train_metrics
    assert train_loss < 0.5, train_loss
    assert train_acc <= 1 and train_acc > 0.7, train_acc
    assert test_acc <= 1 and test_acc > 0.7, test_acc

In [None]:
num_epochs = 5
train_ch3(net, train_iter, test_iter, cross_entropy, num_epochs, updater)

In [None]:
def predict_ch3(net,test_iter,n=6):
    for X,y in test_iter:
        break
    true = y
    pred = net(X.to(device)).detach().cpu().argmax(axis=1)
    print(true)
    print(pred)

In [None]:
predict_ch3(net,test_iter)

# 多层感知机

## 多层感知机从零开始

In [1]:
%matplotlib inline
import torch
from torch import nn
from torch.utils import data
import torchvision
from torchvision import transforms
from matplotlib import pyplot as plt

In [2]:
# 超参数
bs = 16
lr = 0.1
epoch = 10
num_inputs = 784
num_outputs = 10
num_hiddens = 256

In [3]:
#数据初始化
trans = transforms.ToTensor()
train_Data = torchvision.datasets.MNIST(
    root = "../dataset/",
    train = True,
    download=True,
    transform=trans)
test_Data = torchvision.datasets.MNIST(
    root = "../dataset/",
    train = False,
    download=True,
    transform=trans)
train_iter = data.DataLoader(train_Data,bs,True)
test_iter = data.DataLoader(test_Data,bs,False)

In [4]:
# 权重初始化
num_inputs, num_outputs, num_hiddens = 784, 10, 256

W1 = nn.Parameter(torch.randn(
    num_inputs, num_hiddens, requires_grad=True) * 0.01)
b1 = nn.Parameter(torch.zeros(num_hiddens, requires_grad=True))
W2 = nn.Parameter(torch.randn(
    num_hiddens, num_outputs, requires_grad=True) * 0.01)
b2 = nn.Parameter(torch.zeros(num_outputs, requires_grad=True))

params = [W1, b1, W2, b2]

In [5]:
def relu(X):
    a = torch.zeros_like(X)
    return torch.max(X,a)

In [6]:
def net(X):
    X = X.reshape((-1, num_inputs))
    H = relu(X@W1 + b1)  # 这里“@”代表矩阵乘法
    return (H@W2 + b2)

In [7]:
loss = nn.CrossEntropyLoss(reduction='none')

In [8]:
opt = torch.optim.SGD(params,lr)

In [9]:
losses=[]
for e in range(epoch):
    for X,y in train_iter:
        y_hat = net(X)
        ls = loss(y_hat,y)

        opt.zero_grad()
        ls.mean().backward()
        opt.step()

        losses.append(ls.mean().detach().numpy())

    #计算平均
    avg_ls = sum(losses[-len(train_Data):])/len(train_Data)
    print(f"epoch:{e+1},loss:{avg_ls}")




epoch:1,loss:0.018360688431886957
epoch:2,loss:0.02544479181638259
epoch:3,loss:0.03029485331397009
epoch:4,loss:0.03391813324080673
epoch:5,loss:0.03669297842666905
epoch:6,loss:0.0389666342414633
epoch:7,loss:0.040689516837054907
epoch:8,loss:0.04205986548701003
epoch:9,loss:0.043110383928773775
epoch:10,loss:0.04389703679359182


In [12]:
with torch.no_grad():
    for X,y in test_iter:
        y_hat = net(X)
        res = y_hat.argmax(1)
        print(y)
        print(res)
        break

tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5])
tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5])


## 多层感知机简洁实现

In [11]:
%matplotlib inline
import torch
from torch import nn
from torch.utils import data
import torchvision
from torchvision import transforms
from matplotlib import pyplot as plt

In [13]:
# 超参数
bs = 16
lr = 0.1
epoch = 10
num_inputs = 784
num_outputs = 10
num_hiddens = 256
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [14]:
#数据初始化
trans = transforms.ToTensor()
train_Data = torchvision.datasets.MNIST(
    root = "../dataset/",
    train = True,
    download=True,
    transform=trans)
test_Data = torchvision.datasets.MNIST(
    root = "../dataset/",
    train = False,
    download=True,
    transform=trans)
train_iter = data.DataLoader(train_Data,bs,True)
test_iter = data.DataLoader(test_Data,bs,False)

In [15]:
net = nn.Sequential(
    nn.Flatten(),
    nn.Linear(784,256),
    nn.ReLU(),
    nn.Linear(256,10)
)

In [16]:
# 初始化权重
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight,std=0.01)

net.apply(init_weights)

Sequential(
  (0): Flatten(start_dim=1, end_dim=-1)
  (1): Linear(in_features=784, out_features=256, bias=True)
  (2): ReLU()
  (3): Linear(in_features=256, out_features=10, bias=True)
)

In [21]:
loss = nn.CrossEntropyLoss(reduction='none')

In [22]:
opt = torch.optim.SGD(net.parameters(),lr)

In [24]:
losses=[]
net.to(device)
for e in range(epoch):
    for X,y in train_iter:
        y_hat = net(X.to(device))
        ls = loss(y_hat,y.to(device))

        opt.zero_grad()
        ls.mean().backward()
        opt.step()

        losses.append(ls.mean().detach().cpu().numpy())

    #计算平均
    avg_ls = sum(losses[-len(train_Data):])/len(train_Data)
    print(f"epoch:{e+1},loss:{avg_ls}")

epoch:1,loss:0.01848777808760448
epoch:2,loss:0.025609011399627585
epoch:3,loss:0.030505573893068745
epoch:4,loss:0.03410685307652699
epoch:5,loss:0.03688734450017946
epoch:6,loss:0.039101340002926005
epoch:7,loss:0.04080757116000398
epoch:8,loss:0.042185578079716346
epoch:9,loss:0.0432394524592976
epoch:10,loss:0.044033875508109534


In [25]:
with torch.no_grad():
    for X,y in test_iter:
        y_hat = net(X.to(device))
        res = y_hat.detach().cpu().argmax(1)
        print(y)
        print(res)
        break

tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5])
tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5])


## 权重衰减

In [None]:

trainer = torch.optim.SGD([{"params":net[0].weight,'weight_decay':3},
                               {"params":net[0].bias}],lr)

## Dropout

In [None]:
net = nn.Sequential(
    nn.Flatten(),
    nn.ReLU(),

    nn.Dropout(0.2),
    nn.Linear(256,256),
    nn.ReLU(),

    nn.Dropout(0.3),
    nn.Linear(256,10)
)
def init_weights(m):
    if type(m)==nn.Linear:
        nn.init.normal_(m.weight,std=0.01)
net.apply(init_weights)

## kaggle房价预测

In [None]:
import pandas as pd
#数据预处理
#将所有缺失的值替换为相应特征的平均值。
#为了将所有特征放在一个共同的尺度上， 我们通过将特征重新缩放到零均值和单位方差来标准化数据
# 若无法获得测试数据，则可根据训练数据计算均值和标准差
all_features = pd.concat((train_data.iloc[:, 1:-1], test_data.iloc[:, 1:]))
numeric_features = all_features.dtypes[all_features.dtypes != 'object'].index
all_features[numeric_features] = all_features[numeric_features].apply(
    lambda x: (x - x.mean()) / (x.std()))
# 在标准化数据之后，所有均值消失，因此我们可以将缺失值设置为0
all_features[numeric_features] = all_features[numeric_features].fillna(0)

In [None]:
#处理缺失值
# “Dummy_na=True”将“na”（缺失值）视为有效的特征值，并为其创建指示符特征
all_features = pd.get_dummies(all_features, dummy_na=True)
all_features.shape

In [None]:
n_train = train_data.shape[0]
#通过values属性，我们可以 从pandas格式中提取NumPy格式，并将其转换为张量表示用于训练
train_features = torch.tensor(all_features[:n_train].values, dtype=torch.float32)
test_features = torch.tensor(all_features[n_train:].values, dtype=torch.float32)
train_labels = torch.tensor(
    train_data.SalePrice.values.reshape(-1, 1), dtype=torch.float32)

In [None]:
#训练
loss = nn.MSELoss()
in_features = train_features.shape[1]
def get_net():
    net = nn.Sequential(nn.Linear(in_features,1))
    return net

In [None]:
def log_rmse(net,features,labels):
    # 为了在取对数时进一步稳定该值，将小于1的值设置为1
    clipped_preds = torch.clamp(net(features),1,float('inf'))
    rmse = torch.sqrt(loss(torch.log(clipped_preds),torch.log(labels)))
    return rmse.item()

In [None]:
def train(net,train_features,train_labels,test_features,test_lables,num_epochs,lr,wd,bs):
    train_ls,test_ls = [],[]
    train_iter = load_array((train_features,train_labels),bs)
    opt = torch.optim.Adam(net.parameters(),lr,weight_decay=wd)

    for e in range(num_epochs):
        for X,y in train_iter:
            opt.zero_grad()
            l = loss(net(X),y)
            l.backward()
            opt.step()
        train_ls.append(log_rmse(net,train_features,train_labels))
        if test_lables is not None:
            test_ls.append(log_rmse(net,test_features,test_lables))
    return train_ls,test_ls

In [None]:
# k交叉验证
def get_k_fold_data(k,i,x,y):
    fold_size=X.shape[0]
    X_train,y_train = None,None

    for j in range(k):
        idx = slice(j*fold_size,(j+1)*fold_size)
        X_part,y_part = X[idx,:],y[idx]
        if j ==i:
            X_valid,y_valid =  X_part,y_part
        elif X_train is None:
            X_train,y_train = X_part,y_part
        else:
            X_train = torch.cat([X_train, X_part], 0)
            y_train = torch.cat([y_train, y_part], 0)
    return X_train, y_train, X_valid, y_valid

In [None]:
def k_fold(k, X_train, y_train, num_epochs, learning_rate, weight_decay,
           batch_size):
    train_l_sum, valid_l_sum = 0, 0
    for i in range(k):
        data = get_k_fold_data(k, i, X_train, y_train)
        net = get_net()
        train_ls, valid_ls = train(net, *data, num_epochs, learning_rate,
                                   weight_decay, batch_size)
        train_l_sum += train_ls[-1]
        valid_l_sum += valid_ls[-1]
        if i == 0:
            d2l.plot(list(range(1, num_epochs + 1)), [train_ls, valid_ls],
                     xlabel='epoch', ylabel='rmse', xlim=[1, num_epochs],
                     legend=['train', 'valid'], yscale='log')
        print(f'折{i + 1}，训练log rmse{float(train_ls[-1]):f}, '
              f'验证log rmse{float(valid_ls[-1]):f}')
    return train_l_sum / k, valid_l_sum / k