In [55]:
import torch
from torch import nn
from torch import optim
from torchvision import datasets, transforms
from torch.utils.data import random_split, DataLoader
import torch.optim as optim
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import time
import os
import random
import shutil
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import json
import time
from sklearn.model_selection import train_test_split
from tqdm import tqdm 


In [56]:
!pip install torchviz




In [57]:
#this implementation was inspired by the following source: https://www.digitalocean.com/community/tutorials/alexnet-pytorch
class SceneClassificationCNN(nn.Module):
    """AlexNet-style convolutional network for scene classification.

    Five convolutional stages (Conv2d -> BatchNorm2d -> ELU, with 3x3/stride-2
    max pooling after stages 1, 2 and 5) are followed by three fully connected
    layers. The first linear layer expects 6400 input features, i.e. a
    256 x 5 x 5 feature map, which is what a 3 x 224 x 224 input produces.

    Parameters:
    - num_classes: Number of output scores produced by the final linear layer
      (default is 15).
    """

    def __init__(self, num_classes=15):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=0),
            nn.BatchNorm2d(96),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=3, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(256),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=3, stride=2))
        self.layer3 = nn.Sequential(
            nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(384),
            nn.ELU())
        self.layer4 = nn.Sequential(
            nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(384),
            nn.ELU())
        self.layer5 = nn.Sequential(
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=3, stride=2))

        # Classifier head: 6400 = 256 channels * 5 * 5 spatial positions.
        self.fc = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(6400, 4096),
            nn.ELU())
        self.fc1 = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(4096, 4096),
            nn.ELU())
        self.fc2 = nn.Sequential(
            nn.Linear(4096, num_classes)
        )

        # Initialize weights
        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialise all Conv2d, Linear and BatchNorm2d parameters.

        Conv2d/Linear weights get Kaiming-normal init (fan_in, gain for a
        linear activation) with zero biases; BatchNorm2d gets weight 1, bias 0.
        """
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='linear')
                # Layers built with bias=False have no bias tensor to reset.
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x):
        """Return the raw (un-normalised) class scores for a batch of images.

        Parameters:
        - x: Image batch with 3 channels; 224 x 224 inputs yield the 6400
          features the classifier head expects.

        Returns:
        - Tensor with one row per input image and ``num_classes`` columns.
        """
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        # flatten (unlike view) also accepts non-contiguous feature maps,
        # e.g. when the input uses the channels_last memory format.
        out = torch.flatten(out, 1)
        out = self.fc(out)
        out = self.fc1(out)
        out = self.fc2(out)

        return out


In [59]:
import os
import shutil
from sklearn.model_selection import train_test_split
from collections import Counter

def _copy_images(img_list, src_dir, dest_dir):
    """Copy every file name in img_list from src_dir into dest_dir (created if missing)."""
    os.makedirs(dest_dir, exist_ok=True)
    for img in img_list:
        shutil.copy(os.path.join(src_dir, img), os.path.join(dest_dir, img))


def prepare_data_splits(dataset_dir, output_dir, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=42):
    """
    Splits the dataset into training, validation, and test sets, and copies them to an output directory,
    while balancing the dataset across classes.

    Every class is truncated to the size of the smallest class (taking the first
    images in sorted file-name order) and that balanced subset is then divided
    into train/val/test.

    Parameters:
    - dataset_dir: The directory containing class subdirectories of images.
    - output_dir: The directory where train, val, and test subdirectories will be created.
    - train_ratio: Ratio of data to use for training (default is 0.8).
    - val_ratio: Ratio of data to use for validation (default is 0.1).
    - test_ratio: Ratio of data to use for testing (default is 0.1).
    - seed: Random state passed to train_test_split so that re-running yields the
      same split (default is 42). Pass None for a different split on every call.

    Raises:
    - AssertionError: If the three ratios do not sum to 1.
    - ValueError: If dataset_dir contains no class subdirectories.

    Note: files already present in output_dir are not removed. Delete output_dir
    before re-splitting with different ratios or a different seed, otherwise an
    image may end up in more than one split.
    """
    # Tolerant comparison: exact float equality rejects valid inputs such as 0.7 + 0.2 + 0.1.
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "Train, val, and test ratios must sum to 1."

    # Create the output directories for splits
    for split in ('train', 'val', 'test'):
        os.makedirs(os.path.join(output_dir, split), exist_ok=True)

    # Collect the image files of every class once. Sorting makes the result
    # independent of the arbitrary order returned by os.listdir.
    class_images = {}
    for class_folder in sorted(os.listdir(dataset_dir)):
        class_path = os.path.join(dataset_dir, class_folder)

        # Skip anything that is not a class directory (e.g. stray files)
        if not os.path.isdir(class_path):
            continue

        # Only regular, non-hidden files count as images; nested directories
        # cannot be copied with shutil.copy.
        class_images[class_folder] = sorted(
            name for name in os.listdir(class_path)
            if not name.startswith('.') and os.path.isfile(os.path.join(class_path, name))
        )

    if not class_images:
        raise ValueError(f"No class directories found in '{dataset_dir}'.")

    # Determine the minimum number of images in any class for balancing
    min_class_count = min(len(images) for images in class_images.values())

    for class_folder, images in class_images.items():
        class_path = os.path.join(dataset_dir, class_folder)

        # Balance by keeping only the first 'min_class_count' images of the class
        balanced_imgs = images[:min_class_count]

        # First carve off the held-out part, then divide it into validation and test
        train_imgs, temp_imgs = train_test_split(
            balanced_imgs, test_size=(1 - train_ratio), random_state=seed)
        val_imgs, test_imgs = train_test_split(
            temp_imgs, test_size=test_ratio / (test_ratio + val_ratio), random_state=seed)

        # Copy images to the respective directories in the output folder
        for split_type, img_list in (('train', train_imgs), ('val', val_imgs), ('test', test_imgs)):
            _copy_images(img_list, class_path, os.path.join(output_dir, split_type, class_folder))

    print(f"Data successfully split and copied into train, val, and test sets in '{output_dir}'.")

# Example usage: copy a class-balanced split of 'data/15-Scene' (default ratios
# 0.8 / 0.1 / 0.1) into the train/val/test folders under 'data/preprocessed-data'.
prepare_data_splits('data/15-Scene', 'data/preprocessed-data')


In [60]:
# Define data augmentation and transformation
from collections import Counter

# Training pipeline: random crop + horizontal flip augmentation, then tensor
# conversion and per-channel normalisation (the standard ImageNet statistics).
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Evaluation pipeline: deterministic resize + centre crop. Random augmentation
# must not be applied to validation/test images, otherwise the reported metrics
# change from one evaluation pass to the next.
eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

train_dataset = datasets.ImageFolder('data/preprocessed-data/train', transform=transform)
val_dataset = datasets.ImageFolder('data/preprocessed-data/val', transform=eval_transform)
test_dataset = datasets.ImageFolder('data/preprocessed-data/test', transform=eval_transform)

# Only the training loader shuffles; evaluation order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# def print_class_distribution(dataset, dataset_name):
#     class_counts = Counter([label for _, label in dataset.samples])
#     print(f"\n{dataset_name} dataset distribution:")
#     for class_name, class_idx in dataset.class_to_idx.items():
#         print(f"{class_name}: {class_counts[class_idx]} images")

# Print total number of images
print('Total number of images:')
print('Train dataset:', len(train_dataset))
print('Validation dataset:', len(val_dataset))
print('Test dataset:', len(test_dataset))

# Print number of images per class
# print_class_distribution(train_dataset, 'Train')
# print_class_distribution(val_dataset, 'Validation')
# print_class_distribution(test_dataset, 'Test')


Total number of images:
Train dataset: 2520
Validation dataset: 315
Test dataset: 315


In [62]:
def get_optimizer(model, optimizer_type, lr=0.001):
    """Create an optimizer over ``model``'s parameters.

    Parameters:
    - model: Module whose ``parameters()`` will be optimised.
    - optimizer_type: One of 'SGD' (momentum 0.9), 'Adam' or 'RMSProp'.
    - lr: Learning rate handed to the optimizer (default is 0.001).

    Raises:
    - ValueError: If ``optimizer_type`` is not one of the supported names.
    """
    # (name, factory) pairs; the factory is only invoked for the matching name.
    factories = (
        ('SGD', lambda params: optim.SGD(params, lr=lr, momentum=0.9)),
        ('Adam', lambda params: optim.Adam(params, lr=lr)),
        ('RMSProp', lambda params: optim.RMSprop(params, lr=lr)),
    )
    for known_name, build in factories:
        if optimizer_type == known_name:
            return build(model.parameters())
    raise ValueError(f"Unknown optimizer type: {optimizer_type}")
 

In [63]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Build the network with its default number of output classes and move it to that device.
model = SceneClassificationCNN().to(device)
# Cross-entropy loss on the raw class scores; train_and_evaluate reads this
# module-level `criterion` (and `device`) instead of taking them as arguments.
criterion = nn.CrossEntropyLoss()



In [64]:
def _run_epoch(model, loader, loss_fn, run_device, optimizer=None, desc=None):
    """Make one pass over ``loader``; update weights only when ``optimizer`` is given.

    Returns a tuple ``(mean_loss, n_correct, n_seen, labels, preds)`` where the
    mean loss is per processed sample and ``labels``/``preds`` are flat lists
    of class indices in iteration order.
    """
    is_training = optimizer is not None
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    seen_labels = []
    seen_preds = []

    # Gradients are only needed while training.
    with torch.set_grad_enabled(is_training):
        for images, labels in tqdm(loader, desc=desc):
            images, labels = images.to(run_device), labels.to(run_device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = loss_fn(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            # The criterion returns a batch mean; weight it by the batch size
            # so that a smaller last batch is accounted for correctly.
            loss_sum += loss.item() * images.size(0)
            predicted = outputs.detach().argmax(dim=1)
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

            seen_labels.extend(labels.cpu().tolist())
            seen_preds.extend(predicted.cpu().tolist())

    # Normalise by the samples actually processed: len(loader.dataset) is
    # wrong whenever the loader drops the last batch or uses a sampler.
    return loss_sum / n_seen, n_correct, n_seen, seen_labels, seen_preds


def train_and_evaluate(model, train_loader, test_loader, optimizer, scheduler, num_epochs):
    """Train ``model`` for ``num_epochs`` epochs, evaluating on ``test_loader`` after each one.

    The loss function and the device are taken from the module-level names
    ``criterion`` and ``device``.

    Parameters:
    - model: Network to train (expected to live on ``device`` already).
    - train_loader: Iterable of (images, labels) batches used for optimisation.
    - test_loader: Iterable of (images, labels) batches used for the per-epoch evaluation.
    - optimizer: Optimizer stepped once per training batch.
    - scheduler: Learning-rate scheduler stepped once per epoch, after the training pass.
    - num_epochs: Number of epochs to run.

    Returns:
    - Dict with per-epoch lists 'train_loss', 'test_loss', 'train_error',
      'test_error', 'accuracy', 'precision', 'recall', 'f1_score' (the last
      three weighted over classes) and the scalar 'training_time' in seconds.
    """
    # To store training data for plotting
    training_data = {
        'train_loss': [],
        'test_loss': [],
        'train_error': [],
        'test_error': [],
        'accuracy': [],
        'precision': [],
        'recall': [],
        'f1_score': []
    }

    start_time = time.time()

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        avg_train_loss, correct_train, total_train, _, _ = _run_epoch(
            model, train_loader, criterion, device,
            optimizer=optimizer,
            desc=f"Training Epoch {epoch+1}/{num_epochs}")
        scheduler.step()
        train_error = 1 - (correct_train / total_train)

        # Testing phase
        model.eval()
        avg_test_loss, correct_test, total_test, all_labels, all_preds = _run_epoch(
            model, test_loader, criterion, device,
            desc=f"Validating Epoch {epoch+1}/{num_epochs}")
        test_error = 1 - (correct_test / total_test)

        accuracy = accuracy_score(all_labels, all_preds)
        precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
        recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
        f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

        # Store metrics
        training_data['train_loss'].append(avg_train_loss)
        training_data['test_loss'].append(avg_test_loss)
        training_data['train_error'].append(train_error)
        training_data['test_error'].append(test_error)
        training_data['accuracy'].append(accuracy)
        training_data['precision'].append(precision)
        training_data['recall'].append(recall)
        training_data['f1_score'].append(f1)

        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}, '
              f'Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, '
              f'Recall: {recall:.4f}, F1 Score: {f1:.4f}')

    training_data['training_time'] = time.time() - start_time

    return training_data


In [65]:
def _to_serializable(value):
    """Recursively turn tensors (also inside lists) into plain Python values for JSON."""
    if isinstance(value, torch.Tensor):
        return value.tolist()
    if isinstance(value, list):
        return [_to_serializable(item) for item in value]
    return value


def save_model_and_results(model, model_name, training_data):
    """Persist the model weights and its training history to the working directory.

    Writes ``{model_name}_model.pth`` (the model's ``state_dict``) and
    ``{model_name}_training_data.json`` (the history).

    Parameters:
    - model: Module whose ``state_dict()`` is saved.
    - model_name: Prefix used for both output file names.
    - training_data: Dict of metrics; tensors and lists containing tensors are
      converted to plain Python values first. Empty lists are written as ``[]``.
    """
    torch.save(model.state_dict(), f'{model_name}_model.pth')

    # Convert training data to a JSON-serializable format. Converting element
    # by element handles empty lists (no value[0] lookup) and mixed lists.
    serializable_data = {key: _to_serializable(value) for key, value in training_data.items()}

    # Save the training data as JSON
    with open(f'{model_name}_training_data.json', 'w') as json_file:
        json.dump(serializable_data, json_file)


In [66]:
import copy

In [67]:
import torch

def evaluate_model(model, val_loader):
    """Return the fraction of samples in ``val_loader`` that ``model`` classifies correctly.

    The model is switched to evaluation mode for the duration of the pass (so
    Dropout/BatchNorm behave deterministically) and its previous train/eval
    mode is restored afterwards. Batches are moved to the module-level
    ``device``.

    Parameters:
    - model: Network producing one row of class scores per input sample.
    - val_loader: Iterable of (images, labels) batches.

    Returns:
    - Accuracy as a float in [0, 1].
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        # Leave the caller's train/eval state exactly as it was.
        model.train(was_training)
    return correct / total



In [None]:
optimizers = ['SGD', 'Adam', 'RMSProp']

# Hyperparameters shared by every optimizer run.
NUM_EPOCHS = 300
BASE_LR = 0.001
# Floor of the cosine schedule. It must be below BASE_LR: with eta_min equal to
# the base learning rate the schedule is a constant and never anneals.
MIN_LR = 1e-5

# Save the initial state of the model so every optimizer starts from the same weights
initial_model_state = copy.deepcopy(model.state_dict())

for opt_type in optimizers:
    # Reload the initial state of the model for each optimizer
    model.load_state_dict(initial_model_state)

    # Get optimizer
    optimizer = get_optimizer(model, opt_type, lr=BASE_LR)
    print(f"\nTraining with {opt_type} optimizer:")

    # Scheduler for the optimizer: a single cosine decay spanning the whole run
    # (a T_max shorter than the run would make the learning rate climb back up).
    scheduler = CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS, eta_min=MIN_LR)

    # Train and evaluate the model
    training_data = train_and_evaluate(model, train_loader, test_loader, optimizer, scheduler, num_epochs=NUM_EPOCHS)

    # Save the trained model and its results
    save_model_and_results(model, f'scene_classification_{opt_type}', training_data)

    # Final accuracy on the validation split
    accuracy = evaluate_model(model, val_loader)
    print(f'Accuracy for Network on the whole validation set: {accuracy:.2%}')
