In [None]:
# Notebook shell command: install the Weights & Biases client that the following cells import.
!pip install wandb

In [None]:
import getpass

import wandb

# Read the key with getpass so it is not echoed into the notebook's saved
# cell output, which is what input() would do.
key = getpass.getpass('Enter your API:')
wandb.login(key=key)

In [None]:
import torch
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import torch.nn as nn
import torch.nn.functional as F

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# 1. Data loading and preprocessing
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Every image is resized to 128x128
    transforms.ToTensor(),  # Convert to tensor
    # Per-channel normalisation; imshow() uses the same mean/std to undo it.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# The dataset's "train" folder feeds training + validation; its "val" folder
# is kept aside and used as the test set.
train_data = ImageFolder(root='/kaggle/input/inaturalist-dataset/inaturalist_12K/train', transform=transform)
test_data = ImageFolder(root='/kaggle/input/inaturalist-dataset/inaturalist_12K/val', transform=transform)

# 80/20 train/validation split of train_data.
train_size = int(0.8 * len(train_data))  # 80% of the samples for training
val_size = len(train_data) - train_size  # the remainder for validation

# Seed the split so the same images land in the validation set on every
# kernel restart; otherwise metrics from different sessions (sweep runs vs.
# the final best-model run) are measured on different validation sets.
SPLIT_SEED = 42
train_dataset, val_dataset = random_split(
    train_data,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Only the training loader shuffles; validation/test order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, num_workers=2)


class CustomCNN(nn.Module):
    """Configurable CNN: a stack of conv blocks followed by a dense classifier.

    Each conv block is Conv2d -> (optional BatchNorm2d) -> activation ->
    MaxPool2d(2) -> Dropout. The classifier is a sequence of
    Linear -> activation -> Dropout stages followed by one final Linear layer
    that outputs ``num_classes`` raw scores (no softmax is applied).

    Args:
        input_channels: Number of channels of the input image (3 for RGB).
        num_filters: Output channels of each conv block. ``None`` selects
            ``[32, 64, 128, 256, 512]``; an int is repeated for every block;
            a list must contain at least ``num_conv_layers`` entries.
        kernel_size: Kernel size of each conv block; an int is shared by all
            blocks, a list must contain at least ``num_conv_layers`` entries.
        activation: 'relu', 'gelu', 'silu' or 'mish'. Any other value falls
            back to ReLU.
        dense_neurons: Width of the hidden dense layer(s); an int or a list
            of ints (one hidden layer per entry).
        num_classes: Size of the output layer.
        dropout_rate: Dropout probability used after every conv block and
            every hidden dense layer.
        batch_norm: Whether each conv block contains a BatchNorm2d layer.
        num_conv_layers: Number of conv blocks to build.
        input_size: Height and width of the (square) input image; only used
            to work out the input width of the first dense layer.

    Raises:
        TypeError: If ``kernel_size`` or ``dense_neurons`` is neither an int
            nor a list.
        AssertionError: If a list argument is too short, or ``dense_neurons``
            contains a non-int entry.
    """

    def __init__(self, input_channels=3, num_filters=None, kernel_size=3,
                 activation='relu', dense_neurons=128, num_classes=10,
                 dropout_rate=0.3, batch_norm=True, num_conv_layers=5, input_size=128):
        super(CustomCNN, self).__init__()
        # These three are read by create_conv_block() / get_activation(), so
        # they must be set before any layer is built.
        self.activation = activation
        self.batch_norm = batch_norm
        self.dropout_rate = dropout_rate

        # Normalise num_filters to one entry per conv block.
        if num_filters is None:
            num_filters = [32, 64, 128, 256, 512]  # default filter counts
        elif isinstance(num_filters, int):
            num_filters = [num_filters] * num_conv_layers
        elif isinstance(num_filters, list):
            assert len(num_filters) >= num_conv_layers, \
                "num_filters needs at least one entry per conv layer"

        # Normalise kernel_size to one entry per conv block.
        if isinstance(kernel_size, int):
            kernel_size = [kernel_size] * num_conv_layers
        elif isinstance(kernel_size, list):
            assert len(kernel_size) >= num_conv_layers, \
                "kernel_size needs at least one entry per conv layer"
        else:
            raise TypeError("kernel_size must be a list or an integer.")

        # Normalise dense_neurons to a list of hidden-layer widths.
        if isinstance(dense_neurons, int):
            dense_neurons = [dense_neurons]
        elif isinstance(dense_neurons, list):
            assert all(isinstance(x, int) for x in dense_neurons), \
                "every entry of dense_neurons must be an int"
        else:
            raise TypeError("dense_neurons must be a list or an integer.")

        # Convolutional feature extractor: each block's output channel count
        # becomes the next block's input channel count.
        blocks = []
        in_channels = input_channels
        for i in range(num_conv_layers):
            out_channels = num_filters[i]
            blocks.append(self.create_conv_block(in_channels, out_channels, kernel_size[i]))
            in_channels = out_channels
        self.conv_layers = nn.Sequential(*blocks)

        # Work out the flattened feature count by pushing a dummy image
        # through the conv stack. The probe runs in eval mode so BatchNorm
        # does not fold the all-zero fake image into its running statistics
        # (and dropout is skipped); the previous mode is restored afterwards.
        was_training = self.conv_layers.training
        self.conv_layers.eval()
        with torch.no_grad():
            probe = torch.zeros(1, input_channels, input_size, input_size)
            self.flattened_size = self.conv_layers(probe).view(1, -1).size(1)
        self.conv_layers.train(was_training)

        # Dense classifier head.
        self.flatten = nn.Flatten()
        fc_layers = []
        in_features = self.flattened_size
        for out_features in dense_neurons:
            fc_layers.append(nn.Linear(in_features, out_features))
            fc_layers.append(self.get_activation())
            fc_layers.append(nn.Dropout(self.dropout_rate))
            in_features = out_features
        fc_layers.append(nn.Linear(in_features, num_classes))  # output layer
        self.fc_layers = nn.Sequential(*fc_layers)

    def create_conv_block(self, in_channels, out_channels, kernel_size):
        """Build one Conv2d -> [BatchNorm2d] -> activation -> MaxPool2d -> Dropout block.

        Padding is ``kernel_size // 2`` with stride 1, which keeps the spatial
        size unchanged for odd kernel sizes; the 2x2 max-pool then halves it.
        """
        padding = kernel_size // 2
        layers = [nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=1, padding=padding)]
        if self.batch_norm:
            layers.append(nn.BatchNorm2d(out_channels))
        layers.append(self.get_activation())
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        layers.append(nn.Dropout(self.dropout_rate))
        return nn.Sequential(*layers)

    def get_activation(self):
        """Return a new activation module for ``self.activation`` (ReLU if the name is unknown)."""
        # Map names to classes so only the requested module is instantiated.
        activations = {
            'relu': nn.ReLU,
            'gelu': nn.GELU,
            'silu': nn.SiLU,
            'mish': nn.Mish,
        }
        return activations.get(self.activation, nn.ReLU)()

    def forward(self, x):
        """Run the conv stack, flatten, then the dense head; returns the output-layer scores."""
        x = self.conv_layers(x)
        x = self.flatten(x)
        x = self.fc_layers(x)
        return x


# One output unit per class reported by the training ImageFolder.
num_classes = len(train_data.classes)

# Stand-alone model instance: five conv blocks with per-layer filter counts
# and kernel sizes, followed by three hidden dense layers.
model = CustomCNN(
    num_conv_layers=5,
    num_filters=[32, 64, 128, 256, 512],  # output channels of conv blocks 1..5
    kernel_size=[3, 5, 3, 5, 1],  # kernel size of conv blocks 1..5
    dense_neurons=[512, 256, 64],  # hidden dense layer widths
    num_classes=num_classes,
    input_channels=3,
    input_size=128,
)
model = model.to(device)  # place the parameters on the selected device

In [None]:
def train_model(config=None):
    """Train one CustomCNN inside a W&B run and log per-epoch metrics.

    Intended as the target function of ``wandb.agent``: the hyperparameters
    are read from ``wandb.config`` after ``wandb.init``. Relies on the
    module-level ``train_loader``, ``val_loader``, ``train_data``, ``device``
    and ``evaluate_model``.

    Args:
        config: Optional config passed straight to ``wandb.init``.
    """
    with wandb.init(config=config):
        config = wandb.config  # hyperparameters chosen for this run

        # kernel_size and dense_neurons are part of the sweep's search space,
        # so they are read here when present. The fallbacks (3 and 128) keep
        # configs that do not define them working.
        kernel_size = config.get('kernel_size', 3)
        if not isinstance(kernel_size, int):
            kernel_size = list(kernel_size)  # CustomCNN accepts only int or list
        dense_neurons = config.get('dense_neurons', 128)
        if not isinstance(dense_neurons, int):
            dense_neurons = list(dense_neurons)

        model = CustomCNN(
            input_channels=3,
            num_filters=config.num_filters,
            kernel_size=kernel_size,
            activation=config.activation,
            dense_neurons=dense_neurons,
            # Size the output layer from the dataset the loaders are built on.
            num_classes=len(train_data.classes),
            dropout_rate=config.dropout_rate,
            batch_norm=config.batch_norm
        ).to(device)

        # Cross-entropy loss for multi-class classification, Adam with lr=0.001.
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

        for epoch in range(10):  # 10 epochs per run
            model.train()  # enable dropout / batch-norm updates
            running_loss = 0.0  # sum of per-batch losses
            correct, total = 0, 0

            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()  # clear gradients from the previous step
                outputs = model(images)  # forward pass
                loss = criterion(outputs, labels)  # compute the batch loss
                loss.backward()  # backpropagate to compute gradients
                optimizer.step()  # update the parameters

                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)  # index of the highest score per sample
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            train_acc = 100 * correct / total  # training accuracy in percent
            val_acc, val_loss = evaluate_model(model, val_loader, criterion)

            # Build the metrics once so the printed and logged values cannot diverge.
            metrics = {'epoch': epoch + 1,
                       'train_loss': running_loss / len(train_loader), 'train_accuracy': train_acc,
                       'val_loss': val_loss, 'val_accuracy': val_acc}
            print(metrics)
            wandb.log(metrics)



In [None]:
def evaluate_model(model, data_loader, criterion):
    """Score ``model`` on every batch of ``data_loader`` without computing gradients.

    The model is switched to evaluation mode and left in that mode. Batches
    are moved to the module-level ``device`` before the forward pass.

    Returns:
        A ``(accuracy, avg_loss)`` tuple: accuracy as a percentage of all
        samples seen, and the sum of per-batch losses divided by the number
        of batches.
    """
    model.eval()
    loss_sum = 0.0
    num_correct = 0
    num_seen = 0

    with torch.no_grad():
        for batch_images, batch_labels in data_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            scores = model(batch_images)
            loss_sum += criterion(scores, batch_labels).item()

            # Second element of torch.max(..., 1) is the index of the top score.
            predicted = torch.max(scores, 1)[1]
            num_seen += batch_labels.size(0)
            num_correct += (predicted == batch_labels).sum().item()

    return 100 * num_correct / num_seen, loss_sum / len(data_loader)

In [None]:
# Sweep definition handed to wandb.sweep: Bayesian search that maximises the
# 'val_accuracy' metric logged by train_model.
sweep_config = dict(
    name='scratch_hyperparam_sweep-1',
    method='bayes',  # alternatives: 'grid', 'random'
    metric=dict(name='val_accuracy', goal='maximize'),
    parameters=dict(
        # Output channels of the five conv blocks.
        num_filters=dict(values=[
            [16, 32, 64, 128, 256],   # growing
            [64, 64, 64, 64, 64],     # constant (64)
            [256, 128, 64, 32, 16],   # shrinking
            [32, 32, 32, 32, 32],     # constant (32)
            [16, 32, 64, 32, 16],     # symmetric
        ]),
        # Activation names understood by CustomCNN.get_activation.
        activation=dict(values=['relu', 'gelu', 'silu', 'mish']),
        # Batch normalisation on/off.
        batch_norm=dict(values=[True, False]),
        # Dropout probability.
        dropout_rate=dict(values=[0.2, 0.3]),
        # NOTE(review): train_model does not read this key.
        data_augmentation=dict(values=[True, False]),
        # NOTE(review): train_model does not read this key; filter layout is
        # already expressed by the num_filters lists.
        filter_organization=dict(values=['same', 'double', 'half']),
        # Kernel size of the five conv blocks.
        kernel_size=dict(values=[
            [3, 3, 3, 3, 3],   # all 3x3
            [3, 3, 5, 3, 3],   # wider kernel in the middle
            [3, 5, 3, 5, 3],   # alternating
            [5, 5, 5, 5, 5],   # all 5x5
            [5, 7, 7, 3, 5],   # mixed
        ]),
        # Hidden dense layer widths.
        dense_neurons=dict(values=[
            [512],            # one layer, 512 units
            [256],            # one layer, 256 units
            [128],            # one layer, 128 units
            [64, 128, 256],   # three layers, widening
            [512, 256, 64],   # three layers, narrowing
        ]),
    ),
)

# Register the sweep under project 'DL_A2' ...
sweep_id = wandb.sweep(sweep_config, project='DL_A2')

# ... and let one agent execute up to 100 train_model runs for it.
wandb.agent(sweep_id, train_model, count=100)

In [None]:

# Configuration selected from the sweep. It is defined before wandb.init so
# it can be attached to the run as its config.
best_config = dict(
    name='Best_configuration_test_acc-1',
    num_filters=[64, 64, 64, 64, 64],
    activation='gelu',
    dropout_rate=0.2,
    batch_norm=True,
)

# Open the W&B run that this cell and the following cells log to.
wandb.init(project="DL_A2", config=best_config, name="best-model-run")

# Build the network from the chosen configuration; one conv block per entry
# in num_filters, a single 128-unit hidden dense layer.
best_model = CustomCNN(
    input_channels=3,
    num_conv_layers=len(best_config['num_filters']),
    num_filters=best_config['num_filters'],
    activation=best_config['activation'],
    batch_norm=best_config['batch_norm'],
    dropout_rate=best_config['dropout_rate'],
    dense_neurons=128,
    num_classes=len(train_data.classes),
)
best_model = best_model.to(device)

# Cross-entropy objective optimised with Adam (lr=0.001).
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(best_model.parameters(), lr=0.001)

# 20 training epochs, each followed by a validation pass.
for epoch in range(20):
    best_model.train()  # training mode for this epoch
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()  # drop gradients left from the previous batch
        outputs = best_model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted = torch.max(outputs, 1)[1]  # index of the top score per sample
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_acc = 100 * correct / total  # percent of training samples predicted correctly
    val_acc, val_loss = evaluate_model(best_model, val_loader, criterion)

    # Per-epoch metrics for the W&B run.
    wandb.log({
        "epoch": epoch + 1,
        "train_accuracy": train_acc,
        "train_loss": running_loss / len(train_loader),
        "val_accuracy": val_acc,
        "val_loss": val_loss,
    })

    print(f"[Epoch {epoch+1}] Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%")

In [None]:

# Measure accuracy of the trained best_model on test_loader.
best_model.eval()  # evaluation mode: dropout off, batch-norm uses running stats
correct, total = 0, 0  # correctly classified samples / samples seen

with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = best_model(images)
        preds = torch.max(outputs, 1)[1]  # index of the top score per sample
        correct += (preds == labels).sum().item()
        total += labels.size(0)

# Stored as a fraction; converted to percent only when reported.
test_accuracy = correct / total
print(f" Test Accuracy: {test_accuracy * 100:.2f}%")

# Record the percentage on the active W&B run.
wandb.log({"test_accuracy": test_accuracy * 100})

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt

def imshow(img):
    """Convert a normalised channel-first image tensor into a plottable NumPy array.

    The tensor is moved to the CPU, its axes are reordered from
    (channel, height, width) to (height, width, channel), the per-channel
    normalisation (mean 0.485/0.456/0.406, std 0.229/0.224/0.225) is undone
    and the result is clipped to [0, 1]. Nothing is drawn; the array is
    returned for the caller to display.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])
    channels_last = np.transpose(img.cpu().numpy(), (1, 2, 0))
    return np.clip(channel_std * channels_last + channel_mean, 0, 1)

def show_predictions(model, dataloader, class_names, num_images=30):
    """Save a grid of model predictions to ``predictions.png`` and log it to W&B.

    Images are taken from ``dataloader`` in iteration order until
    ``num_images`` have been drawn. Each tile shows the un-normalised image
    titled with the predicted and the true class name. The model is switched
    to evaluation mode and left in that mode.

    Args:
        model: Network to run; inputs are moved to the module-level ``device``.
        dataloader: Iterable of ``(inputs, labels)`` batches.
        class_names: Sequence mapping a class index to its display name.
        num_images: Maximum number of tiles to draw. The grid has 3 columns
            and as many rows as needed (10 rows for the default 30).
    """
    num_cols = 3
    # Ceiling division; at least one row so the figure is valid for tiny requests.
    # A fixed 10x3 grid would make plt.subplot raise for num_images > 30.
    num_rows = max(1, -(-num_images // num_cols))

    model.eval()
    images_shown = 0
    fig = plt.figure(figsize=(12, 2 * num_rows))  # 2 inches of height per row
    with torch.no_grad():
        for inputs, labels in dataloader:
            if images_shown >= num_images:
                break
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)  # index of the top score per sample
            for j in range(inputs.size(0)):
                if images_shown >= num_images:
                    break
                img = imshow(inputs[j])  # un-normalised HWC array
                plt.subplot(num_rows, num_cols, images_shown + 1)
                plt.imshow(img)
                # .item() turns the 0-d tensors into plain ints for indexing.
                pred_name = class_names[preds[j].item()]
                true_name = class_names[labels[j].item()]
                plt.title(f"Pred: {pred_name}\nTrue: {true_name}", fontsize=8)
                plt.axis('off')
                images_shown += 1

    plt.tight_layout()  # avoid overlapping titles
    plt.savefig("predictions.png", bbox_inches='tight')
    plt.close(fig)  # release this figure specifically
    wandb.log({"sample_predictions": wandb.Image("predictions.png")})
    


# Render 30 test-set predictions as a 10x3 grid, save it to predictions.png and log the image to W&B
show_predictions(best_model, test_loader, train_data.classes, num_images=30)
