# 3-2,中阶API示范

下面的范例使用Pytorch的中阶API实现线性回归模型和和DNN二分类模型。

Pytorch的中阶API主要包括各种模型层，损失函数，优化器，数据管道等等。

In [None]:
import datetime  # 导入datetime模块，用于处理日期和时间
import os  # 导入os模块，用于操作操作系统功能


# 定义一个函数printbar，用于打印当前时间的分隔线
def printbar():
    # 获取当前时间，并以指定格式进行格式化
    nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # 打印分隔线，使用等号字符多次重复
    print("\n" + "==========" * 8 + "%s" % nowtime)


# 在mac系统上，由于某些库的问题，需要设置环境变量以允许重复加载库
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


In [None]:
import torch

print("torch.__version__=" + torch.__version__)


### 一，线性回归模型

**1，准备数据**

In [None]:
from matplotlib import pyplot as plt  # 导入matplotlib库中的pyplot模块，用于绘图
import torch  # 导入PyTorch库
from torch import nn  # 导入PyTorch中的神经网络模块
from torch.utils.data import DataLoader, TensorDataset  # 导入PyTorch中的数据加载和处理工具

# 样本数量
n = 400  # 定义一个变量n，表示样本数量

# 生成测试用数据集
X = 10 * torch.rand([n, 2]) - 5.0  # 生成一个形状为[n, 2]的随机数据张量X，数值范围在[-5.0, 5.0]之间
w0 = torch.tensor([[2.0], [-3.0]])  # 创建一个权重张量w0，形状为[2, 1]，用于线性关系
b0 = torch.tensor([[10.0]])  # 创建一个偏置张量b0，形状为[1, 1]，用于线性关系
Y = X @ w0 + b0 + torch.normal(0.0, 2.0, size=[n, 1])
# 通过线性关系生成目标值Y，@表示矩阵乘法，最后加上正态分布的噪声扰动

# 这段代码的主要功能是生成一个测试用的数据集X和对应的目标值Y，其中X是随机生成的，Y是根据线性关系计算得到的，同时加入了正态分布的噪声扰动。


In [None]:
# 数据可视化
%config InlineBackend.figure_format = 'svg'  # 设置绘图的格式为SVG矢量图，使图形更清晰

# 创建一个包含两个子图的图形，指定图形的大小为12x5英寸
plt.figure(figsize=(12, 5))

# 创建第一个子图(ax1)，放置在1x2的图形中的左侧
ax1 = plt.subplot(121)

# 绘制散点图，X[:, 0]表示X中的第一列特征，Y[:, 0]表示对应的目标值Y，散点颜色为蓝色，添加标签"samples"
ax1.scatter(X[:, 0], Y[:, 0], c="b", label="samples")

# 添加图例
ax1.legend()

# 设置x轴标签为"x1"，y轴标签为"y"，rotation=0表示不旋转y轴标签
plt.xlabel("x1")
plt.ylabel("y", rotation=0)

# 创建第二个子图(ax2)，放置在1x2的图形中的右侧
ax2 = plt.subplot(122)

# 绘制散点图，X[:, 1]表示X中的第二列特征，Y[:, 0]表示对应的目标值Y，散点颜色为绿色，添加标签"samples"
ax2.scatter(X[:, 1], Y[:, 0], c="g", label="samples")

# 添加图例
ax2.legend()

# 设置x轴标签为"x2"，y轴标签为"y"，rotation=0表示不旋转y轴标签
plt.xlabel("x2")
plt.ylabel("y", rotation=0)

# 显示图形
plt.show()


In [None]:
# 构建输入数据管道

# 创建一个TensorDataset，将输入特征X和目标值Y合并为一个数据集
ds = TensorDataset(X, Y)

# 创建一个DataLoader，用于批量加载数据
# - ds: 数据集
# - batch_size: 每个批次的样本数量，这里设置为10，即每个批次包含10个样本
# - shuffle: 是否在每个epoch中随机打乱数据集，这里设置为True，表示打乱数据
# - num_workers: 用于数据加载的并行处理数量，这里设置为2，可以加速数据加载
dl = DataLoader(ds, batch_size=10, shuffle=True, num_workers=2)


**2，定义模型**

In [None]:
# 创建线性回归模型
model = nn.Linear(2, 1)
# - nn.Linear(2, 1) 创建一个线性层，输入特征的维度为2，输出维度为1

# 设置损失函数为均方误差损失（MSE）
model.loss_fn = nn.MSELoss()
# - nn.MSELoss() 创建一个均方误差损失函数，用于衡量模型预测值与真实值之间的差距

# 创建一个随机梯度下降（SGD）优化器，用于更新模型参数
model.optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# - torch.optim.SGD(model.parameters(), lr=0.01) 创建一个SGD优化器，传入模型参数model.parameters()，
#   设置学习率为0.01，用于更新模型的权重和偏置以最小化损失函数


**3，训练模型**

In [None]:
# 定义训练步骤函数train_step，用于训练模型
def train_step(model, features, labels):
    # 前向传播：计算模型的预测值
    predictions = model(features)

    # 计算损失：使用模型的损失函数计算预测值与真实值之间的误差
    loss = model.loss_fn(predictions, labels)

    # 反向传播：计算损失关于模型参数的梯度
    loss.backward()

    # 更新模型参数：使用优化器更新模型的权重和偏置
    model.optimizer.step()

    # 清零梯度：重置梯度，以便下一次迭代
    model.optimizer.zero_grad()

    # 返回损失值（以便记录训练过程中的损失）
    return loss.item()


# 测试train_step函数的效果
features, labels = next(iter(dl))  # 从数据管道中加载一个批次的特征和标签
train_step(model, features, labels)  # 使用train_step函数训练模型，并返回损失值

In [None]:
# 定义训练模型的函数train_model
def train_model(model, epochs):
    # 遍历指定的训练轮数（epochs）
    for epoch in range(1, epochs + 1):
        # 遍历数据管道中的每个批次数据
        for features, labels in dl:
            # 调用训练步骤函数train_step来训练模型，并获取损失值
            loss = train_step(model, features, labels)

        # 每隔10个epoch打印一次训练信息
        if epoch % 10 == 0:
            # 打印分隔线
            printbar()

            # 获取当前模型的权重和偏置参数
            w = model.state_dict()["weight"]
            b = model.state_dict()["bias"]

            # 打印当前训练轮数（epoch）和损失值（loss）
            print("epoch =", epoch, "loss =", loss)

            # 打印模型的权重和偏置
            print("w =", w)
            print("b =", b)


# 调用train_model函数，进行50轮的模型训练
train_model(model, epochs=50)

In [None]:
# 结果可视化
%config InlineBackend.figure_format = 'svg'  # 设置绘图的格式为SVG矢量图，使图形更清晰

# 获取训练后的模型权重和偏置
w, b = model.state_dict()["weight"], model.state_dict()["bias"]

# 创建一个包含两个子图的图形，指定图形的大小为12x5英寸
plt.figure(figsize=(12, 5))

# 创建第一个子图(ax1)，放置在1x2的图形中的左侧
ax1 = plt.subplot(121)

# 绘制散点图，X[:, 0]表示X中的第一列特征，Y[:, 0]表示对应的目标值Y，散点颜色为蓝色，添加标签"samples"
ax1.scatter(X[:, 0], Y[:, 0], c="b", label="samples")

# 绘制模型的拟合曲线，使用红色实线表示，线宽为5.0
ax1.plot(X[:, 0], w[0, 0] * X[:, 0] + b[0], "-r", linewidth=5.0, label="model")

# 添加图例
ax1.legend()

# 设置x轴标签为"x1"，y轴标签为"y"，rotation=0表示不旋转y轴标签
plt.xlabel("x1")
plt.ylabel("y", rotation=0)

# 创建第二个子图(ax2)，放置在1x2的图形中的右侧
ax2 = plt.subplot(122)

# 绘制散点图，X[:, 1]表示X中的第二列特征，Y[:, 0]表示对应的目标值Y，散点颜色为绿色，添加标签"samples"
ax2.scatter(X[:, 1], Y[:, 0], c="g", label="samples")

# 绘制模型的拟合曲线，使用红色实线表示，线宽为5.0
ax2.plot(X[:, 1], w[0, 1] * X[:, 1] + b[0], "-r", linewidth=5.0, label="model")

# 添加图例
ax2.legend()

# 设置x轴标签为"x2"，y轴标签为"y"，rotation=0表示不旋转y轴标签
plt.xlabel("x2")
plt.ylabel("y", rotation=0)

# 显示图形
plt.show()


### 二， DNN二分类模型

**1，准备数据**

In [None]:
import numpy as np
from matplotlib import pyplot as plt
import torch
from torch.utils.data import DataLoader, TensorDataset

# 在Jupyter Notebook中使用内联绘图，以及设置绘图的格式为SVG矢量图
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

# 正负样本数量
n_positive, n_negative = 2000, 2000

# 生成正样本，小圆环分布
r_p = 5.0 + torch.normal(0.0, 1.0, size=[n_positive, 1])  # 生成半径，加上正态分布的噪声
theta_p = 2 * np.pi * torch.rand([n_positive, 1])  # 生成角度
Xp = torch.cat([r_p * torch.cos(theta_p), r_p * torch.sin(theta_p)], axis=1)  # 构建正样本的坐标
Yp = torch.ones_like(r_p)  # 生成正样本的标签，全为1

# 生成负样本，大圆环分布
r_n = 8.0 + torch.normal(0.0, 1.0, size=[n_negative, 1])  # 生成半径，加上正态分布的噪声
theta_n = 2 * np.pi * torch.rand([n_negative, 1])  # 生成角度
Xn = torch.cat([r_n * torch.cos(theta_n), r_n * torch.sin(theta_n)], axis=1)  # 构建负样本的坐标
Yn = torch.zeros_like(r_n)  # 生成负样本的标签，全为0

# 汇总样本
X = torch.cat([Xp, Xn], axis=0)  # 拼接正负样本的特征
Y = torch.cat([Yp, Yn], axis=0)  # 拼接正负样本的标签

# 可视化
plt.figure(figsize=(6, 6))
plt.scatter(Xp[:, 0], Xp[:, 1], c="r")  # 绘制正样本，颜色为红色
plt.scatter(Xn[:, 0], Xn[:, 1], c="g")  # 绘制负样本，颜色为绿色
plt.legend(["positive", "negative"])  # 添加图例标签


In [None]:
# 构建输入数据管道

# 创建一个TensorDataset，将特征X和标签Y合并为一个数据集
ds = TensorDataset(X, Y)

# 创建一个DataLoader，用于批量加载数据
# - ds: 数据集
# - batch_size: 每个批次的样本数量，这里设置为10，即每个批次包含10个样本
# - shuffle: 是否在每个epoch中随机打乱数据集，这里设置为True，表示打乱数据
# - num_workers: 用于数据加载的并行处理数量，这里设置为2，可以加速数据加载
dl = DataLoader(ds, batch_size=10, shuffle=True, num_workers=2)


**2, 定义模型**

In [None]:
import torch
from torch import nn
import torch.nn.functional as F


# 定义一个深度神经网络模型
class DNNModel(nn.Module):
    def __init__(self):
        super(DNNModel, self).__init__()
        # 定义神经网络的层结构
        self.fc1 = nn.Linear(2, 4)  # 第一个全连接层，输入维度2，输出维度4
        self.fc2 = nn.Linear(4, 8)  # 第二个全连接层，输入维度4，输出维度8
        self.fc3 = nn.Linear(8, 1)  # 第三个全连接层，输入维度8，输出维度1

    # 正向传播函数
    def forward(self, x):
        x = F.relu(self.fc1(x))  # 使用ReLU激活函数进行非线性变换
        x = F.relu(self.fc2(x))  # 使用ReLU激活函数进行非线性变换
        y = nn.Sigmoid()(self.fc3(x))  # 使用Sigmoid激活函数输出概率值
        return y

    # 损失函数
    def loss_fn(self, y_pred, y_true):
        return nn.BCELoss()(y_pred, y_true)  # 使用二分类交叉熵损失函数

    # 评估函数（准确率）
    def metric_fn(self, y_pred, y_true):
        y_pred = torch.where(y_pred > 0.5, torch.ones_like(y_pred, dtype=torch.float32),
                             torch.zeros_like(y_pred, dtype=torch.float32))
        acc = torch.mean(1 - torch.abs(y_true - y_pred))
        return acc

    # 优化器属性，用于创建优化器
    @property
    def optimizer(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)  # 使用Adam优化器，学习率为0.001


# 创建一个DNNModel实例
model = DNNModel()


In [None]:
# 从数据管道中加载一个批次的特征和标签
(features, labels) = next(iter(dl))

# 使用模型进行前向传播，获取预测结果
predictions = model(features)

# 计算模型的初始损失
loss = model.loss_fn(predictions, labels)

# 计算模型的初始评估指标（准确率）
metric = model.metric_fn(predictions, labels)

# 打印初始损失和评估指标
print("init loss:", loss.item())  # 打印初始损失值
print("init metric:", metric.item())  # 打印初始评估指标


**3，训练模型**

In [None]:
# 定义训练步骤函数train_step
def train_step(model, features, labels):
    # 正向传播求损失
    predictions = model(features)  # 使用模型进行前向传播，获取预测结果
    loss = model.loss_fn(predictions, labels)  # 计算损失
    metric = model.metric_fn(predictions, labels)  # 计算评估指标（准确率）

    # 反向传播求梯度
    loss.backward()  # 计算损失关于模型参数的梯度

    # 更新模型参数
    model.optimizer.step()  # 使用优化器更新模型参数
    model.optimizer.zero_grad()  # 清零梯度，以便下一次迭代

    return loss.item(), metric.item()


# 测试train_step函数的效果
features, labels = next(iter(dl))  # 从数据管道中加载一个批次的特征和标签
loss, metric = train_step(model, features, labels)  # 使用train_step函数进行一次训练步骤


In [None]:
# 定义训练模型的函数train_model
def train_model(model, epochs):
    for epoch in range(1, epochs + 1):
        loss_list, metric_list = [], []

        # 遍历数据管道中的每个批次数据，执行训练步骤并记录损失和评估指标
        for features, labels in dl:
            lossi, metrici = train_step(model, features, labels)
            loss_list.append(lossi)
            metric_list.append(metrici)

        # 计算每个epoch的平均损失和评估指标
        loss = np.mean(loss_list)
        metric = np.mean(metric_list)

        # 每隔10个epoch打印一次训练信息
        if epoch % 10 == 0:
            printbar()
            print("epoch =", epoch, "loss =", loss, "metric =", metric)


# 调用train_model函数，进行50轮的模型训练
train_model(model, epochs=50)


In [None]:
# 创建一个包含两个子图的图形，分别放置在一行两列
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(12, 5))

# 在第一个子图(ax1)中绘制真实标签的分布
ax1.scatter(Xp[:, 0], Xp[:, 1], c="r")  # 绘制正样本，颜色为红色
ax1.scatter(Xn[:, 0], Xn[:, 1], c="g")  # 绘制负样本，颜色为绿色
ax1.legend(["positive", "negative"])  # 添加图例标签
ax1.set_title("y_true")  # 设置子图标题为"y_true"

# 根据模型的预测结果分割数据点为正样本和负样本
Xp_pred = X[torch.squeeze(model.forward(X) >= 0.5)]  # 预测为正样本的数据点
Xn_pred = X[torch.squeeze(model.forward(X) < 0.5)]  # 预测为负样本的数据点

# 在第二个子图(ax2)中绘制模型的预测结果
ax2.scatter(Xp_pred[:, 0], Xp_pred[:, 1], c="r")  # 绘制预测为正样本的数据点，颜色为红色
ax2.scatter(Xn_pred[:, 0], Xn_pred[:, 1], c="g")  # 绘制预测为负样本的数据点，颜色为绿色
ax2.legend(["positive", "negative"])  # 添加图例标签
ax2.set_title("y_pred")  # 设置子图标题为"y_pred"
