In [1]:
import torch
torch.autograd.set_detect_anomaly(True)

<torch.autograd.anomaly_mode.set_detect_anomaly at 0x7f3cfe467520>

In [2]:
import os
print("Available cores:", os.cpu_count())

# Check and display the GPU environment details using TensorFlow
if torch.cuda.is_available():
    print("Available GPUs:", [torch.cuda.get_device_name(i) for i in range(torch.cuda.device_count())])
    print("Num GPUs Available: ", torch.cuda.device_count())
else:
    print("No GPUs available.")

Available cores: 16
Available GPUs: ['NVIDIA GeForce RTX 2060 with Max-Q Design']
Num GPUs Available:  1


In [3]:
import torch
from torch import nn
from torch.autograd import Function

class DefaultBatchNorm2dGrad(Function):
    @staticmethod
    def forward(ctx, input, weight, bias, running_mean, running_var, training, momentum, eps):
        N, C, H, W = input.shape
        if training or running_mean is None:
            mean = input.mean([0, 2, 3], keepdim=True)  # Shape: [1, C, 1, 1]
            var = input.var([0, 2, 3], keepdim=True, unbiased=False)  # Shape: [1, C, 1, 1]
            if running_mean is not None:
                running_mean.mul_(1 - momentum).add_(mean.squeeze() * momentum)
                running_var.mul_(1 - momentum).add_(var.squeeze() * momentum)
        else:
            mean = running_mean.view(1, C, 1, 1)
            var = running_var.view(1, C, 1, 1)

        ctx.save_for_backward(input, weight, bias, mean, var)
        ctx.eps = eps

        input_normalized = (input - mean) / torch.sqrt(var + eps)
        output = weight.view(1, C, 1, 1) * input_normalized + bias.view(1, C, 1, 1)
        return output
    
    @staticmethod
    def backward(ctx, grad_output):
        input, weight, bias, mean, var = ctx.saved_tensors
        eps = ctx.eps
        input_normalized = (input - mean) / torch.sqrt(var + eps)
        grad_input = grad_weight = grad_bias = None
        if ctx.needs_input_grad[0]:
            grad_input = (grad_output * weight.view(1, -1, 1, 1)).div(torch.sqrt(var + eps))
        if ctx.needs_input_grad[1]:
            grad_weight = (grad_output * input_normalized).sum(dim=[0, 2, 3])
        if ctx.needs_input_grad[2]:
            grad_bias = grad_output.sum(dim=[0, 2, 3])
        return grad_input, grad_weight, grad_bias, None, None, None, None, None

class DefaultBatchNorm2d(nn.Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1, affine=True, track_running_stats=True):
        super(DefaultBatchNorm2d, self).__init__()
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum
        self.affine = affine
        self.track_running_stats = track_running_stats
        if affine:
            self.weight = nn.Parameter(torch.ones(num_features))
            self.bias = nn.Parameter(torch.zeros(num_features))
        if track_running_stats:
            self.register_buffer('running_mean', torch.zeros(num_features))
            self.register_buffer('running_var', torch.ones(num_features))
        else:
            self.register_buffer('running_mean', None)
            self.register_buffer('running_var', None)
        self.reset_parameters()

    def reset_parameters(self):
        if self.track_running_stats:
            self.running_mean.zero_()
            self.running_var.fill_(1)
        if self.affine:
            nn.init.ones_(self.weight)
            nn.init.zeros_(self.bias)

    def forward(self, input):
        return DefaultBatchNorm2dGrad.apply(
            input, 
            self.weight, 
            self.bias, 
            self.running_mean, 
            self.running_var, 
            self.training, 
            self.momentum, 
            self.eps
        )


In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AdvancedConvNet(nn.Module):
    def __init__(self, in_channels, height, width, num_classes,
                 conv_module=nn.Conv2d,
                 linear_module=nn.Linear,
                 bn_module=nn.BatchNorm2d,
                 bn_options={'affine': True, 'track_running_stats': True}):
        super(AdvancedConvNet, self).__init__()
        self.in_channels = in_channels
        self.height = height
        self.width = width
        self.num_classes = num_classes

        self.conv1 = conv_module(in_channels, 64, kernel_size=3, padding=1)
        self.bn1 = bn_module(64, **bn_options)
        self.conv2 = conv_module(64, 128, kernel_size=3, padding=1)
        self.bn2 = bn_module(128, **bn_options)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv3 = conv_module(128, 256, kernel_size=3, padding=1)
        self.bn3 = bn_module(256, **bn_options)
        self.conv4 = conv_module(256, 256, kernel_size=3, padding=1)
        self.bn4 = bn_module(256, **bn_options)
        self.pool2 = nn.MaxPool2d(2, 2)

        fc_input_features = 256 * (height // 4) * (width // 4)
        self.dropout = nn.Dropout(0.0)
        self.fc1 = linear_module(fc_input_features, 1024)
        self.fc2 = linear_module(1024, 512)
        self.fc3 = linear_module(512, num_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(F.relu(self.bn2(self.conv2(x))))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool2(F.relu(self.bn4(self.conv4(x))))
        x = x.view(-1, 256 * (self.height // 4) * (self.width // 4))
        x = F.relu(self.fc1(self.dropout(x)))
        x = F.relu(self.fc2(self.dropout(x)))
        x = self.fc3(x)
        return x

In [5]:
import torch.optim as optim
import random
import numpy as np
import torch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import random
import numpy as np


def set_seed(seed=0):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
      
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

_smp_data, _ = train_dataset[0]
num_channels, height, width = _smp_data.shape
num_classes = len(train_dataset.classes)

print("Number of Channels:", num_channels)
print("Width:", width)
print("Height:", height)
print("Number of Classes:", num_classes)


def train_and_evaluate(bn_module, affine=True, track_running_stats=True):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AdvancedConvNet(num_channels, height, width, num_classes,
                            bn_module=bn_module,
                            bn_options={'affine': affine, 'track_running_stats': track_running_stats}).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9)

    for epoch in range(1):
        model.train()
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += criterion(output, target).item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()

        test_loss /= len(test_loader.dataset)
        accuracy = correct / len(test_loader.dataset)
        print(f'Epoch {epoch + 1}: Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.4f}')

for affine in [True, False]:
    for track_running_stats in [True, False]:
        set_seed()
        print(f"Training with DefaultBatchNorm2d, affine={affine}, track_running_stats={track_running_stats}:")
        train_and_evaluate(DefaultBatchNorm2d, affine, track_running_stats)

        set_seed()
        print(f"Training with nn.BatchNorm2d, affine={affine}, track_running_stats={track_running_stats}:")
        train_and_evaluate(nn.BatchNorm2d, affine, track_running_stats)


Files already downloaded and verified
Files already downloaded and verified
Number of Channels: 3
Width: 32
Height: 32
Number of Classes: 10
Training with DefaultBatchNorm2d, affine=True, track_running_stats=True:
Epoch 1: Test Loss: 0.0273, Accuracy: 0.3739
Training with nn.BatchNorm2d, affine=True, track_running_stats=True:
Epoch 1: Test Loss: 0.0216, Accuracy: 0.5238
Training with DefaultBatchNorm2d, affine=True, track_running_stats=False:
Epoch 1: Test Loss: 0.0255, Accuracy: 0.3944
Training with nn.BatchNorm2d, affine=True, track_running_stats=False:
Epoch 1: Test Loss: 0.0180, Accuracy: 0.5880
Training with DefaultBatchNorm2d, affine=False, track_running_stats=True:


AttributeError: 'DefaultBatchNorm2d' object has no attribute 'weight'