In [None]:
#-----------------------安装和初始化测试-----------------------

In [12]:
# 安装支持CUDA的PyTorch版本
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

Looking in indexes: https://download.pytorch.org/whl/cu121
Collecting torch
  Using cached https://download.pytorch.org/whl/cu121/torch-2.5.1%2Bcu121-cp312-cp312-win_amd64.whl (2449.3 MB)
Collecting torchvision
  Using cached https://download.pytorch.org/whl/cu121/torchvision-0.20.1%2Bcu121-cp312-cp312-win_amd64.whl (6.1 MB)
Collecting torchaudio
  Using cached https://download.pytorch.org/whl/cu121/torchaudio-2.5.1%2Bcu121-cp312-cp312-win_amd64.whl (4.1 MB)
Installing collected packages: torch, torchvision, torchaudio
Successfully installed torch-2.5.1+cu121 torchaudio-2.5.1+cu121 torchvision-0.20.1+cu121


In [1]:
import torch
import numpy as np

In [3]:
import torch
import time

print("🚀 RTX 4060 GPU加速测试开始！")
print("=" * 50)

# 检查GPU状态
print(f"PyTorch版本: {torch.__version__}")
print(f"CUDA是否可用: {torch.cuda.is_available()}")
print(f"CUDA版本: {torch.version.cuda}")
print(f"GPU名称: {torch.cuda.get_device_name(0)}")
print(f"GPU内存: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"\n使用设备: {device}")

# 性能对比测试
def performance_test():
    print("\n🎯 开始性能对比测试...")
    
    # 创建大张量进行矩阵乘法
    size = 20000
    print(f"矩阵大小: {size} x {size}")
    
    # CPU计算
    a_cpu = torch.randn(size, size)
    b_cpu = torch.randn(size, size)
    
    start_time = time.time()
    result_cpu = a_cpu @ b_cpu
    cpu_time = time.time() - start_time
    print(f"⏱️  CPU计算时间: {cpu_time:.4f} 秒")
    
    # GPU计算
    if torch.cuda.is_available():
        a_gpu = a_cpu.to(device)
        b_gpu = b_cpu.to(device)
        
        # 预热GPU
        _ = a_gpu @ b_gpu
        torch.cuda.synchronize()
        
        start_time = time.time()
        result_gpu = a_gpu @ b_gpu
        torch.cuda.synchronize()
        gpu_time = time.time() - start_time
        
        print(f"⚡ GPU计算时间: {gpu_time:.4f} 秒")
        print(f"🚀 速度提升: {cpu_time/gpu_time:.1f} 倍!")
        
        # 验证结果一致性
        difference = torch.max(torch.abs(result_cpu - result_gpu.cpu()))
        print(f"✅ 结果差异: {difference:.6f}")

# 张量操作示例
def tensor_operations():
    print("\n🔧 张量操作演示...")
    
    # 在GPU上创建张量
    x = torch.tensor([[1.0, 2.0], [3.0, 4.0]], device=device)
    y = torch.tensor([[5.0, 6.0], [7.0, 8.0]], device=device)
    
    print("张量 x:")
    print(x)
    print("张量 y:")
    print(y)
    
    # GPU上的各种运算
    z1 = x + y  # 加法
    z2 = x * y  # 乘法
    z3 = x @ y  # 矩阵乘法
    
    print("\nGPU运算结果:")
    print(f"x + y:\n{z1}")
    print(f"x * y:\n{z2}")
    print(f"x @ y (矩阵乘法):\n{z3}")

# 自动求导示例
def autograd_demo():
    print("\n🎓 自动求导演示...")
    
    # 在GPU上进行自动求导
    x = torch.tensor(2.0, requires_grad=True, device=device)
    y = x**3 + 2*x**2 + 5*x + 1
    
    y.backward()
    print(f"函数: y = x³ + 2x² + 5x + 1")
    print(f"当 x = 2 时:")
    print(f"y = {y.item()}")
    print(f"dy/dx = {x.grad.item()}")  # 导数: 3x² + 4x + 5 = 3*4 + 4*2 + 5 = 25

if __name__ == "__main__":
    performance_test()
    tensor_operations()
    autograd_demo()
    
    print("\n🎉 恭喜！你的RTX 4060 GPU加速环境已完美配置！")
    print("✨ 现在你可以享受飞快的深度学习训练速度了！")

🚀 RTX 4060 GPU加速测试开始！
PyTorch版本: 2.5.1+cu121
CUDA是否可用: True
CUDA版本: 12.1
GPU名称: NVIDIA GeForce RTX 4060 Laptop GPU
GPU内存: 8.0 GB

使用设备: cuda

🎯 开始性能对比测试...
矩阵大小: 20000 x 20000
⏱️  CPU计算时间: 15.4621 秒
⚡ GPU计算时间: 2.2528 秒
🚀 速度提升: 6.9 倍!
✅ 结果差异: 0.006592

🔧 张量操作演示...
张量 x:
tensor([[1., 2.],
        [3., 4.]], device='cuda:0')
张量 y:
tensor([[5., 6.],
        [7., 8.]], device='cuda:0')

GPU运算结果:
x + y:
tensor([[ 6.,  8.],
        [10., 12.]], device='cuda:0')
x * y:
tensor([[ 5., 12.],
        [21., 32.]], device='cuda:0')
x @ y (矩阵乘法):
tensor([[19., 22.],
        [43., 50.]], device='cuda:0')

🎓 自动求导演示...
函数: y = x³ + 2x² + 5x + 1
当 x = 2 时:
y = 27.0
dy/dx = 25.0

🎉 恭喜！你的RTX 4060 GPU加速环境已完美配置！
✨ 现在你可以享受飞快的深度学习训练速度了！


In [None]:
#-----------------------正文-----------------------

Tensor

In [4]:
#-------tensor basic grammar-------

#直接创建
data = [[1, 2],[3, 4]]
x_data = torch.tensor(data)

#从 NumPy 数组
np_array = np.array(data)
x_np = torch.from_numpy(np_array)

#从另一个张量 (新张量保留参数张量的属性（形状、数据类型），除非显式覆盖)
#创建一个与 x_data 形状相同但所有元素都为1的新张量
x_ones = torch.ones_like(x_data) # retains the properties of x_data
print(f"Ones Tensor: \n {x_ones} \n")

#创建与x_data相同形状的随机张量
x_rand = torch.rand_like(x_data, dtype=torch.float) # overrides the datatype of x_data
print(f"Random Tensor: \n {x_rand} \n")

#With random or constant values

#shape is a tuple of tensor dimensions. In the functions below, it determines the dimensionality of the output tensor
shape_1d = (5,)        # 1维，5个元素的一维数组    单元素要逗号，多元素不用管
shape_2d = (2, 3)      # 2维，2行3列的矩阵  
shape_3d = (2, 3, 4)   # 3维，2个3x4的矩阵

shape = (2,3)
rand_tensor = torch.rand(shape) # random variable
ones_tensor = torch.ones(shape) # creat a tensor with all value = 1 
zeros_tensor = torch.zeros(shape) # creat a tensor with all value = 0 

print(f"Random Tensor: \n {rand_tensor} \n")
print(f"Ones Tensor: \n {ones_tensor} \n")
print(f"Zeros Tensor: \n {zeros_tensor}") # 使用随机值或常量值

# We move our tensor to the current accelerator if available
# if torch.accelerator.is_available():
#     tensor = tensor.to(torch.accelerator.current_accelerator())

#Standard numpy-like indexing and slicing
tensor = torch.ones(4, 4)
print(f"First row: {tensor[0]}") # 输出第一行
print(f"First column: {tensor[:, 0]}") # 输出第一列
print(f"Last column: {tensor[..., -1]}") # 输出最后一列
tensor[:,1] = 0 # 把第二列都换成0
print(tensor)

# Joinint tensors using torch.cat
t1 = torch.cat([tensor, tensor, tensor], dim=1)
print(t1)

#Arithmetic operations
# This computes the matrix multiplication between two tensors. y1, y2, y3 will have the same value
# ``tensor.T`` returns the transpose of a tensor
y1 = tensor @ tensor.T # 计算 tensor 与其转置的矩阵乘法
y2 = tensor.matmul(tensor.T) # 计算 tensor 与 tensor.T 的矩阵乘法
y3 = torch.rand_like(y1) 
torch.matmul(tensor, tensor.T, out=y3)   # 三者等效


# This computes the element-wise product. z1, z2, z3 will have the same value
z1 = tensor * tensor #对 tensor 的每个元素进行平方（逐元素操作，不是矩阵乘法）
z2 = tensor.mul(tensor) 
z3 = torch.rand_like(tensor) 
torch.mul(tensor, tensor, out=z3) # 三者等效

# convert Single-element tensors to a Python numerical value using :item()
agg = tensor.sum()
agg_item = agg.item()
print(agg_item, type(agg_item))

# 原地操作 （In-place Operations）
tensor.add_(5)  # 直接修改tensor的内容  （在原本的指令上加个下划线就是原地操作）

# 非原地操作 - 创建新张量
new_tensor = tensor.add(5)  # 不改变原tensor

# PyTorch张量 → NumPy数组
t = torch.ones(5)
n = t.numpy()

# NumPy数组 → PyTorch张量
n = np.ones(5)
t = torch.from_numpy(n)

Ones Tensor: 
 tensor([[1, 1],
        [1, 1]]) 

Random Tensor: 
 tensor([[0.2883, 0.9344],
        [0.2310, 0.7040]]) 

Random Tensor: 
 tensor([[0.5848, 0.0959, 0.2012],
        [0.0842, 0.9143, 0.4248]]) 

Ones Tensor: 
 tensor([[1., 1., 1.],
        [1., 1., 1.]]) 

Zeros Tensor: 
 tensor([[0., 0., 0.],
        [0., 0., 0.]])
First row: tensor([1., 1., 1., 1.])
First column: tensor([1., 1., 1., 1.])
Last column: tensor([1., 1., 1., 1.])
tensor([[1., 0., 1., 1.],
        [1., 0., 1., 1.],
        [1., 0., 1., 1.],
        [1., 0., 1., 1.]])
tensor([[1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.],
        [1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.],
        [1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.],
        [1., 0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1.]])
12.0 <class 'float'>


Datasets & DataLoaders

$#-------Datasets & DataLoaders basic grammar-------$
DataLoader的关键作用：

批量处理：自动将样本组织成批次

数据打乱：防止模型过拟合

并行加载：使用多进程加速数据读取

内存优化：支持锁页内存等优化

使用模式：

创建Dataset

用DataLoader包装Dataset

在训练循环中迭代DataLoader

将批次数据传递给模型

重要参数：

batch_size：根据GPU内存调整

shuffle：训练集为True，测试集为False

num_workers：根据CPU核心数调整

pin_memory：使用GPU时设为True

In [None]:
#------创建数据集基础语法------
#导入库
import os # 用于处理文件路径
import pandas as pd # 用于读取CSV标注文件
from torchvision.io import decode_image # 从文件路径读取并解码图像为Tensor


class CustomImageDataset(Dataset):
    
    # 参数说明：
        # annotations_file: CSV文件路径，包含图像文件名和对应标签
        # img_dir: 图像文件存储的目录路径
        # transform: 图像预处理变换（如调整大小、归一化等）
        # target_transform: 标签预处理变换
    
    # __init__ 作用：初始化数据集对象，读取标注文件，设置图像目录和变换函数
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform  #如 transform=ToTensor(), 将图像转换为Tensor

    #__len__ 作用：返回数据集中样本的数量
    def __len__(self):
        return len(self.img_labels) #实现：通常返回标注文件中的行数

    #__getitem__ 作用：根据索引idx获取一个样本（图像和标签）
    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = decode_image(img_path) #读取图像文件并解码为PyTorch Tensor， 返回的Tensor形状为 [channels, height, width]
        label = self.img_labels.iloc[idx, 1]
        if self.transform:                        #应用图像变换（如归一化、数据增强等）
            image = self.transform(image)
        if self.target_transform:                 #应用标签变换（如one-hot编码等）
            label = self.target_transform(label)
        return image, label   #返回图像Tensor和对应的标签


# 用 DataLoaders 来准备训练数据
from torch.utils.data import DataLoader

train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)  
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)











# train_dataloader = DataLoader(
#     dataset=training_data,      # 数据集对象
#     batch_size=64,              # 批量大小
#     shuffle=True,               # 是否打乱数据
#     num_workers=4,              # 用于数据加载的子进程数
#     pin_memory=True,            # 锁页内存，加速GPU传输
#     drop_last=False,            # 是否丢弃最后一个不完整的批次
#     persistent_workers=True     # 保持worker进程活跃
# )



#迭代DataLoader （Iterate through the DataLoader）

#基本迭代方式
    # 方法1: 直接迭代
    for batch_idx, (images, labels) in enumerate(train_dataloader):
        print(f"Batch {batch_idx}: images shape {images.shape}, labels shape {labels.shape}")
        
        # 训练代码...
        if batch_idx == 2:  # 只演示前3个批次
            break

# 使用 next(iter()) 获取单个批次
    # 获取第一个批次进行演示
    train_features, train_labels = next(iter(train_dataloader))
    
    print(f"Feature batch shape: {train_features.size()}")
    print(f"Labels batch shape: {train_labels.size()}")




# DataLoader 高级用法
    # 不同批量大小设置
        # 根据任务类型设置不同的批量大小
        small_batch_loader = DataLoader(training_data, batch_size=16, shuffle=True)   # 小批量，适合小模型
        large_batch_loader = DataLoader(training_data, batch_size=256, shuffle=True)  # 大批量，适合大模型
        
        print(f"小批量数据形状: {next(iter(small_batch_loader))[0].shape}")  # [16, 1, 28, 28]
        print(f"大批量数据形状: {next(iter(large_batch_loader))[0].shape}")  # [256, 1, 28, 28]

    # 性能优化参数
        # 优化性能的DataLoader配置
        optimized_loader = DataLoader(
            training_data,
            batch_size=64,
            shuffle=True,
            num_workers=4,           # 并行加载进程数
            pin_memory=True,         # 锁页内存，加速GPU数据传输
            drop_last=True,          # 丢弃最后一个不完整批次
            persistent_workers=True  # 保持worker进程，避免重复创建
        )
        
        # 注意：在Windows上，num_workers=0可以避免一些问题

# 性能优化
    # 使用DataLoader的多进程加载
    dataloader = DataLoader(
        dataset,
        batch_size=32,
        shuffle=True,
        num_workers=4,        # 并行加载进程数
        pin_memory=True,      # 加速GPU传输
        persistent_workers=True  # 保持worker进程
    )


In [None]:
#----重要注意事项----
# decode_image应该是read_image




# Transforms
    from torchvision import transforms
    
    # 常用的图像变换组合
    train_transform = transforms.Compose([
        transforms.Resize((256, 256)),      # 调整大小
        transforms.RandomHorizontalFlip(),  # 数据增强：随机水平翻转
        transforms.ToTensor(),              # 转换为Tensor (0-1范围)
        transforms.Normalize(               # 归一化
            mean=[0.485, 0.456, 0.406],    # ImageNet均值
            std=[0.229, 0.224, 0.225]      # ImageNet标准差
        )
    ])
    
    # 标签变换示例
    def target_transform(label):
        # 例如：将标签转换为one-hot编码
        return torch.nn.functional.one_hot(torch.tensor(label), num_classes=10)


# 错误处理
    def __getitem__(self, idx):
        try:
            img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
            image = read_image(img_path)
            label = self.img_labels.iloc[idx, 1]
            
            if self.transform:
                image = self.transform(image)
            if self.target_transform:
                label = self.target_transform(label)
                
            return image, label
            
        except Exception as e:
            print(f"加载样本 {idx} 时出错: {e}")
            # 返回一个默认样本或跳过
            return torch.zeros(3, 224, 224), -1


# 支持不同的数据格式
class FlexibleDataset(Dataset):
    def __init__(self, data_frame, img_dir, transform=None):
        self.data_frame = data_frame
        self.img_dir = img_dir
        self.transform = transform
    
    def __getitem__(self, idx):
        row = self.data_frame.iloc[idx]
        
        # 支持不同的列名
        img_path = os.path.join(self.img_dir, 
                               getattr(row, 'filename', None) or 
                               getattr(row, 'image_path', None) or 
                               row[0])
        
        # 支持不同的标签列名
        label = getattr(row, 'label', None) or \
                getattr(row, 'class', None) or \
                getattr(row, 'category', None) or \
                row[1]
        
        image = read_image(img_path)
        if self.transform:
            image = self.transform(image)
            
        return image, label

# shuffle参数的重要性
    # 训练集：shuffle=True（防止模型记住数据顺序）
        train_loader = DataLoader(training_data, batch_size=64, shuffle=True)
    
    # 测试集：shuffle=False（保持一致性，便于分析）
        test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

# drop_last参数
    # 当数据集大小不能被batch_size整除时
        dataset_size = 100
        batch_size = 30
            
        loader_drop_false = DataLoader(dataset, batch_size=30, drop_last=False)
        loader_drop_true = DataLoader(dataset, batch_size=30, drop_last=True)
        
        print(f"drop_last=False 的批次数: {len(loader_drop_false)}")  # 4个批次 (30,30,30,10)
        print(f"drop_last=True 的批次数: {len(loader_drop_true)}")   # 3个批次 (30,30,30)

# 内存管理
    # 监控数据加载的内存使用
        for batch_idx, (data, target) in enumerate(train_dataloader):
            print(f"批次 {batch_idx} 内存使用: {data.element_size() * data.nelement() / 1024 / 1024:.2f} MB")
            
            if batch_idx == 0:
                # 检查设备
                print(f"数据设备: {data.device}")
                print(f"标签设备: {target.device}")

$DataLoader 完整公式化例程$

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
import time

# 设置随机种子保证可重复性
torch.manual_seed(42)
np.random.seed(42)

class CompleteDataLoaderExample:
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"使用设备: {self.device}")
        
        # 数据变换
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ])
        
        # 加载数据
        self.train_dataset, self.test_dataset = self.load_data()
        
        # 创建DataLoader
        self.train_loader, self.test_loader = self.create_dataloaders()
        
        # 创建模型
        self.model = self.create_model().to(self.device)
        
        # 定义损失函数和优化器
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        
        # 训练记录
        self.train_losses = []
        self.test_accuracies = []

    def load_data(self):
        """加载训练和测试数据集"""
        print("正在下载和加载数据...")
        
        train_dataset = datasets.FashionMNIST(
            root='./data',
            train=True,
            download=True,
            transform=self.transform
        )
        
        test_dataset = datasets.FashionMNIST(
            root='./data',
            train=False,
            download=True,
            transform=self.transform
        )
        
        print(f"训练集大小: {len(train_dataset)}")
        print(f"测试集大小: {len(test_dataset)}")
        
        return train_dataset, test_dataset

    def create_dataloaders(self):
        """创建训练和测试DataLoader"""
        train_loader = DataLoader(
            self.train_dataset,
            batch_size=64,
            shuffle=True,
            num_workers=2,
            pin_memory=True
        )
        
        test_loader = DataLoader(
            self.test_dataset,
            batch_size=1000,
            shuffle=False,
            num_workers=2,
            pin_memory=True
        )
        
        print(f"训练批次数: {len(train_loader)}")
        print(f"测试批次数: {len(test_loader)}")
        
        return train_loader, test_loader

    def create_model(self):
        """创建简单的神经网络模型"""
        class SimpleNN(nn.Module):
            def __init__(self):
                super(SimpleNN, self).__init__()
                self.flatten = nn.Flatten()
                self.linear_relu_stack = nn.Sequential(
                    nn.Linear(28*28, 512),
                    nn.ReLU(),
                    nn.Linear(512, 512),
                    nn.ReLU(),
                    nn.Linear(512, 10)
                )
            
            def forward(self, x):
                x = self.flatten(x)
                logits = self.linear_relu_stack(x)
                return logits
        
        return SimpleNN()

    def train_epoch(self, epoch):
        """训练一个epoch"""
        self.model.train()
        running_loss = 0.0
        
        for batch_idx, (data, target) in enumerate(self.train_loader):
            # 将数据移动到设备
            data, target = data.to(self.device), target.to(self.device)
            
            # 前向传播
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.criterion(output, target)
            
            # 反向传播
            loss.backward()
            self.optimizer.step()
            
            running_loss += loss.item()
            
            # 每100个批次打印一次进度
            if batch_idx % 100 == 0:
                print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(self.train_loader.dataset)} '
                      f'({100. * batch_idx / len(self.train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
        
        avg_loss = running_loss / len(self.train_loader)
        self.train_losses.append(avg_loss)
        return avg_loss

    def test(self):
        """测试模型性能"""
        self.model.eval()
        test_loss = 0
        correct = 0
        
        with torch.no_grad():
            for data, target in self.test_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                test_loss += self.criterion(output, target).item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        
        test_loss /= len(self.test_loader)
        accuracy = 100. * correct / len(self.test_loader.dataset)
        self.test_accuracies.append(accuracy)
        
        print(f'\n测试集: 平均损失: {test_loss:.4f}, 准确率: {correct}/{len(self.test_loader.dataset)} '
              f'({accuracy:.2f}%)\n')
        
        return accuracy

    def visualize_batch(self, num_images=12):
        """可视化一个批次的样本"""
        # 获取一个批次
        data_iter = iter(self.train_loader)
        images, labels = next(data_iter)
        
        # 标签映射
        class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                      'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
        
        # 创建图像网格
        fig, axes = plt.subplots(3, 4, figsize=(12, 9))
        for i in range(num_images):
            ax = axes[i // 4, i % 4]
            image = images[i].squeeze()
            label = labels[i].item()
            
            ax.imshow(image, cmap='gray')
            ax.set_title(f'{class_names[label]} ({label})')
            ax.axis('off')
        
        plt.tight_layout()
        plt.show()

    def visualize_training(self):
        """可视化训练过程"""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
        
        # 训练损失
        ax1.plot(self.train_losses)
        ax1.set_title('训练损失')
        ax1.set_xlabel('Epoch')
        ax1.set_ylabel('Loss')
        ax1.grid(True)
        
        # 测试准确率
        ax2.plot(self.test_accuracies)
        ax2.set_title('测试准确率')
        ax2.set_xlabel('Epoch')
        ax2.set_ylabel('Accuracy (%)')
        ax2.grid(True)
        
        plt.tight_layout()
        plt.show()

    def run_training(self, epochs=5):
        """运行完整训练过程"""
        print("开始训练...")
        start_time = time.time()
        
        for epoch in range(1, epochs + 1):
            epoch_start = time.time()
            
            # 训练一个epoch
            train_loss = self.train_epoch(epoch)
            
            # 测试
            test_accuracy = self.test()
            
            epoch_time = time.time() - epoch_start
            print(f'Epoch {epoch} 完成, 耗时: {epoch_time:.2f}秒')
            print('-' * 50)
        
        total_time = time.time() - start_time
        print(f'训练完成! 总耗时: {total_time:.2f}秒')
        
        # 可视化训练过程
        self.visualize_training()

    def demonstrate_dataloader_features(self):
        """演示DataLoader的各种特性"""
        print("\n" + "="*50)
        print("DataLoader 特性演示")
        print("="*50)
        
        # 1. 显示批次信息
        print("1. 批次信息:")
        for batch_idx, (data, target) in enumerate(self.train_loader):
            if batch_idx == 0:
                print(f"  数据形状: {data.shape}")  # [batch_size, channels, height, width]
                print(f"  标签形状: {target.shape}")  # [batch_size]
                print(f"  数据类型: {data.dtype}")
                print(f"  数据范围: [{data.min():.3f}, {data.max():.3f}]")
            break
        
        # 2. 显示shuffle效果
        print("\n2. Shuffle效果演示:")
        first_batch_labels = []
        for i in range(3):
            data_iter = iter(self.train_loader)
            _, labels = next(data_iter)
            first_batch_labels.append(labels[:5].tolist())  # 前5个标签
        
        print("   前3次迭代的第一个批次的前5个标签:")
        for i, labels in enumerate(first_batch_labels):
            print(f"   迭代 {i+1}: {labels}")
        
        # 3. 显示数据集信息
        print("\n3. 数据集统计:")
        print(f"   训练集总样本数: {len(self.train_loader.dataset)}")
        print(f"   测试集总样本数: {len(self.test_loader.dataset)}")
        print(f"   训练批次数: {len(self.train_loader)}")
        print(f"   测试批次数: {len(self.test_loader)}")
        print(f"   批次大小: {self.train_loader.batch_size}")

def main():
    """主函数"""
    # 创建示例
    example = CompleteDataLoaderExample()
    
    # 可视化一些样本
    print("正在可视化训练样本...")
    example.visualize_batch()
    
    # 演示DataLoader特性
    example.demonstrate_dataloader_features()
    
    # 运行训练
    example.run_training(epochs=5)
    
    # 保存模型
    torch.save(example.model.state_dict(), 'fashion_mnist_model.pth')
    print("模型已保存为 'fashion_mnist_model.pth'")

if __name__ == "__main__":
    main()

$关键组件说明 数据加载 → 模型定义 → 训练 → 测试 → 可视化$

In [None]:
#1 数据加载和预处理
# 标准数据变换
transform = transforms.Compose([
    transforms.ToTensor(),           # 转换为Tensor
    transforms.Normalize((0.5,), (0.5,))  # 归一化到[-1,1]
])

# 数据集加载
train_dataset = datasets.FashionMNIST(...)
test_dataset = datasets.FashionMNIST(...)

#2 DataLoader配置
# 训练DataLoader - 打乱数据，适合训练
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=True,        # 重要：训练时需要打乱
    num_workers=2,       # 并行加载进程
    pin_memory=True      # 加速GPU传输
)

# 测试DataLoader - 不打乱数据，适合评估
test_loader = DataLoader(
    test_dataset,
    batch_size=1000,     # 测试时可以用更大的批次
    shuffle=False,       # 重要：测试时不需要打乱
    num_workers=2,
    pin_memory=True
)

#3 标准训练循环模板
def train_epoch(self, epoch):
    self.model.train()  # 设置为训练模式
    for batch_idx, (data, target) in enumerate(self.train_loader):
        # 1. 数据移动到设备
        data, target = data.to(self.device), target.to(self.device)
        
        # 2. 梯度清零
        self.optimizer.zero_grad()
        
        # 3. 前向传播
        output = self.model(data)
        loss = self.criterion(output, target)
        
        # 4. 反向传播
        loss.backward()
        
        # 5. 参数更新
        self.optimizer.step()

#4 标准测试循环模板
def test(self):
    self.model.eval()  # 设置为评估模式
    with torch.no_grad():  # 禁用梯度计算
        for data, target in self.test_loader:
            data, target = data.to(self.device), target.to(self.device)
            output = self.model(data)
            # 计算准确率等指标...

$Transforms$

$Transforms是数据预处理的工具，用于将原始数据转换为适合模型训练的格式$

In [None]:
import torch
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda

# 应用ToTensor转换
to_tensor = ToTensor()
tensor_image = to_tensor(pil_image)

# Lambda Transforms
target_transform = Lambda(lambda y: torch.zeros(
    10, dtype=torch.float).scatter_(dim=0, index=torch.tensor(y), value=1))

    # Within Lambda Transforms
        # 步骤1: 创建全零张量
            zero_tensor = torch.zeros(10, dtype=torch.float)
            # tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
        
        # 步骤2: 使用scatter_在指定位置设置值
            result = zero_tensor.scatter_(dim=0, index=torch.tensor(3), value=1)
            # tensor([0., 0., 0., 1., 0., 0., 0., 0., 0., 0.])

# 常用Transforms
from torchvision import transforms

# 常用的图像变换组合
transform = transforms.Compose([
    transforms.Resize(256),                    # 调整大小
    transforms.CenterCrop(224),                # 中心裁剪
    transforms.RandomHorizontalFlip(0.5),     # 随机水平翻转
    transforms.RandomRotation(10),             # 随机旋转
    transforms.ColorJitter(0.2, 0.2, 0.2),    # 颜色抖动
    transforms.ToTensor(),                     # 转换为Tensor
    transforms.Normalize(                      # 归一化
        mean=[0.485, 0.456, 0.406],           # ImageNet均值
        std=[0.229, 0.224, 0.225]             # ImageNet标准差
    )
])

$ToTensor()$完成两个主要转换：

转换前：PIL Image 或 numpy.ndarray

转换后：torch.FloatTensor

具体变化：
1. 数据类型：PIL/NumPy → torch.Tensor
2. 数值范围：[0, 255] → [0.0, 1.0]
3. 维度顺序：(H, W, C) → (C, H, W)

In [None]:
# 实际应用（训练和测试的不同变换）
    # 训练变换：包含数据增强
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    
    # 测试变换：只有基础变换
    test_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    
    # 分别应用于训练集和测试集
    train_dataset = datasets.FashionMNIST(
        root="data", train=True, download=True, transform=train_transform
    )
    
    test_dataset = datasets.FashionMNIST(
        root="data", train=False, download=True, transform=test_transform
    )

#----注意事项----
    # 变换的顺序很重要
        # 错误顺序
        wrong_order = transforms.Compose([
            transforms.Normalize((0.5,), (0.5,)),  # 错误：在ToTensor之前
            transforms.ToTensor()
        ])
        
        # 正确顺序
        correct_order = transforms.Compose([
            transforms.ToTensor(),                  # 先转换为Tensor
            transforms.Normalize((0.5,), (0.5,))   # 然后归一化
        ])
    
    # 内存和性能考虑
        # 避免在变换中进行昂贵的操作
        # 不好：在每次获取样本时都进行复杂计算
        expensive_transform = Lambda(lambda x: complicated_processing(x))
        
        # 好：预处理或使用缓存的变换
        efficient_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ])
    
    # 数据类型一致性
        # 确保变换后的数据类型与模型期望一致
        def check_data_types(dataset):
            image, label = dataset[0]
            
            print(f"图像类型: {image.dtype}")    # 应该是torch.float32
            print(f"标签类型: {label.dtype}")    # 应该是torch.float32（对于one-hot）
            
            # 模型通常期望float32输入
            assert image.dtype == torch.float32, f"图像数据类型错误: {image.dtype}"
            assert label.dtype == torch.float32, f"标签数据类型错误: {label.dtype}"

$Build-the-Neural-Network$

In [None]:
# Import the necessary library
import os
import torch
from torch import nn  # 神经网络核心模块 torch.nn：包含所有神经网络层和损失函数
from torch.utils.data import DataLoader  # 数据加载 DataLoader：用于批量加载数据
from torchvision import datasets, transforms  # 计算机视觉相关 torchvision：提供预训练模型和数据集


# Get Device for Training
# 方法1：使用最新的加速器API（推荐）
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

    # 方法2：传统方法（兼容性更好）
        # if torch.cuda.is_available():
        #     device = "cuda"
        # elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        #     device = "mps"  # Apple Silicon
        # else:
        #     device = "cpu"


# Define the Class 神经网络类的骨架
"""" 
关键规则：

必须继承 nn.Module

必须调用 super().__init__()

必须实现 forward 方法
"""" 
class NeuralNetwork(nn.Module):  # 必须继承nn.Module
    def __init__(self):
        super().__init__()  # 必须调用父类初始化 super().__init__()
        # 在这里定义网络层
        
    def forward(self, x):
        # 在这里定义数据如何流动 （必须实现 forward 方法）
        return x

"""" 例子：
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits
"""" 
#实例化模型并移动到设备
# 创建模型实例
model = NeuralNetwork()

# 将模型移动到指定设备（GPU/CPU）
model = model.to(device)

# 打印模型结构
print(model)


#使用模型进行预测
"""" 
Logits：原始预测分数，未归一化

Softmax：将logits转换为概率，所有概率和为1 (各个类的可能性)

argmax：获取概率最大的类别索引 （最大可能性的类）
""""
# 创建随机输入数据（模拟一张28x28的图像）
X = torch.rand(1, 28, 28, device=device)  # 形状：[1, 28, 28]

# 使用模型进行预测（不要直接调用model.forward()！）
logits = model(X)  # 自动调用forward方法

# 将logits转换为概率
pred_probab = nn.Softmax(dim=1)(logits)

# 获取预测类别
y_pred = pred_probab.argmax(1)

print(f"Predicted class: {y_pred}")



In [37]:
# Model Layers
# 创建3张28x28的随机图像（模拟一个批次）
input_image = torch.rand(3, 28, 28)
print(f"输入图像大小: {input_image.size()}")  # torch.Size([3, 28, 28])

输入图像大小: torch.Size([3, 28, 28])


核心层理解：

Flatten：多维→一维，保持批次维度

Linear：全连接层，y = xW^T + b

ReLU：非线性激活，f(x) = max(0, x)

Sequential：层容器，按顺序执行

Softmax：概率归一化，dim参数很重要

In [38]:
# nn.Flatten  Flatten layer
# 创建Flatten层
flatten = nn.Flatten()

# 应用Flatten
flat_image = flatten(input_image)
print(f"展平后大小: {flat_image.size()}")  # torch.Size([3, 784])

# Flatten做了什么？
# 原始: [3, 28, 28] → 展平: [3, 784]
# 保持批次维度(3)，将每个28x28图像拉平成784个像素的向量

展平后大小: torch.Size([3, 784])


In [39]:
# nn.Linear  Linear Layer （改变维度， 调整学习部分）
# 创建Linear(全连接)层
layer1 = nn.Linear(in_features=28*28, out_features=20)

# 应用全连接层
hidden1 = layer1(flat_image)
print(f"全连接层输出大小: {hidden1.size()}")  # torch.Size([3, 20])

# Linear层数学运算：
# output = input × weight^T + bias
# [3, 784] × [784, 20]^T + [20] = [3, 20]


# Sample
# Linear层将输入特征空间映射到新的特征空间
# 例如：将784像素值 → 512个高级特征

class FeatureTransformationDemo:
    def __init__(self):
        self.layer1 = nn.Linear(784, 256)  # 784维 → 256维
        self.layer2 = nn.Linear(256, 128)  # 256维 → 128维
        self.layer3 = nn.Linear(128, 10)   # 128维 → 10维（分类）
    
    def forward(self, x):
        x = self.layer1(x)  # 学习低级特征组合
        x = torch.relu(x)   # 引入非线性
        x = self.layer2(x)  # 学习中级特征
        x = torch.relu(x)
        x = self.layer3(x)  # 学习高级特征用于分类
        return x

class ImageClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        # 将28x28图像展平为784维向量
        self.flatten = nn.Flatten()
        
        # 多层Linear层学习层次特征
        self.network = nn.Sequential(
            nn.Linear(28*28, 512),  # 学习边缘、纹理等低级特征
            nn.ReLU(),
            nn.Linear(512, 256),    # 学习形状、部件等中级特征  
            nn.ReLU(),
            nn.Linear(256, 10)      # 学习类别相关的高级特征
        )
    
    def forward(self, x):
        x = self.flatten(x)
        return self.network(x)

全连接层输出大小: torch.Size([3, 20])


In [None]:
# nn.ReLU  ReLU激活函数
print("ReLU激活函数演示:")
print(f"ReLU前:\n{hidden1}")

# 应用ReLU激活函数
hidden1 = nn.ReLU()(hidden1)
print(f"ReLU后:\n{hidden1}")

# ReLU函数：f(x) = max(0, x)
# 作用：引入非线性，让神经网络可以学习复杂模式
# 特点：将所有负值设为0，正值保持不变

# ReLU效果示例
输入:  [0.5, -0.3, 1.2, -2.0]
ReLU后: [0.5, 0.0, 1.2, 0.0]

In [None]:
# nn.Sequential   （Sequential， 层容器）
# 使用Sequential组合多个层
seq_modules = nn.Sequential(
    flatten,           # 展平层
    layer1,            # 第一个全连接层
    nn.ReLU(),         # 激活函数
    nn.Linear(20, 10)  # 输出层
)

# 使用Sequential进行前向传播
input_image = torch.rand(3, 28, 28)
logits = seq_modules(input_image)
print(f"Sequential输出大小: {logits.size()}")  # torch.Size([3, 10])

# Softmax 函数

对于输入向量 **z** = [z₁, z₂, ..., zₖ]，Softmax 的计算公式为：

$$
\text{Softmax}(z_i) = \frac{e^{z_i}}{\sum_{j=1}^{K} e^{z_j}}
$$

## 公式说明

- **zᵢ**：第 i 个类别的原始分数（logit）
- **eᶻⁱ**：第 i 个类别的指数
- **分母**：所有类别指数的总和
- **K**：类别总数

## 数学特性

1. **输出范围**：Softmax(zᵢ) ∈ (0, 1)
2. **概率分布**：∑ Softmax(zᵢ) = 1
3. **单调性**：保持原始分数的相对顺序


In [None]:
# nn.Softmax  （Softmax层，概率归一化，dim参数很重要）
""""
Softmax函数将一个包含任意实数的向量（logits）转换为一个概率分布
所有值都在(0,1)范围内

所有值的和为1

较大的输入值对应较大的输出概率
""""
# 创建Softmax层
softmax = nn.Softmax(dim=1)

# 应用Softmax
pred_probab = softmax(logits)
print(f"Softmax输出: {pred_probab}")
print(f"概率总和: {pred_probab.sum(dim=1)}")  # 每行和为1.0

# dim=1的含义：在哪个维度上进行归一化
# 对于形状 [3, 10]，dim=1 表示对每个样本的10个类别分数进行归一化

In [None]:
# nn.Dropout 防止过拟合工具
""""

Dropout是一种正则化技术，
在训练过程中随机"丢弃"（设置为0）一部分神经元，
强制网络学习更鲁棒的特征，防止过拟合

想象你在准备考试：

没有Dropout：你只依赖几个特定的记忆点

使用Dropout：你被迫用不同的方式回忆知识，建立更全面的理解

""""
    
    # 创建Dropout层，p=0.5表示50%的神经元会被随机丢弃
    dropout = nn.Dropout(p=0.5)
    
    # 创建输入数据
    input_data = torch.ones(2, 5)  # 2个样本，每个样本5个特征
    print(f"输入数据:\n{input_data}")
    
    # 训练模式下的Dropout
    dropout.train()  # 设置为训练模式
    output_train = dropout(input_data)
    print(f"训练模式输出:\n{output_train}")
    
    # 推理模式下的Dropout  
    dropout.eval()   # 设置为推理模式
    output_eval = dropout(input_data)
    print(f"推理模式输出:\n{output_eval}")


参数类型说明：

weight：权重矩阵，形状为 [in_features 输出特征, out_features 输入特征]

bias：偏置向量，形状为 [out_features 输出特征]

In [None]:
# Model Parameters
print(f"Model structure: {model}\n\n")

for name, param in model.named_parameters():
    print(f"Layer: {name} | Size: {param.size()} | Values : {param[:2]} \n")