In [2]:
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

print(torch.__version__)        # 查看 PyTorch 版本
print(torch.version.cuda)       # 查看 CUDA 版本
print(torch.cuda.is_available())  # 查看是否检测到 CUDA 可用

print(torch.cuda.is_available())  # 如果返回 True，说明可以使用 GPU
print(torch.cuda.current_device())  # 获取当前 GPU 的索引
print(torch.cuda.device(0))  # 获取 GPU 设备信息
print(torch.cuda.get_device_name(0))  # 获取 GPU 设备名称






2.5.1
12.1
True
True
0
<torch.cuda.device object at 0x0000017EBC067F20>
NVIDIA GeForce RTX 4060 Laptop GPU


# PyTorch
In this notebook you will gain some hands-on experience with [PyTorch](https://pytorch.org/), one of the major frameworks for deep learning. To install PyTorch. follow [the official installation instructions](https://pytorch.org/get-started/locally/). Make sure that you select the correct OS & select the version with CUDA if your computer supports it.
If you do not have an Nvidia GPU, you can install the CPU version by setting `CUDA` to `None`.
However, in this case we recommend using [Google Colab](https://colab.research.google.com/).
Make sure that you enable GPU acceleration in `Runtime > Change runtime type`.

You will start by re-implementing some common features of deep neural networks (dropout and batch normalization) and then implement a very popular modern architecture for image classification (ResNet) and improve its training loop.

# 1. Dropout
Dropout is a form of regularization for neural networks. It works by randomly setting activations (values) to 0, each one with equal probability `p`. The values are then scaled by a factor $\frac{1}{1-p}$ to conserve their mean.

Dropout effectively trains a pseudo-ensemble of models with stochastic gradient descent. During evaluation we want to use the full ensemble and therefore have to turn off dropout. Use `self.training` to check if the model is in training or evaluation mode.

Do not use any dropout implementation from PyTorch for this!

In [6]:
class Dropout(nn.Module):
    """
    Dropout, as discussed in the lecture and described here:
    https://pytorch.org/docs/stable/nn.html#torch.nn.Dropout
    
    Args:
        p: float, dropout probability
    """
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()
        assert 0 <= p <= 1, "Dropout rate p must be between 0 and 1."
        self.p = p
        
    def forward(self, input):
        """
        The module's forward pass.
        This has to be implemented for every PyTorch module.
        PyTorch then automatically generates the backward pass
        by dynamically generating the computational graph during
        execution.
        
        Args:
            input: PyTorch tensor, arbitrary shape

        Returns:
            PyTorch tensor, same shape as input
        """
        
        # TODO: Set values randomly to 0.
        # Check if we are in training mode
        if self.training:
            # Create a mask with the same shape as the input
            mask = (torch.rand_like(input) > self.p).float()  # Random mask based on p
            return input * mask / (1 - self.p)  # Scale the input to maintain the expected value
        else:
            # If not in training, return the input as is (no dropout applied)
            return input

In [8]:
# Test dropout
test = torch.rand(10_000)
dropout = Dropout(0.2)
test_dropped = dropout(test)

# These assertions can in principle fail due to bad luck, but
# if implemented correctly they should almost always succeed.
assert np.isclose(test_dropped.mean().item(), test.mean().item(), atol=1e-2)
assert np.isclose((test_dropped > 0).float().mean().item(), 0.8, atol=1e-2)

# 2. Batch normalization
Batch normalization is a trick use to smoothen the loss landscape and improve training. It is defined as the function
$$y = \frac{x - \mu_x}{\sigma_x + \epsilon} \cdot \gamma + \beta$$,
where $\gamma$ and $\beta$ and learnable parameters and $\epsilon$ is a some small number to avoid dividing by zero. The Statistics $\mu_x$ and $\sigma_x$ are taken separately for each feature. In a CNN this means averaging over the batch and all pixels.

Do not use any batch normalization implementation from PyTorch for this!

In [11]:
class BatchNorm(nn.Module):
    """
    Batch normalization, as discussed in the lecture and similar to
    https://pytorch.org/docs/stable/nn.html#torch.nn.BatchNorm1d
    
    Only uses batch statistics (no running mean for evaluation).
    Batch statistics are calculated for a single dimension.
    Gamma is initialized as 1, beta as 0.
    
    Args:
        num_features: Number of features to calculate batch statistics for.
    """
    def __init__(self, num_features):
        super().__init__()
        
        # TODO: Initialize the required parameters
        # Initialize the required parameters
        self.gamma = nn.Parameter(torch.ones(1, num_features, 1))  # Shape: [1, C, 1]
        self.beta = nn.Parameter(torch.zeros(1, num_features, 1))  # Shape: [1, C, 1]
        
    def forward(self, input):
        """
        Batch normalization over the dimension C of (N, C, L).
        
        Args:
            input: PyTorch tensor, shape [N, C, L]
            
        Return:
            PyTorch tensor, same shape as input
        """
        eps = 1e-5
        
        # TODO: Implement the required transformation
        assert len(input.shape) == 3, "Input must have shape [N, C, L]"
        mean = input.mean(dim=(0, 2), keepdim=True)
        var = input.var(dim=(0, 2), keepdim=True, unbiased=False)
        return self.gamma * (input - mean) / (torch.sqrt(var + 1e-5)) + self.beta

       

In [13]:
# Tests the batch normalization implementation
torch.random.manual_seed(42)
test = torch.randn(8, 2, 4)

b1 = BatchNorm(2)
test_b1 = b1(test)

b2 = nn.BatchNorm1d(2, affine=False, track_running_stats=False)
test_b2 = b2(test)

assert torch.allclose(test_b1, test_b2, rtol=0.02)

# 3. ResNet
ResNet is the models that first introduced residual connections (a form of skip connections). It is a rather simple, but successful and very popular architecture. In this part of the exercise we will re-implement it step by step.

Note that there is also an [improved version of ResNet](https://arxiv.org/abs/1603.05027) with optimized residual blocks. Here we will implement the [original version](https://arxiv.org/abs/1512.03385) for CIFAR-10. Your dropout and batchnorm implementations won't help you here. Just use PyTorch's own layers.

This is just a convenience function to make e.g. `nn.Sequential` more flexible. It is e.g. useful in combination with `x.squeeze()`.

In [17]:
class Lambda(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        return self.func(x)

We begin by implementing the residual blocks. The block is illustrated by this sketch:

![Residual connection](img/residual_connection.png)

Note that we use 'SAME' padding, no bias, and batch normalization after each convolution. You do not need `nn.Sequential` here. The skip connection is already implemented as `self.skip`. It can handle different strides and increases in the number of channels.

In [20]:
class ResidualBlock(nn.Module):
    """
    The residual block used by ResNet.
    
    Args:
        in_channels: The number of channels (feature maps) of the incoming embedding
        out_channels: The number of channels after the first convolution
        stride: Stride size of the first convolution, used for downsampling
    """
    
    def __init__(self, in_channels, out_channels, stride=1):
        
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        # Skip connection for dimension matching
        self.skip = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(out_channels)
            )
        # TODO: Initialize the required layers
        
    def forward(self, input):
        # TODO: Execute the required layers and functions
        # Pass input through the first convolutional layer
        out = self.conv1(input)
        out = self.bn1(out)#batch norm
        out = self.relu(out)
        
        # Pass the output through the second convolutional layer
        out = self.conv2(out)
        out = self.bn2(out)
        
        # Add the skip connection
        skip = self.skip(input)
        
        # The output of the residual block is the sum of the processed input and the skip connection
        out += skip
        
        # Apply the ReLU activation after the addition of the skip connection
        out = self.relu(out)
        
        return out

Next we implement a stack of residual blocks for convenience. The first layer in the block is the one changing the number of channels and downsampling. You can use `nn.ModuleList` to use a list of child modules.

In [23]:
class ResidualStack(nn.Module):
    """
    A stack of residual blocks.
    
    Args:
        in_channels: The number of channels (feature maps) of the incoming embedding
        out_channels: The number of channels after the first layer
        stride: Stride size of the first layer, used for downsampling
        num_blocks: Number of residual blocks
    """
    
    def __init__(self, in_channels, out_channels, stride, num_blocks):
        super().__init__()
        
        # TODO: Initialize the required layers (blocks)
        assert num_blocks > 0, "num_blocks must be a positive integer."
        self.blocks = nn.ModuleList()
        
        for i in range(num_blocks):
            self.blocks.append(ResidualBlock(in_channels, out_channels, stride))
            # Set in_channels for the next block to out_channels after the first block
            in_channels = out_channels
        
        
    def forward(self, input):
        # TODO: Execute the layers (blocks)
        # Pass the input through all the residual blocks sequentially
        for block in self.blocks:
            input = block(input)
        return input

Now we are finally ready to implement the full model! To do this, use the `nn.Sequential` API and carefully read the following paragraph from the paper (Fig. 3 is not important):

![ResNet CIFAR10 description](img/resnet_cifar10_description.png)

Note that a convolution layer is always convolution + batch norm + activation (ReLU), that each ResidualBlock contains 2 layers, and that you might have to `squeeze` the embedding before the dense (fully-connected) layer.

In [26]:
n = 5
num_classes = 10

# TODO: Implement ResNet via nn.Sequential
class ResNet(nn.Module):
    """
    A simple ResNet model using nn.Sequential.
    
    Args:
        n: Number of residual blocks
        num_classes: Number of output classes for classification
    """
    
    def __init__(self, n, num_classes):
        super().__init__()
        
        # Initial convolution layer
        self.initial_conv = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.initial_bn = nn.BatchNorm2d(64)
        self.initial_relu = nn.ReLU(inplace=True)
        
        # Define the residual blocks stack
        self.residual_stack = ResidualStack(64, 64, stride=1, num_blocks=n)
        
        # Global Average Pooling
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        
        # Fully connected layer for classification
        self.fc = nn.Linear(64, num_classes)
    
    def forward(self, x):
        # Initial layers
        x = self.initial_conv(x)
        x = self.initial_bn(x)
        x = self.initial_relu(x)
        
        # Pass through residual stack
        x = self.residual_stack(x)
        
        # Global Average Pooling
        x = self.avgpool(x)  # (B, 64, 1, 1)
        x = torch.flatten(x, 1)  # Flatten for fully connected layer
        x = self.fc(x)  # Final classification layer
        return x


# Create the ResNet model
model = ResNet(n=n, num_classes=num_classes)

# Example input tensor with shape [batch_size=32, channels=3, height=224, width=224]
input_tensor = torch.randn(32, 3, 224, 224)

# Forward pass
output_tensor = model(input_tensor)
print(output_tensor.shape)  # Should output [32, 10] since there are 10 classes


torch.Size([32, 10])


Next we need to initialize the weights of our model.

In [29]:
def initialize_weight(module):
    if isinstance(module, (nn.Linear, nn.Conv2d)):
        nn.init.kaiming_normal_(module.weight, nonlinearity='relu')
    elif isinstance(module, nn.BatchNorm2d):
        nn.init.constant_(module.weight, 1)
        nn.init.constant_(module.bias, 0)
resnet = ResNet(n=5, num_classes = 10)      
resnet.apply(initialize_weight)
print(resnet)

ResNet(
  (initial_conv): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (initial_bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (initial_relu): ReLU(inplace=True)
  (residual_stack): ResidualStack(
    (blocks): ModuleList(
      (0-4): 5 x ResidualBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU()
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (skip): Sequential()
      )
    )
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
  (fc): Linear(in_features=64, out_features=10, bias=True)
)


# 4. Training
So now we have a shiny new model, but that doesn't really help when we can't train it. So that's what we do next.

First we need to load the data. Note that we split the official training data into train and validation sets, because you must not look at the test set until you are completely done developing your model and report the final results. Some people don't do this properly, but you should not copy other people's bad habits.

In [32]:
class CIFAR10Subset(torchvision.datasets.CIFAR10):
    """
    Get a subset of the CIFAR10 dataset, according to the passed indices.
    """
    def __init__(self, *args, idx=None, **kwargs):
        super().__init__(*args, **kwargs)
        
        if idx is None:
            return
        
        self.data = self.data[idx]
        targets_np = np.array(self.targets)
        self.targets = targets_np[idx].tolist()

We next define transformations that change the images into PyTorch tensors, standardize the values according to the precomputed mean and standard deviation, and provide data augmentation for the training set.

In [35]:
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, 4),
    transforms.ToTensor(),
    normalize,
])
transform_eval = transforms.Compose([
    transforms.ToTensor(),
    normalize
])

In [37]:
ntrain = 45_000
train_set = CIFAR10Subset(root='./data', train=True, idx=range(ntrain),
                          download=True, transform=transform_train)
val_set = CIFAR10Subset(root='./data', train=True, idx=range(ntrain, 50_000),
                        download=True, transform=transform_eval)
test_set = torchvision.datasets.CIFAR10(root='./data', train=False,
                                        download=True, transform=transform_eval)

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [53]:
dataloaders = {}
dataloaders['train'] = torch.utils.data.DataLoader(train_set, batch_size=128,
                                                   shuffle=True, num_workers=2,
                                                   pin_memory=True)
dataloaders['val'] = torch.utils.data.DataLoader(val_set, batch_size=128,
                                                 shuffle=False, num_workers=2,
                                                 pin_memory=True)
dataloaders['test'] = torch.utils.data.DataLoader(test_set, batch_size=128,
                                                  shuffle=False, num_workers=2,
                                                  pin_memory=True)

Next we push the model to our GPU (if there is one).

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
resnet.to(device);
import torch
print(torch.cuda.is_available())  # 检查 CUDA 是否可用
print(torch.cuda.get_device_name(0))  # 显示 GPU 名称



Next we define a helper method that does one epoch of training or evaluation. We have only defined training here, so you need to implement the necessary changes for evaluation!

In [None]:
def run_epoch(model, optimizer, dataloader, train):
    """
    Run one epoch of training or evaluation.
    
    Args:
        model: The model used for prediction
        optimizer: Optimization algorithm for the model
        dataloader: Dataloader providing the data to run our model on
        train: Whether this epoch is used for training or evaluation
        
    Returns:
        Loss and accuracy in this epoch.
    """
    # TODO: Change the necessary parts to work correctly during evaluation (train=False)
    
    device = next(model.parameters()).device
    print(f"Model is on device: {device}")
    
    # Set the model to training or evaluation mode
    if train:
        model.train()
        print("Model set to TRAINING mode.")
    else:
        model.eval()
        print("Model set to EVALUATION mode.")
    
    epoch_loss = 0.0
    epoch_acc = 0.0
    total_samples = 0

    # Iterate over data
    for batch_idx, (xb, yb) in enumerate(dataloader):
        print(f"\nProcessing batch {batch_idx + 1}/{len(dataloader)}")
        xb, yb = xb.to(device), yb.to(device)
        print(f"Input batch shape: {xb.shape}, Target batch shape: {yb.shape}")

        # Zero the parameter gradients (only in training mode)
        if train:
            optimizer.zero_grad()
            print("Optimizer gradients zeroed.")

        # Forward pass
        with torch.set_grad_enabled(train):  # Disable gradients during evaluation
            pred = model(xb)
            print(f"Prediction shape: {pred.shape}")
            loss = F.cross_entropy(pred, yb)
            print(f"Batch loss: {loss.item():.4f}")

            # Calculate accuracy
            top1 = torch.argmax(pred, dim=1)
            ncorrect = torch.sum(top1 == yb).item()
            print(f"Number of correct predictions in this batch: {ncorrect}/{len(yb)}")

        # Update the loss and accuracy
        epoch_loss += loss.item() * len(yb)
        epoch_acc += ncorrect
        total_samples += len(yb)

        # Backpropagation and optimization (only during training)
        if train:
            loss.backward()
            optimizer.step()
            print("Model parameters updated.")


    # Average loss and accuracy over the dataset
    epoch_loss /= total_samples
    epoch_acc /= total_samples
    
    return epoch_loss, epoch_acc

Next we implement a method for fitting (training) our model. For many models early stopping can save a lot of training time. Your task is to add early stopping to the loop (based on validation accuracy). Early stopping usually means exiting the training loop if the validation accuracy hasn't improved for `patience` number of steps. Don't forget to save the best model parameters according to validation accuracy. You will need `copy.deepcopy` and the `state_dict` for this.

In [47]:
def fit(model, optimizer, lr_scheduler, dataloaders, max_epochs, patience):
    """
    Fit the given model on the dataset.
    
    Args:
        model: The model used for prediction
        optimizer: Optimization algorithm for the model
        lr_scheduler: Learning rate scheduler that improves training
                      in late epochs with learning rate decay
        dataloaders: Dataloaders for training and validation
        max_epochs: Maximum number of epochs for training
        patience: Number of epochs to wait with early stopping the
                  training if validation loss has decreased
                  
    Returns:
        Loss and accuracy in this epoch.
    """
    
    best_acc = 0
    best_model_weights = copy.deepcopy(model.state_dict())
    curr_patience = 0
    
    # for epoch in range(max_epochs):
    #     train_loss, train_acc = run_epoch(model, optimizer, dataloaders['train'], train=True)
    #     lr_scheduler.step()
    #     print(f"Epoch {epoch + 1: >3}/{max_epochs}, train loss: {train_loss:.2e}, accuracy: {train_acc * 100:.2f}%")
        
    #     val_loss, val_acc = run_epoch(model, None, dataloaders['val'], train=False)
    #     print(f"Epoch {epoch + 1: >3}/{max_epochs}, val loss: {val_loss:.2e}, accuracy: {val_acc * 100:.2f}%")
        
    #     # TODO: Add early stopping and save the best weights (in best_model_weights)
    #     # Check if validation loss has improved
    #     if val_acc > best_val_acc:
    #         best_val_acc = val_acc
    #         best_model_weights = copy.deepcopy(model.state_dict())  # Save the best model weights
    #         curr_patience = 0  # Reset patience counter
    #         print(f"New best validation accuracy: {best_val_acc * 100:.2f}% — Saving model weights.")
    #     else:
    #         curr_patience += 1  # Increment patience if no improvement
    #         print(f"No improvement in validation accuracy for {curr_patience} epoch(s).")

    #     # Early stopping check
    #     if curr_patience >= patience:
    #         print(f"Early stopping triggered. Validation accuracy did not improve for {patience} epochs.")
    #         break

    # # Restore the best model weights
    # model.load_state_dict(best_model_weights)
    # print(f"Training complete. Best validation accuracy: {best_val_acc * 100:.2f}%")
    
    # return best_val_acc, model
    for epoch in range(max_epochs):
        print(f"Starting epoch {epoch + 1}")

        # 训练阶段
        model.train()
        running_loss = 0.0
        for inputs, targets in dataloaders['train']:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = nn.CrossEntropyLoss()(outputs, targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        avg_train_loss = running_loss / len(dataloaders['train'])
        print(f"Epoch {epoch + 1} Training Loss: {avg_train_loss:.4f}")

        # 学习率调度器更新
        lr_scheduler.step()

        # 验证阶段
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, targets in dataloaders['val']:
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == targets).sum().item()
                total += targets.size(0)

        val_acc = correct / total
        print(f"Epoch {epoch + 1} Validation Accuracy: {val_acc:.4f}")

        # Early Stopping 判断
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_weights = copy.deepcopy(model.state_dict())
            curr_patience = 0  # 重置耐心值
            print(f"New best validation accuracy: {best_val_acc:.4f}, model saved.")
        else:
            curr_patience += 1
            print(f"No improvement in validation accuracy for {curr_patience} epochs.")

        # 判断是否触发 Early Stopping
        if curr_patience >= patience:
            print("Early stopping triggered.")
            break

    # 加载最佳模型权重
    model.load_state_dict(best_model_weights)
    print("Training complete. Best model loaded.")
    return model

In most cases you should just use the Adam optimizer for training, because it works well out of the box. However, a well-tuned SGD (with momentum) will in most cases outperform Adam. And since the original paper gives us a well-tuned SGD we will just use that.

In [None]:
optimizer = torch.optim.SGD(resnet.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[100, 150], gamma=0.1)

# Fit model
fit(resnet, optimizer, lr_scheduler, dataloaders, max_epochs=200, patience=50)

Once the model is trained we run it on the test set to obtain our final accuracy.
Note that we can only look at the test set once, everything else would lead to overfitting. So you _must_ ignore the test set while developing your model!

In [None]:
test_loss, test_acc = run_epoch(resnet, None, dataloaders['test'], train=False)
print(f"Test loss: {test_loss:.1e}, accuracy: {test_acc * 100:.2f}%")

That's almost what was reported in the paper (92.49%) and we didn't even train on the full training set.

# Optional task: Squeeze out all the juice!

Can you do even better? Have a look at [A Recipe for Training Neural Networks](https://karpathy.github.io/2019/04/25/recipe/) and some state-of-the-art architectures such as [EfficientNet architecture](https://ai.googleblog.com/2019/05/efficientnet-improving-accuracy-and.html). Play around with the possibilities PyTorch offers you and see how close you can get to the [state of the art on CIFAR-10](https://paperswithcode.com/sota/image-classification-on-cifar-10).