<a href="https://colab.research.google.com/github/lahirunie-dulsara/EN3150-Assignment-3-CNN/blob/Aazir/realwaste_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import os
import shutil
from sklearn.model_selection import train_test_split
import time


In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
import zipfile, os, re, shutil
from PIL import Image

zip_path = "/content/drive/MyDrive/Pattern Recognition/realwaste.zip"

extract_path = "/content"
os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)
print("Unzipped to:", extract_path)


In [None]:
base_dir = "/content/realwaste-main/RealWaste"
for folder in os.listdir(base_dir):
    count = len(os.listdir(os.path.join(base_dir, folder)))
    print(f"{folder}: {count} images")

In [None]:
import os
import cv2
import pandas as pd
from sklearn.model_selection import StratifiedShuffleSplit
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import random
import numpy as np
import torch
import time
from tqdm import tqdm
import matplotlib.pyplot as plt
from collections import Counter, defaultdict

# Configuration
BATCH_SIZE = 64
IMG_SIZE = (224, 224)
TEST_SPLIT = 0.15
VAL_SPLIT = 0.15
RANDOM_SEED = 42
DATA_DIR = "/content/realwaste-main/RealWaste"

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(RANDOM_SEED)

class RealWasteDataset(Dataset):
    def __init__(self, root_dir=None, data=[], labels=[], size=None, transforms=[], transform=None, print_info=True):
        self.root_dir = root_dir
        self.transforms = transforms
        self.transform = transform
        self.data = data
        self.labels = labels
        self.image_sizes = {} if size is None else {size}

        # Class mapping
        self.classes = {
            0: 'Cardboard', 1: 'Food Organics', 2: 'Glass', 3: 'Metal',
            4: 'Miscellaneous Trash', 5: 'Paper', 6: 'Plastic', 7: 'Textile Trash', 8: 'Vegetation'
        }

        start = time.time()

        if root_dir is not None:
            for label, label_name in self.classes.items():
                class_dir = os.path.join(self.root_dir, label_name)
                if not os.path.exists(class_dir):
                    continue
                for img_name in os.listdir(class_dir):
                    img_path = os.path.join(class_dir, img_name)

                    try:
                        with Image.open(img_path) as img:
                            img.verify()
                            img_size = img.size
                            self.image_sizes[img_size] = self.image_sizes.get(img_size, 0) + 1
                    except Exception as e:
                        print(f"Error loading image {img_path}: {e}")
                        continue

                    self.data.append(img_path)
                    self.labels.append(label)

        end = time.time()

        if print_info:
            self.print_info(round(end - start, 2))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert("RGB")

        if self.transforms:
            image = self.transforms[idx](image)
        elif self.transform:
            image = self.transform(image)

        return image, label

    def to_dataframe(self):
        return pd.DataFrame({
            'image_path': self.data,
            'label': self.labels,
            'transformation': [self.transform for _ in range(len(self.labels))]
        })

    def print_info(self, elapsed):
        print("----------Dataset Summary----------")
        print(f"Total images: {len(self.data)}")
        print(f"Number of classes: {len(self.classes)}")
        print("Images per class:")

        class_counts = {label_name: 0 for label_name in self.classes.values()}
        for label in self.labels:
            class_name = self.classes[label]
            class_counts[class_name] += 1

        for class_name, count in class_counts.items():
            print(f"  {class_name}: {count} images")

        print("\nUnique image sizes:")
        for size, count in self.image_sizes.items():
            print(f"  {size}: {count} images")
        print(f"\nLoaded in {elapsed} seconds!")

transform = transforms.Compose([
        transforms.Resize(IMG_SIZE),
        transforms.ToTensor(),
])

full_dataset = RealWasteDataset(DATA_DIR, transform=transform)

# Visualize class distribution
def plot_class_distribution_from_dataframe(dataframe, label_to_class, title="Image Distribution per Class", color='skyblue'):
    label_counts = Counter(dataframe['label'])
    sorted_labels = sorted(label_counts.items())

    plt.figure(figsize=(12, 6))
    bars = plt.bar([label_to_class[label] for label, _ in sorted_labels], [count for _, count in sorted_labels], color=color)

    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width() / 2, yval + 2, str(yval), ha='center', fontsize=10, color='black')

    plt.title(title, fontsize=16)
    plt.xlabel("Classes", fontsize=12)
    plt.ylabel("Number of Images", fontsize=12)
    plt.xticks(rotation=45, fontsize=10)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()

plot_class_distribution_from_dataframe(full_dataset.to_dataframe(), full_dataset.classes)


In [None]:
# Stratified Split
def train_val_test_split(full_dataframe, test_size, val_size, random_seed=RANDOM_SEED):
    stratified_split = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=random_seed)
    for train_val_idx, test_idx in stratified_split.split(full_dataframe, full_dataframe['label']):
        train_val_df = full_dataframe.iloc[train_val_idx]
        test_df = full_dataframe.iloc[test_idx]

    val_adjusted_size = val_size / (1 - test_size)
    stratified_split = StratifiedShuffleSplit(n_splits=1, test_size=val_adjusted_size, random_state=random_seed)
    for train_idx, val_idx in stratified_split.split(train_val_df, train_val_df['label']):
        train_df = train_val_df.iloc[train_idx]
        val_df = train_val_df.iloc[val_idx]

    return train_df, val_df, test_df

train_df, val_df, test_df = train_val_test_split(full_dataset.to_dataframe(), test_size=TEST_SPLIT, val_size=VAL_SPLIT)

# Visualizing Augmented Dataset
def augment_train(train_df, duplication_factor):
    transform_augment = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(degrees=45),
        transforms.RandomResizedCrop(size=IMG_SIZE[0], scale=(0.6, 0.9)),
        transforms.Resize(IMG_SIZE),
        transforms.ToTensor()
    ])

    augmented_rows = []
    for _, row in tqdm(train_df.iterrows(), total=len(train_df), desc="Augmenting Images"):
        img_path, label = row['image_path'], row['label']
        for _ in range(duplication_factor):
            augmented_rows.append({
                "image_path": img_path,
                "label": label,
                "transformation": transform_augment
            })

    augmented_df = pd.DataFrame(augmented_rows)
    combined_df = pd.concat([train_df, augmented_df], ignore_index=True)
    return combined_df

train_df_augmented = augment_train(train_df, duplication_factor=3)

plot_class_distribution_from_dataframe(train_df_augmented, full_dataset.classes)

# Visualizing Training Images by Class
def plot_training_images_by_class(train_df_augmented, class_names, ncols=5, title="Examples of Images by Class"):
    sampled_images = defaultdict(list)
    for label in train_df_augmented['label'].unique():
        class_df = train_df_augmented[train_df_augmented['label'] == label]
        sampled_df = class_df.sample(min(len(class_df), ncols), random_state=RANDOM_SEED)
        for _, row in sampled_df.iterrows():
            sampled_images[label].append(row)

    classes = sorted(sampled_images.keys())
    nrows = len(classes)
    fig, axs = plt.subplots(nrows, ncols + 1, figsize=(ncols * 2.5, nrows * 2))
    fig.suptitle(title, fontsize=16)

    for i, label in enumerate(classes):
        axs[i, 0].imshow(Image.open(sampled_images[label][0]['image_path']))
        axs[i, 0].set_title(class_names[label], fontsize=12)
        axs[i, 0].axis('off')

        for j in range(ncols):
            img_path = sampled_images[label][j]['image_path']
            axs[i, j+1].imshow(Image.open(img_path))
            axs[i, j+1].axis('off')

    plt.tight_layout()
    plt.show()

plot_training_images_by_class(train_df_augmented, full_dataset.classes)


In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image

class RealWasteDataset(Dataset):
    def __init__(self, data, labels, transforms=None, print_info=True):
        self.data = data  # List of image paths
        self.labels = labels  # List of labels (numeric or one-hot encoded)
        self.transforms = transforms  # List of transformations
        self.print_info = print_info  # Whether to print dataset info

        # Print dataset info if specified
        if self.print_info:
            print(f"Dataset contains {len(data)} images.")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Load the image
        img_path = self.data[idx]
        label = self.labels[idx]

        img = Image.open(img_path)

        # Apply transformations (if any)
        if self.transforms:
            img = self.transforms[idx](img)

        return img, label


In [None]:
import pandas as pd
from torch.utils.data import DataLoader

# Constants
BATCH_SIZE = 32  # Example batch size

# Assuming you have the augmented DataFrames loaded as train_df_augmented, val_df, test_df
# # Replace this with your actual DataFrame loading method

# # Example data for the DataLoader
# train_df_augmented = pd.read_csv('/path/to/your/train_df_augmented.csv')
# val_df = pd.read_csv('/path/to/your/val_df.csv')
# test_df = pd.read_csv('/path/to/your/test_df.csv')

# Create Datasets
train_dataset = RealWasteDataset(
    data=train_df_augmented['image_path'].to_list(),
    labels=train_df_augmented['label'].to_list(),
    transforms=train_df_augmented['transformation'].to_list(),
    print_info=False
)

val_dataset = RealWasteDataset(
    data=val_df['image_path'].to_list(),
    labels=val_df['label'].to_list(),
    transforms=val_df['transformation'].to_list(),
    print_info=False
)

test_dataset = RealWasteDataset(
    data=test_df['image_path'].to_list(),
    labels=test_df['label'].to_list(),
    transforms=test_df['transformation'].to_list(),
    print_info=False
)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Print dataset information
print("Train", end='\n\t')
print(f"Batches: {len(train_loader)}", end='\n\t')
print(f"Images: {len(train_loader.dataset)}")
print("Validation", end='\n\t')
print(f"Batches: {len(val_loader)}", end='\n\t')
print(f"Images: {len(val_loader.dataset)}")
print("Test", end='\n\t')
print(f"Batches: {len(test_loader)}", end='\n\t')
print(f"Images: {len(test_loader.dataset)}")


In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
import matplotlib.pyplot as plt
from tqdm import tqdm

# Create the class weights
class_counts = np.bincount(train_dataset.labels)
class_weights = 1.0 / class_counts
class_weights = class_weights / class_weights.sum() * len(class_counts)
num_classes = len(full_dataset.classes)

# Get the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Update the training results dataframe
def update_training_results_csv(model_name, optimizer, lr, weight_decay, train_losses, val_losses, train_accuracies, val_accuracies):

    csv_path = 'training_results.csv'

    # Controlla se il file CSV esiste
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
    else:
        # Se non esiste, crea un DataFrame vuoto con le colonne necessarie
        df = pd.DataFrame(columns=["model_name",
                                   "epoch",
                                   "optimizer",
                                   "learning_rate",
                                   "weight_decay",
                                   "train_loss",
                                   "val_loss",
                                   "train_accuracy",
                                   "val_accuracy"
                                  ]
                         )

    # Rimuovi eventuali vecchie entry per il modello
    df = df[df['model_name'] != model_name]

    # Crea un nuovo DataFrame con i risultati attuali
    new_entries = {
        "model_name": [model_name] * len(train_losses),
        "epoch": list(range(1, len(train_losses) + 1)),
        "optimizer": [optimizer] * len(train_losses),
        "learning_rate": [lr] * len(train_losses),
        "weight_decay": [weight_decay] * len(train_losses),
        "train_loss": train_losses,
        "val_loss": val_losses,
        "train_accuracy": train_accuracies,
        "val_accuracy": val_accuracies
    }

    new_df = pd.DataFrame(new_entries)

    # Concatenare i nuovi dati con quelli esistenti
    df = pd.concat([df, new_df], ignore_index=True)

    # Salva il DataFrame aggiornato nel file CSV
    df.to_csv(csv_path, index=False)

    print(f"Dati di training per '{model_name}' aggiornati in '{csv_path}'")

# Function to train the model with a progress bar
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct_train = 0
        total_train = 0

        # Progress bar for training
        with tqdm(train_loader, desc=f"Training Epoch {epoch+1}/{num_epochs}", unit="batch") as tepoch:
            for images, labels in tepoch:
                images, labels = images.to(device), labels.to(device)

                # Zero the gradients
                optimizer.zero_grad()

                # Forward pass through the model
                outputs = model(images)
                loss = criterion(outputs, labels)

                # Backpropagation
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

                # Calculate accuracy
                _, predicted = torch.max(outputs, 1)
                total_train += labels.size(0)
                correct_train += (predicted == labels).sum().item()

                # Update the progress bar
                tepoch.set_postfix(loss=running_loss / len(tepoch), accuracy=100 * correct_train / total_train)

        train_losses.append(running_loss / len(train_loader))
        train_accuracies.append(100 * correct_train / total_train)

        # Validation
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0

        # Progress bar for validation
        with tqdm(val_loader, desc=f"Validation Epoch {epoch+1}/{num_epochs}", unit="batch") as vepoch:
            with torch.no_grad():
                for images, labels in vepoch:
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item()

                    # Calculate accuracy
                    _, predicted = torch.max(outputs, 1)
                    total_val += labels.size(0)
                    correct_val += (predicted == labels).sum().item()

                    # Update the progress bar
                    vepoch.set_postfix(loss=val_loss / len(vepoch), accuracy=100 * correct_val / total_val)

        val_losses.append(val_loss / len(val_loader))
        val_accuracies.append(100 * correct_val / total_val)

        print(f"Epoch {epoch+1}/{num_epochs}, "
              f"Train Loss: {train_losses[-1]:.4f}, Train Accuracy: {train_accuracies[-1]:.2f}%, "
              f"Val Loss: {val_losses[-1]:.4f}, Val Accuracy: {val_accuracies[-1]:.2f}%")

    return train_losses, val_losses, train_accuracies, val_accuracies

# Useful plot function
def plot_training_validation_metrics(train_losses, val_losses, train_accuracies, val_accuracies):
    """
    Function to plot the graph of losses and accuracies during training and validation.

    Args:
        train_losses (list): List containing loss values during training.
        val_losses (list): List containing loss values during validation.
        train_accuracies (list): List containing accuracies during training.
        val_accuracies (list): List containing accuracies during validation.
    """
    plt.figure(figsize=(12, 6))

    # Loss plot
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Loss per Epoch')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Accuracy plot
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Accuracy per Epoch')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    plt.tight_layout()
    plt.show()


In [None]:
class CNNModel(nn.Module):
    def __init__(self, num_classes=9):
        super(CNNModel, self).__init__()

        # Blocco convoluzionale base
        self.conv_block1 = self._create_conv_block(3, 64)  # 64 filtri
        self.conv_block2 = self._create_conv_block(64, 128)  # 128 filtri
        self.conv_block3 = self._create_conv_block(128, 256)  # 256 filtri
        self.conv_block4 = self._create_conv_block(256, 512)  # 512 filtri
        self.conv_block5 = self._create_conv_block(512, 512)  # 512 filtri aggiuntivi

        # Global Average Pooling
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)

        # Strati completamente connessi
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.3)

    def _create_conv_block(self, in_channels, out_channels):
        """Crea un blocco convoluzionale standard."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        x = self.conv_block5(x)

        # Global Average Pooling
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten

        # Strati densi
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x
# Istanza del modello
model = CNNModel(num_classes=9)

model_name = "Custom CNN"

# Configurazione del dispositivo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Conversione dei pesi al dispositivo
weights = torch.tensor(class_weights.copy(), dtype=torch.float32).to(device)  # Personalizza i pesi
criterion = nn.CrossEntropyLoss(weight=weights)

# Modello sul dispositivo
model = model.to(device)

# Ottimizzatore
learning_rate = 1e-5
weight_decay = 1e-2
optimizer = optim.Adam(
    filter(lambda p: p.requires_grad, model.parameters()),
    lr=learning_rate,
    weight_decay=weight_decay
)

# Scheduler del learning rate
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# Numero di epoche
num_epochs = 15

# Training del modello
train_losses, val_losses, train_accuracies, val_accuracies = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs
)

# Save the model
model_save_path = f"/kaggle/working/trained_{model_name}_model.pth"
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

# Update metrics dataset
update_training_results_csv(model_name, "adam", learning_rate, weight_decay, train_losses, val_losses, train_accuracies, val_accuracies)

# Plot delle metriche di training e validazione
plot_training_validation_metrics(
    train_losses=train_losses,
    val_losses=val_losses,
    train_accuracies=train_accuracies,
    val_accuracies=val_accuracies
)


In [None]:
# Save the model
model_save_path = f"/content/trained_{model_name}_model.pth"
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

# Set the path to the directory containing the models
models_path = "/content/"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# List of class names
class_names = [
    'Cardboard',
    'Food Organics',
    'Glass',
    'Metal',
    'Miscellaneous Trash',
    'Paper',
    'Plastic',
    'Textile Trash',
    'Vegetation'
]

class CNNModel(nn.Module):
    def __init__(self, num_classes=9):
        super(CNNModel, self).__init__()

        # Blocco convoluzionale base
        self.conv_block1 = self._create_conv_block(3, 64)  # 64 filtri
        self.conv_block2 = self._create_conv_block(64, 128)  # 128 filtri
        self.conv_block3 = self._create_conv_block(128, 256)  # 256 filtri
        self.conv_block4 = self._create_conv_block(256, 512)  # 512 filtri
        self.conv_block5 = self._create_conv_block(512, 512)  # 512 filtri aggiuntivi

        # Global Average Pooling
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)

        # Strati completamente connessi
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.3)

    def _create_conv_block(self, in_channels, out_channels):
        """Crea un blocco convoluzionale standard."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        x = self.conv_block5(x)

        # Global Average Pooling
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten

        # Strati densi
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

def load_custom_cnn_model(model_path, num_classes):
    """Load the custom CNN model."""
    model = CNNModel(num_classes=num_classes)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model

def evaluate_custom_cnn_models(models_path, test_loader, num_classes):
    """Evaluate the custom CNN model and store predictions and labels."""
    model_files = [f for f in os.listdir(models_path) if f.endswith("model.pth")]
    results = {}
    for model_file in model_files:
        model_path = os.path.join(models_path, model_file)
        print(f"Evaluating custom CNN model: {model_file}")
        model = load_custom_cnn_model(model_path, num_classes)
        all_preds, all_labels = [], []
        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                all_preds.append(predicted.cpu().numpy())
                all_labels.append(labels.cpu().numpy())
        results[model_file] = {
            "preds": np.concatenate(all_preds),
            "labels": np.concatenate(all_labels)
        }
    return results

def plot_confusion_matrices(results):
    """Plot normalized confusion matrices (percentages) for all models with a maximum of three matrices per row."""
    num_models = len(results)
    cols = 3  # Max 3 confusion matrices per row
    rows = -(-num_models // cols)  # Round up division to calculate number of rows

    fig, axes = plt.subplots(rows, cols, figsize=(8 * cols, 8 * rows), dpi=150)
    axes = axes.flatten()  # Flatten axes array for easier indexing

    for i, (model_file, data) in enumerate(results.items()):
        # Compute confusion matrix and normalize it by row
        cm = confusion_matrix(data["labels"], data["preds"], normalize='true')
        cm_percentage = cm * 100  # Convert to percentage

        # Plot confusion matrix with percentages rounded to 1 decimal place
        sns.heatmap(cm_percentage, annot=True, fmt='.1f', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names, ax=axes[i])
        axes[i].set_title(f"Confusion Matrix (Normalized)\n{model_file}", fontsize=12)
        axes[i].set_xlabel("Predicted")
        axes[i].set_ylabel("True")
        axes[i].set_aspect('equal')  # Maintain aspect ratio

    # Remove unused axes if number of models is less than the grid size
    for j in range(i + 1, len(axes)):
        fig.delaxes(axes[j])

    plt.tight_layout()
    plt.savefig("confusion_matrices.png", dpi=300, bbox_inches="tight", format="png")
    plt.show()

def plot_classification_reports(results):
    """Plot classification reports for all models."""
    for model_file, data in results.items():
        report = classification_report(data["labels"], data["preds"], target_names=class_names)
        print(f"Classification Report for {model_file}:\n")
        print(report)
        print("\n" + "=" * 80 + "\n")

def plot_classification_metrics(results):
    """Plot precision, recall, and F1-score for all models with percentages on bars."""
    metrics = ['precision', 'recall', 'f1-score']
    for model_file, data in results.items():
        # Generate classification report as a dictionary
        report = classification_report(
            data["labels"], data["preds"], target_names=class_names, output_dict=True
        )

        # Extract metrics for each class (excluding 'accuracy' and 'macro avg')
        class_metrics = {metric: [] for metric in metrics}
        for class_name in class_names:
            for metric in metrics:
                class_metrics[metric].append(report[class_name][metric])

        # Convert class names to a numpy array to avoid Seaborn warnings
        class_array = np.array(class_names)

        # Plot metrics for the current model
        fig, axes = plt.subplots(1, 3, figsize=(18, 6), dpi=150)
        for ax, metric in zip(axes, metrics):
            # Create barplot
            sns.barplot(y=class_array, x=class_metrics[metric], ax=ax, palette="viridis")
            ax.set_title(f"{metric.capitalize()} by Class for {model_file}")
            ax.set_xlabel(metric.capitalize())
            ax.set_ylabel("Class")
            ax.set_xlim(0, 1)  # Scores are between 0 and 1

            # Add percentage labels inside bars
            for i, value in enumerate(class_metrics[metric]):
                ax.text(
                    value + 0.02, i, f"{value:.2%}", va="center", ha="left", fontsize=9
                )

        plt.tight_layout()
        # Save each model's figure with a different name
        plt.savefig(f"classification_metrics_{model_file}.png", dpi=300, bbox_inches="tight", format="png")
        plt.show()

def plot_model_accuracies(results):
    """Plot overall accuracy for all models."""
    accuracies = {}
    for model_file, data in results.items():
        # Generate classification report as a dictionary
        report = classification_report(
            data["labels"], data["preds"], target_names=class_names, output_dict=True
        )
        # Extract overall accuracy
        accuracies[model_file] = report['accuracy']

    # Convert model names to a numpy array to avoid warnings
    model_names = np.array(list(accuracies.keys()))
    accuracy_values = list(accuracies.values())

    plt.figure(figsize=(10, 6), dpi=150)
    sns.barplot(x=accuracy_values, y=model_names, palette="viridis")
    plt.title("Overall Accuracy by Model")
    plt.xlabel("Accuracy")
    plt.ylabel("Model")
    plt.xlim(0, 1)  # Accuracy is between 0 and 1

    # Add percentage labels inside bars
    for i, value in enumerate(accuracy_values):
        plt.text(
            value + 0.02, i, f"{value:.2%}", va="center", ha="left", fontsize=10
        )

    plt.tight_layout()
    plt.savefig("model_accuracies.png", dpi=300, bbox_inches="tight", format="png")
    plt.show()

# Evaluation and plotting for custom CNN
num_classes = len(class_names)
results = evaluate_custom_cnn_models(models_path, test_loader, num_classes)
plot_confusion_matrices(results)
plot_classification_reports(results)
plot_classification_metrics(results)
plot_model_accuracies(results)


In [None]:
import torch
import os
import numpy as np

class CNNModel(nn.Module):
    def __init__(self, num_classes=9):
        super(CNNModel, self).__init__()

        # Blocco convoluzionale base
        self.conv_block1 = self._create_conv_block(3, 64)  # 64 filtri
        self.conv_block2 = self._create_conv_block(64, 128)  # 128 filtri
        self.conv_block3 = self._create_conv_block(128, 256)  # 256 filtri
        self.conv_block4 = self._create_conv_block(256, 512)  # 512 filtri
        self.conv_block5 = self._create_conv_block(512, 512)  # 512 filtri aggiuntivi

        # Global Average Pooling
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)

        # Strati completamente connessi
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.3)

    def _create_conv_block(self, in_channels, out_channels):
        """Crea un blocco convoluzionale standard."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        x = self.conv_block5(x)

        # Global Average Pooling
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten

        # Strati densi
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

def load_model(model_path, device):
    """
    Load the model from the specified path and ensure it's ready for evaluation.
    """
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model path '{model_path}' does not exist.")

    # Check if the model file ends with '.pth'
    model_files = [f for f in os.listdir(model_path) if f.endswith('.pth')]
    if not model_files:
        raise FileNotFoundError("No model files ending with '.pth' found in the directory.")

    model_file_path = os.path.join(model_path, model_files[0])
    print(f"Loading model from {model_file_path}")

    # Load the state dict
    model = torch.load(model_file_path, map_location=device)
    model.eval()
    return model

def evaluate_model(model, test_loader, device):
    """
    Evaluate the model and collect predictions and labels.
    """
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(test_loader):
            # Ensure that images and labels are not empty
            if images.shape[0] == 0:
                print(f"Warning: Empty batch at index {batch_idx}")
                continue

            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            # Collect predictions and labels
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    if not all_preds or not all_labels:
        print("Evaluation produced no predictions or labels.")
        return None

    print(f"Collected {len(all_preds)} predictions and {len(all_labels)} labels.")
    return np.array(all_preds), np.array(all_labels)

# Example Execution
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Define test data path and transformation
test_data_path = "/kaggle/input/last-test/test"  # Update with your actual test dataset path

# Define test transformations
test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Adjust size as needed
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the test dataset
try:
    test_dataset = datasets.ImageFolder(root=test_data_path, transform=test_transforms)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    print(f"Loaded test dataset with {len(test_dataset)} images.")
except FileNotFoundError as e:
    print(f"Error loading test dataset: {e}")


In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import os

# Define CNN Model
class CNNModel(nn.Module):
    def __init__(self, num_classes=9):
        super(CNNModel, self).__init__()
        self.conv_block1 = self._create_conv_block(3, 64)
        self.conv_block2 = self._create_conv_block(64, 128)
        self.conv_block3 = self._create_conv_block(128, 256)
        self.conv_block4 = self._create_conv_block(256, 512)
        self.conv_block5 = self._create_conv_block(512, 512)

        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.3)

    def _create_conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        x = self.conv_block5(x)
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Function to load the model
def load_model(model_path, device):
    model = CNNModel()  # Instantiate the CNNModel class
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    return model

# Evaluate the model
def evaluate_model(model, dataloader, device):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    return all_preds, all_labels

# Plot confusion matrix
def plot_confusion_matrix(y_true, y_pred, class_names):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Confusion Matrix")
    plt.show()

# Plot classification report
def plot_classification_report(y_true, y_pred, class_names):
    report = classification_report(y_true, y_pred, target_names=class_names)
    print("Classification Report:\n", report)

# Load and preprocess test dataset
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_dataset_path = "/kaggle/input/last-test/test"  # Update with your test dataset path
test_dataset = datasets.ImageFolder(root=test_dataset_path, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Load the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_path = "/kaggle/working/trained_Custom CNN_model.pth"  # Replace with your correct model path

try:
    model = load_model(model_path, device)
except FileNotFoundError:
    print(f"Model file not found at {model_path}. Please check the path and file name.")

# Evaluate and plot results if model loading succeeds
if 'model' in locals():
    predictions, labels = evaluate_model(model, test_loader, device)

    # Plot the confusion matrix and classification report
    class_names = test_dataset.classes
    plot_confusion_matrix(labels, predictions, class_names)
    plot_classification_report(labels, predictions, class_names)