In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

In [15]:
# Adaptive Learning Rate
class AdaptiveLR():
    """ Implements adaptive learning rate adjustment. """
    def __init__(self, optimizer, factor=0.5, patience=3, min_lr=1e-6):
        """
        Args:
            optimizer (torch.optim.Optimizer): How long to wait after last time validation loss improved.
                            Default: 5
            factor (float): Factor by which the learning rate will be reduced.
                        Default: 0.9
            patience (int): Number of epochs with no improvement before reducing LR.
                        Default: 5
            min_lr (float): Lower bound on the learning rate.
                        Default: 1e-6
        """
        self.optimizer = optimizer
        self.factor = factor
        self.patience = patience
        self.min_lr = min_lr
        self.best_loss = float('inf')
        self.wait = 0

    def step(self, loss):
        if loss < self.best_loss:
            self.best_loss = loss
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                for param_group in self.optimizer.param_groups:
                    new_lr = max(param_group['lr'] * self.factor, self.min_lr)
                    param_group['lr'] = new_lr
                self.wait = 0


In [3]:
# L2 Regularization
def l2_regularization(model, lambda_reg):
    """ Computes L2 regularization loss. """
    """
    Args:
        model (nn.Module): The neural network model.
        lambda_reg (float): Regularization coefficient.
    """
    l2_loss = sum(p.pow(2.0).sum() for p in model.parameters())
    return lambda_reg * l2_loss

In [4]:
# Early Stopping
class EarlyStopping():
    """Early stops the training if validation loss doesn't improve after a given patience."""

    def __init__(self, patience=5, delta=0, path=None):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 5
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str or None): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
        """
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.path is not None:
            torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [5]:
# Dropout (Manual Implementation)
class CustomDropout(nn.Module):
    """ Implements dropout. """
    def __init__(self, p=0.5):
        """
        Args:
            p (float): Dropout probability.
                      Default: 0.5
        """
        super().__init__()
        self.p = p

    def forward(self, x):
        if self.training:
            mask = (torch.rand_like(x) > self.p).float()
            return x * mask / (1 - self.p)
        return x

In [6]:
# Weight Initialization (still implementing?)
def WeightInit(m):
    """ Applies Xavier initialization to linear layers. """
    """
    Args:
        m (nn.Module): Module to be initialized.
    """
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)

In [7]:
# Batch Normalization
class CustomBatchNorm(nn.Module):
    """ Implements batch normalization without using PyTorch's built-in BN layers. """
    def __init__(self, num_features, momentum=0.1, eps=1e-5):
        """
        Args:
          num_features (int): Number of features in input.
          momentum (float): Momentum factor for moving average.
                            Default: 0.1
          eps (float): Small value to avoid division by zero.
                      Default: 1e-5
        """
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))
        self.momentum = momentum
        self.eps = eps
        self.running_mean = torch.zeros(num_features)
        self.running_var = torch.ones(num_features)

    def forward(self, x):
        if self.training:
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)
            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var
        else:
            mean = self.running_mean
            var = self.running_var
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_norm + self.beta

In [17]:
#Full Example

class ExampleModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.apply(WeightInit)
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = CustomBatchNorm(hidden_size)
        self.dropout1 = CustomDropout(0.3)
        self.fc2 = nn.Linear(hidden_size, output_size)


    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.dropout1(x)
        x = F.relu(x)

        x = self.fc2(x)
        return x

# Training loop with all implemented techniques
def train(model, train_loader, optimizer, criterion, num_epochs=50):
    adaptive_lr = AdaptiveLR(optimizer)
    early_stopping = EarlyStopping(patience=5)
    for epoch in range(num_epochs):
        total_loss = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.view(inputs.size(0), -1), targets
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets) + l2_regularization(model, lambda_reg=1e-4)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        adaptive_lr.step(avg_loss)
        early_stopping.__call__(avg_loss, model)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

# Example usage
dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
model = ExampleModel(28*28, 128, 10)
optimizer = optim.Adam(model.parameters(), lr=0.005)
criterion = nn.CrossEntropyLoss()
train(model, train_loader, optimizer, criterion)

Epoch 1, Loss: 0.3332
Epoch 2, Loss: 0.2694
Epoch 3, Loss: 0.2670
Epoch 4, Loss: 0.2681
Epoch 5, Loss: 0.2670
Epoch 6, Loss: 0.2711
Epoch 7, Loss: 0.2680
Epoch 8, Loss: 0.2712
Epoch 9, Loss: 0.2260
Epoch 10, Loss: 0.2116
Epoch 11, Loss: 0.2057
Epoch 12, Loss: 0.2034
Epoch 13, Loss: 0.2029
Epoch 14, Loss: 0.2007
Epoch 15, Loss: 0.1995
Epoch 16, Loss: 0.2016
Epoch 17, Loss: 0.1980
Epoch 18, Loss: 0.1976
Epoch 19, Loss: 0.2000
Epoch 20, Loss: 0.1977
Epoch 21, Loss: 0.1971
Epoch 22, Loss: 0.1990
Epoch 23, Loss: 0.1986
Epoch 24, Loss: 0.1986
Epoch 25, Loss: 0.1751
Epoch 26, Loss: 0.1645
Epoch 27, Loss: 0.1596
Epoch 28, Loss: 0.1618
Epoch 29, Loss: 0.1590
Epoch 30, Loss: 0.1555
Epoch 31, Loss: 0.1539
Epoch 32, Loss: 0.1548
Epoch 33, Loss: 0.1558
Epoch 34, Loss: 0.1558
Epoch 35, Loss: 0.1394
Epoch 36, Loss: 0.1348
Epoch 37, Loss: 0.1318
Epoch 38, Loss: 0.1305
Epoch 39, Loss: 0.1274
Epoch 40, Loss: 0.1283
Epoch 41, Loss: 0.1256
Epoch 42, Loss: 0.1257
Epoch 43, Loss: 0.1258
Epoch 44, Loss: 0.12

In [18]:
#Adaptive LR Example

class ExampleModel1(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.apply(WeightInit)
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = CustomBatchNorm(hidden_size)
        self.dropout1 = CustomDropout(0.3)
        self.fc2 = nn.Linear(hidden_size, output_size)


    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.dropout1(x)
        x = F.relu(x)

        x = self.fc2(x)
        return x

# Training loop with all implemented techniques
def train(model, train_loader, optimizer, criterion, num_epochs=20):
    adaptive_lr = AdaptiveLR(optimizer)
    for epoch in range(num_epochs):
        total_loss = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.view(inputs.size(0), -1), targets
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets) + l2_regularization(model, lambda_reg=1e-4)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        adaptive_lr.step(avg_loss)
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

# Example usage
dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
model = ExampleModel1(28*28, 128, 10)
optimizer = optim.Adam(model.parameters(), lr=0.005)
criterion = nn.CrossEntropyLoss()
train(model, train_loader, optimizer, criterion)

Epoch 1, Loss: 0.3334
Epoch 2, Loss: 0.2730
Epoch 3, Loss: 0.2698
Epoch 4, Loss: 0.2683
Epoch 5, Loss: 0.2703
Epoch 6, Loss: 0.2716
Epoch 7, Loss: 0.2675
Epoch 8, Loss: 0.2710
Epoch 9, Loss: 0.2687
Epoch 10, Loss: 0.2682
Epoch 11, Loss: 0.2231
Epoch 12, Loss: 0.2077
Epoch 13, Loss: 0.2073
Epoch 14, Loss: 0.2044
Epoch 15, Loss: 0.2005
Epoch 16, Loss: 0.1998
Epoch 17, Loss: 0.2014
Epoch 18, Loss: 0.1994
Epoch 19, Loss: 0.1975
Epoch 20, Loss: 0.1993


In [22]:
#Early Stopping Example
class ExampleModel2(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.apply(WeightInit)
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = CustomBatchNorm(hidden_size)
        self.dropout1 = CustomDropout(0.3)
        self.fc2 = nn.Linear(hidden_size, output_size)


    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.dropout1(x)
        x = F.relu(x)

        x = self.fc2(x)
        return x

# Training loop with all implemented techniques
def train(model, train_loader, optimizer, criterion, num_epochs=50):
    early_stopping = EarlyStopping(patience=5)
    for epoch in range(num_epochs):
        total_loss = 0
        for inputs, targets in train_loader:
            inputs, targets = inputs.view(inputs.size(0), -1), targets
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets) + l2_regularization(model, lambda_reg=1e-4)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_loader)
        early_stopping.__call__(avg_loss, model)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break
        print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

# Example usage
dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
model = ExampleModel2(28*28, 128, 10)
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
train(model, train_loader, optimizer, criterion)

Epoch 1, Loss: 0.3880
Epoch 2, Loss: 0.3620
Epoch 3, Loss: 0.3661
Epoch 4, Loss: 0.3670
Epoch 5, Loss: 0.3709
Epoch 6, Loss: 0.3677
Early stopping triggered.
