Course
1. Visualizing the training process
2. Saving and visualizing models
3. Class exercise 1
4. RNN implementation
5. Class exercise 2

In [3]:
!pip install tensorboard 



In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
import numpy as np
import time

# Select the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


Load the data

In [None]:
# Load the handwritten digits dataset from sklearn
digits = load_digits()
X = digits.data  # Feature data (1797, 64)
y = digits.target  # Label data (1797,)

# Preprocessing: reshape into image format and normalize
X = X.reshape(-1, 1, 8, 8)  # Reshape to (samples, channels, height, width)
X = X / 16.0  # Pixel values range from 0 to 16; scale them to [0, 1]

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert to PyTorch tensors
X_train = torch.FloatTensor(X_train)
y_train = torch.LongTensor(y_train)
X_test = torch.FloatTensor(X_test)
y_test = torch.LongTensor(y_test)

# Create the data loaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000)

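The core idea of an RNN is recurrence: it iterates over the input sequence along the time dimension and carries a hidden state forward, which lets it capture dependencies between steps of the sequence.
For image data, what should the RNN iterate over?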
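
One possible answer, sketched under the assumption that each image row is treated as one time step: an 8x8 digit then becomes a sequence of 8 steps with 8 features each. The batch size (4) and hidden size (16) below are arbitrary illustration values, not settings taken from this notebook.

```python
import torch
import torch.nn as nn

# A fake batch shaped like the digits data: (batch, channels, height, width)
images = torch.rand(4, 1, 8, 8)

# Drop the channel dimension so that each row becomes one time step:
# (batch, seq_len = 8 rows, input_size = 8 pixels per row)
sequence = images.squeeze(1)

# hidden_size=16 is an arbitrary value chosen for this illustration
rnn = nn.RNN(input_size=8, hidden_size=16, batch_first=True)
outputs, last_hidden = rnn(sequence)

print(sequence.shape)     # torch.Size([4, 8, 8])
print(outputs.shape)      # torch.Size([4, 8, 16]), one hidden state per row
print(last_hidden.shape)  # torch.Size([1, 4, 16]), the state after the final row
```

For a single-layer, unidirectional RNN, `outputs[:, -1, :]` and `last_hidden[0]` hold the same values, which is why a classifier can read either one.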

In [117]:
# Define the CNN model (sized for 8x8 images)
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=3)  # 1 input channel, 10 output channels
        self.conv2 = nn.Conv2d(10, 20, kernel_size=3)  # 10 input channels, 20 output channels
        self.conv2_drop = nn.Dropout2d()  # Dropout on the convolutional feature maps
        self.fc1 = nn.Linear(20 * 1 * 1, 50)  # Fully connected layer
        self.fc2 = nn.Linear(50, 10)  # Output layer

    def forward(self, x):
        x = self.conv1(x)  # Convolution (8-3+1=6) -> 6x6
        x = nn.functional.max_pool2d(x, 2)  # Max pooling -> 3x3
        x = nn.functional.relu(x)  # ReLU activation

        x = self.conv2(x)  # Second convolution (3-3+1=1) -> 1x1
        x = self.conv2_drop(x)  # Dropout to reduce overfitting
        x = nn.functional.max_pool2d(x, 1)  # Pooling with kernel size 1 (a no-op; stays 1x1)
        x = nn.functional.relu(x)  # ReLU activation

        x = x.view(-1, 20 * 1 * 1)  # Flatten to a vector per sample
        x = self.fc1(x)  # Fully connected layer
        x = nn.functional.relu(x)  # ReLU activation
        x = nn.functional.dropout(x, training=self.training)  # Dropout (active only in training mode)
        x = self.fc2(x)  # Output layer
        return nn.functional.log_softmax(x, dim=1)  # Log-softmax, paired with NLLLoss
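
The size arithmetic in the comments above (8 -> 6 -> 3 -> 1) can be checked with a short shape trace. This is a standalone sketch that rebuilds only the two convolutions with the same kernel sizes; the batch of two random images is made up for the check.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

x = torch.rand(2, 1, 8, 8)  # two fake 8x8 single-channel images
conv1 = nn.Conv2d(1, 10, kernel_size=3)
conv2 = nn.Conv2d(10, 20, kernel_size=3)

after_conv1 = conv1(x)                      # 8 - 3 + 1 = 6
after_pool = F.max_pool2d(after_conv1, 2)   # 6 / 2 = 3
after_conv2 = conv2(after_pool)             # 3 - 3 + 1 = 1
flat = after_conv2.view(-1, 20 * 1 * 1)     # 20 features per sample

shapes = [tuple(t.shape) for t in (after_conv1, after_pool, after_conv2, flat)]
print(shapes)
# [(2, 10, 6, 6), (2, 10, 3, 3), (2, 20, 1, 1), (2, 20)]
```

Because the feature map is already 1x1 after the second convolution, the fully connected layer sees only 20 values per image.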

In [None]:
# Define the RNN model (skeleton to fill in; a complete version appears later in this notebook)
class RNN(nn.Module):
    def __init__(self, input_size=8, hidden_size=128, num_layers=2, num_classes=10):
        pass

    def forward(self, x):
        pass

Train & Test

In [118]:
# Initialize the CNN model, loss function, and optimizer
model = CNN().to(device)
criterion = nn.NLLLoss()  # Negative log-likelihood loss (the model outputs log-probabilities)
optimizer = optim.Adam(model.parameters(), lr=0.001)
#criterion = CustomLoss(lambda_1=2.0, lambda_9=-0.5)  # Alternative: custom per-class weights


In [119]:
def train(epochs):
    model.train()
    for epoch in range(epochs):
        start_time = time.time()
        running_loss = 0.0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            #print(data.shape) # [64,1,8,8]
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        end_time = time.time()
        print(f'Epoch {epoch+1}/{epochs}, loss: {running_loss/len(train_loader):.4f}, time: {end_time-start_time:.2f}s')

# Evaluate the model

In [120]:

def test():
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)  # Index of the highest log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'Test set: average loss: {test_loss:.4f}, accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)')



In [121]:
# Train and evaluate
print("Starting CNN model training...")
train(epochs=20)  # The dataset is small, so train for more epochs
print("\nStarting model evaluation...")
test()

Starting CNN model training...
Epoch 1/20, loss: 2.3332, time: 0.02s
Epoch 2/20, loss: 2.3177, time: 0.00s
Epoch 3/20, loss: 2.3283, time: 0.00s
Epoch 4/20, loss: 2.3225, time: 0.00s
Epoch 5/20, loss: 2.2991, time: 0.00s
Epoch 6/20, loss: 2.2856, time: 0.00s
Epoch 7/20, loss: 2.2875, time: 0.00s
Epoch 8/20, loss: 2.3035, time: 0.00s
Epoch 9/20, loss: 2.2984, time: 0.00s
Epoch 10/20, loss: 2.2838, time: 0.00s
Epoch 11/20, loss: 2.2994, time: 0.00s
Epoch 12/20, loss: 2.2548, time: 0.00s
Epoch 13/20, loss: 2.2723, time: 0.00s
Epoch 14/20, loss: 2.2705, time: 0.00s
Epoch 15/20, loss: 2.2970, time: 0.00s
Epoch 16/20, loss: 2.2693, time: 0.00s
Epoch 17/20, loss: 2.2629, time: 0.00s
Epoch 18/20, loss: 2.2383, time: 0.00s
Epoch 19/20, loss: 2.2725, time: 0.00s
Epoch 20/20, loss: 2.2652, time: 0.00s

Starting model evaluation...
Test set: average loss: 2.3012, accuracy: 176/1780 (9.89%)


# Visualizing the training process

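1. What is TensorBoard?

TensorBoard is the official visualization tool of the TensorFlow project. It supports:

    Training-process visualization (loss and accuracy curves)

    Model-structure visualization (the computation graph)

    Image visualization (input images, generated images)

    Embedding visualization (dimensionality reduction, clustering)

    Hyperparameter tuning (the HParams dashboard)

    It writes data produced during training to log files and then displays them in a web interface that refreshes as new data arrives.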

2. Installing TensorBoard

    If TensorFlow (2.x or later) is already installed, TensorBoard usually comes with it. For a PyTorch-only setup, install it with pip:

In [6]:
!pip install torch torchvision tensorboard





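3. Using TensorBoard with PyTorch

In PyTorch, a SummaryWriter is all you need to log training data for TensorBoard.

Launch command: tensorboard --logdir=<your log directory>.

Common methods:

add_scalar(tag, scalar_value, global_step) logs a scalar.

add_image(tag, img_tensor, global_step) logs an image.

add_graph(model, input_to_model) logs the model structure.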
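
As a minimal sketch of the logging pattern, the snippet below writes a synthetic, exponentially decaying "loss" with `add_scalar`. The temporary log directory, the tag name `Demo/loss`, and the values themselves are assumptions made up for illustration; they are not results from this notebook.

```python
import math
import os
import tempfile

from torch.utils.tensorboard import SummaryWriter

# Hypothetical log directory, created only for this illustration
log_dir = os.path.join(tempfile.mkdtemp(), "demo_run")
writer = SummaryWriter(log_dir)

for step in range(10):
    fake_loss = math.exp(-0.3 * step)  # synthetic value, not a real training loss
    writer.add_scalar("Demo/loss", fake_loss, step)

writer.close()  # flush pending events to disk

event_files = os.listdir(log_dir)
print(event_files)  # one events.out.tfevents.* file
```

Pointing `tensorboard --logdir` at the parent of such a directory shows each run as a separate curve, which is convenient when comparing experiments.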

In [127]:

from torch.utils.tensorboard import SummaryWriter
# Create a SummaryWriter and specify the directory where logs are saved
writer = SummaryWriter('./my_experiment')
def test(model,epoch,criterion):
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)  # Index of the highest log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

            all_preds.append(torch.exp(output).cpu().numpy())  # Convert log-probabilities to probabilities
            all_targets.append(target.cpu().numpy())

    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'----Test set: average loss: {test_loss:.4f}, accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)')
    writer.add_scalar('Test Loss', test_loss, epoch)
    writer.add_scalar('Test Accuracy', accuracy, epoch)
    # Log the weight distribution of every layer
    for name, param in model.named_parameters():
        writer.add_histogram(name, param, epoch)

    # Report per-class accuracy (recall); PR-curve logging is left commented out
    all_preds = np.concatenate(all_preds)
    all_targets = np.concatenate(all_targets)
    preds_class = np.argmax(all_preds, axis=1)  # Predicted label for every test sample
    for class_id in range(10):
        labels = (all_targets == class_id).astype(int)  # 0/1 mask for this class
        scores = all_preds[:, class_id]  # Predicted probability of this class
        true_class = (all_targets == class_id)
        pred_class = (preds_class == class_id)
        correct_class = np.logical_and(pred_class, true_class).sum()
        total_class = true_class.sum()
        sc = correct_class / total_class if total_class > 0 else 0.0
        print("----- each class acc ", sc)
        #writer.add_pr_curve(f'PR_curve_class_{class_id}', labels, scores, epoch)


In [128]:


def train(model,epochs,criterion,optimizer):
    for epoch in range(epochs):
        model.train()  # test() below switches to eval mode, so re-enter training mode every epoch
        start_time = time.time()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            
            # Compute accuracy
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
        
        end_time = time.time()
        avg_loss = running_loss / len(train_loader)
        accuracy = 100. * correct / total
        
        # Log training loss and accuracy to TensorBoard
        writer.add_scalar('Training Loss', avg_loss, epoch)
        writer.add_scalar('Training Accuracy', accuracy, epoch)
        # Log one sample image (the first image of the last batch) to TensorBoard
        writer.add_image('MNIST sample', data[0], epoch * len(train_loader) + batch_idx)
        test(model,epoch,criterion)
        print(f'Epoch {epoch+1}/{epochs}, loss: {avg_loss:.4f}, accuracy: {accuracy:.2f}%, time: {end_time-start_time:.2f}s')

        # writer.add_scalars(
        # 'Loss Comparison',  # chart title
        # {
        #     'Train': avg_loss,
        #     'Test': accuracy
        # },
        # epoch
        # )   

    return accuracy



In [129]:
model = CNN().to(device)
criterion = nn.NLLLoss()  # Negative log-likelihood loss
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("Starting CNN model training...")
train(model,50,criterion,optimizer)  # The dataset is small, so train for more epochs
print("\nStarting model evaluation...")
test(model,-1,criterion)

Starting CNN model training...
----Test set: average loss: 2.3090, accuracy: 182/1780 (10.22%)
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  1.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
Epoch 1/50, loss: 2.3431, accuracy: 5.88%, time: 0.00s
----Test set: average loss: 2.3083, accuracy: 182/1780 (10.22%)
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  1.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
Epoch 2/50, loss: 2.3377, accuracy: 5.88%, time: 0.00s
----Test set: average loss: 2.3078, accuracy: 182/1780 (10.22%)
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  1.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
----- each class acc  0.0
---

In [130]:
# Log the model structure
data, _ = next(iter(train_loader))
data = data.to(device)
writer.add_graph(model,data)
print("Model structure logged")

Model structure logged


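4. What you can view in TensorBoard

    Scalars: training and test loss, and accuracy curves.

    Images: sample digit images (logged under the tag 'MNIST sample').

    Graphs: the model structure graph.

    Histograms: per-layer weight distributions (these require explicit add_histogram calls).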
    

5. Launching TensorBoard

    Run in a terminal:

In [None]:
!tensorboard --logdir=./my_experiment

TensorFlow installation not found - running with reduced feature set.
Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.20.0 at http://localhost:6006/ (Press CTRL+C to quit)


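Class exercise:
1. Use TensorBoard to plot an overfitting result (training loss keeps falling while test loss stops improving or rises)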


# RNN model

In [None]:
class RNN(nn.Module):
    def __init__(self, input_size=8, hidden_size=128, num_layers=2, num_classes=10):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # Vanilla RNN layer (not used in forward; kept for comparison)
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        # LSTM layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        
        # 全连接层
        self.fc = nn.Linear(hidden_size, num_classes)
        
    def forward(self, x):
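        # Input shape: (batch_size, 1, 8, 8)
        # Reshape into a sequence: (batch_size, 8, 8), i.e. 8 time steps (rows) of 8 features each
        batch_size = x.size(0)
        x = x.squeeze(1)  # Remove the channel dimension
        # print("x shape is ",x.shape)

        # Initialize the hidden state and the cell state
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)

        # LSTM forward pass
        out, _ = self.lstm(x, (h0, c0))
        # out, _ = self.rnn(x, h0)  # Vanilla RNN alternative: takes only h0 and also returns a tuple

        # Use only the output of the last time step
        out = self.fc(out[:, -1, :])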
        return nn.functional.log_softmax(out, dim=1)


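# Define a custom loss function
class CustomLoss(nn.Module):
    def __init__(self, lambda_1=1.5, lambda_9=-0.5):
        super(CustomLoss, self).__init__()
        self.lambda_1 = lambda_1  # Multiplier for digit 1 (values > 1 increase its weight)
        self.lambda_9 = lambda_9  # Multiplier for digit 9; note that a negative value flips the sign of the loss rather than just reducing its weight
        self.ce_loss = nn.CrossEntropyLoss(reduction='none')

    def forward(self, outputs, targets):
        # Standard per-sample cross-entropy loss
        ce = self.ce_loss(outputs, targets)

        # Apply the special multipliers to digits 1 and 9
        batch_size = targets.size(0)
        for i in range(batch_size):
            if targets[i] == 1:
                ce[i] *= self.lambda_1  # Scale the loss for digit 1
            elif targets[i] == 9:
                ce[i] *= self.lambda_9  # Scale the loss for digit 9

        return ce.mean()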
    

# Initialize the RNN model
model = RNN().to(device)
#criterion = nn.NLLLoss()  # Negative log-likelihood loss
#criterion = CustomLoss(lambda_1=2.0, lambda_9=-0.5)  # Alternative: custom per-class weights
#optimizer = optim.Adam(model.parameters(), lr=0.001)
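
The per-sample weighting in CustomLoss can also be written without a Python loop. The sketch below is an alternative formulation, not the notebook's code: it builds a per-class weight vector and indexes it with the targets. The class name `VectorizedWeightedLoss` is hypothetical, and the weights (2.0 for digit 1, 0.5 for digit 9) are assumed values chosen so that both stay positive.

```python
import torch
import torch.nn as nn

class VectorizedWeightedLoss(nn.Module):  # hypothetical name for this sketch
    def __init__(self, weight_1=2.0, weight_9=0.5, num_classes=10):
        super().__init__()
        class_weights = torch.ones(num_classes)
        class_weights[1] = weight_1  # up-weight digit 1
        class_weights[9] = weight_9  # down-weight digit 9 (still positive)
        # A buffer follows the module across .to(device) without being trained
        self.register_buffer("class_weights", class_weights)
        self.ce_loss = nn.CrossEntropyLoss(reduction="none")

    def forward(self, outputs, targets):
        per_sample = self.ce_loss(outputs, targets)  # shape: (batch,)
        # Look up each sample's weight by its target class, then average
        return (per_sample * self.class_weights[targets]).mean()

torch.manual_seed(0)
logits = torch.randn(6, 10)
targets = torch.tensor([1, 9, 0, 1, 9, 3])
loss = VectorizedWeightedLoss()(logits, targets)
print(loss.item())
```

This keeps the plain mean over the batch, as CustomLoss does. Passing `weight=` to `nn.CrossEntropyLoss` with the default mean reduction would instead normalize by the sum of the weights, so the two are not interchangeable.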



In [None]:
# Training and Test
model = RNN().to(device)
criterion = nn.NLLLoss()  # Negative log-likelihood loss
#criterion = CustomLoss(lambda_1=2.0, lambda_9=-0.5)  # Alternative: custom per-class weights
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("Starting RNN model training...")
train(model,20,criterion,optimizer)  # The dataset is small, so train for more epochs
print("\nStarting model evaluation...")
test(model,-1,criterion)

Starting CNN model training...
----Test set: average loss: 2.2224, accuracy: 116/360 (32.22%)
----- each class acc  0.9696969696969697
----- each class acc  0.7142857142857143
----- each class acc  0.48484848484848486
----- each class acc  1.0
----- each class acc  0.0
----- each class acc  0.14893617021276595
----- each class acc  0.17142857142857143
----- each class acc  0.0
----- each class acc  0.03333333333333333
----- each class acc  0.0
Epoch 1/20, loss: 2.2841, accuracy: 15.38%, time: 0.20s
----Test set: average loss: 1.5991, accuracy: 171/360 (47.50%)
----- each class acc  0.9696969696969697
----- each class acc  0.5714285714285714
----- each class acc  0.2727272727272727
----- each class acc  0.47058823529411764
----- each class acc  0.08695652173913043
----- each class acc  0.425531914893617
----- each class acc  0.6571428571428571
----- each class acc  0.7058823529411765
----- each class acc  0.03333333333333333
----- each class acc  0.65
Epoch 2/20, loss: 1.9145, accuracy: 30.41%, time: 0.20s
----Test set: average loss: 1.0524, accuracy: 230/360 (63.89%)
----- 

TASK: Model saving & visualization

In [115]:
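# Save the entire model (architecture and weights, pickled together)
torch.save(model, 'model.pth')
# Save only the parameters (the state_dict)
torch.save(model.state_dict(), 'model_weights.pth')
# The saved model can be visualized at https://netron.app/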
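
Loading the saved parameters back is the other half of this task. The sketch below is a round trip under simplifying assumptions: a tiny `nn.Linear` stands in for the notebook's CNN or RNN, and the file is written to a temporary directory rather than the working directory.

```python
import os
import tempfile

import torch
import torch.nn as nn

# A tiny stand-in model; the notebook's CNN or RNN would be handled the same way
model = nn.Linear(8, 10)

path = os.path.join(tempfile.mkdtemp(), "model_weights.pth")
torch.save(model.state_dict(), path)

# Loading a state_dict requires re-creating the architecture first
restored = nn.Linear(8, 10)
restored.load_state_dict(torch.load(path, map_location="cpu"))
restored.eval()  # switch to inference mode before evaluating

x = torch.rand(3, 8)
same = torch.allclose(model(x), restored(x))
print(same)  # True
```

Saving only the state_dict keeps the file independent of the class definition's pickled form, at the cost of needing the model code available when loading.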

Class exercise 2:
Build a two-layer Transformer encoder model and train it on the MNIST task.