Intro

Step 1: Import training image dataset

In [1]:
#import kagglehub

#target_path = "./data"
#path = kagglehub.dataset_download("feyzazkefe/trashnet", path=target_path)
#print("Path to dataset files:", path)

Step 2: Import packages

In [18]:
import numpy as np
from PIL import Image
import os
import shutil

from collections.abc import Iterable

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
from torchvision.models import efficientnet_b0
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

from collections import Counter
from collections.abc import Iterable
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, classification_report

Step 2: Split data into training and test sets & transform both sets to tensors

In [3]:
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to EfficientNet input size
    transforms.RandomHorizontalFlip(),  # Lightweight and effective
    transforms.RandomRotation(10),  # Augment slightly with small angles
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet normalization
])

transform_test = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to EfficientNet input size
    transforms.ToTensor(),  # Convert image to PyTorch tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet normalization
])

In [4]:
'''
#Data splitting code
# Paths
original_data_dir = "data/data-resized"
split_data_dir = "data/split_data"
categories = ["cardboard", "glass", "metal", "paper", "plastic", "trash"]

# Create directories for train, val, and test splits
for split in ["train", "val", "test"]:
    for category in categories:
        os.makedirs(os.path.join(split_data_dir, split, category), exist_ok=True)

# Split data for each category
for category in categories:
    category_path = os.path.join(original_data_dir, category)
    images = os.listdir(category_path)
    images = [img for img in images if img.endswith(('.jpg', '.png'))]  # Filter image files

    # Split into train+val and test (80-20)
    train_val, test = train_test_split(images, test_size=0.2, random_state=42)
    
    # Further split train+val into train and val (90-10 of train+val)
    train, val = train_test_split(train_val, test_size=0.1, random_state=42)

    # Copy files to split_data directory
    for split, split_images in zip(["train", "val", "test"], [train, val, test]):
        for img in split_images:
            src_path = os.path.join(category_path, img)
            dest_path = os.path.join(split_data_dir, split, category, img)
            shutil.copy(src_path, dest_path)

print("Data splitting completed!")
'''

'\n#Data splitting code\n# Paths\noriginal_data_dir = "data/data-resized"\nsplit_data_dir = "data/split_data"\ncategories = ["cardboard", "glass", "metal", "paper", "plastic", "trash"]\n\n# Create directories for train, val, and test splits\nfor split in ["train", "val", "test"]:\n    for category in categories:\n        os.makedirs(os.path.join(split_data_dir, split, category), exist_ok=True)\n\n# Split data for each category\nfor category in categories:\n    category_path = os.path.join(original_data_dir, category)\n    images = os.listdir(category_path)\n    images = [img for img in images if img.endswith((\'.jpg\', \'.png\'))]  # Filter image files\n\n    # Split into train+val and test (80-20)\n    train_val, test = train_test_split(images, test_size=0.2, random_state=42)\n    \n    # Further split train+val into train and val (90-10 of train+val)\n    train, val = train_test_split(train_val, test_size=0.1, random_state=42)\n\n    # Copy files to split_data directory\n    for spli

In [5]:
# Load datasets
train_dataset = ImageFolder(root="data/split_data/train", transform=transform_train)
val_dataset = ImageFolder(root="data/split_data/val", transform=transform_test)
test_dataset = ImageFolder(root="data/split_data/test", transform=transform_test)

# Define data loaders & batch size
batch_size = 32 # Can also be 64
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

Step 3: Load in our efficietnet_b0 model & define number of classes in final layer

In [9]:
# Load in efficientnet_b0
model = efficientnet_b0(weights=True)

# Define class number
num_classes = 6
model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)

# Print class names from training data
print("Classes:", train_dataset.classes)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)



Classes: ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']


Step 4: Define loss function & optimizer, as well as training loop and model evaluation method

In [21]:
# Define loss function with class weights and optimizer
class_counts = Counter(train_dataset.targets)
class_weights = torch.tensor([1.0 / count for count in class_counts.values()], device=device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Function to train our model
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()  # Zero the parameter gradients
            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, labels)  # Calculate loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights

            # Statistics
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = correct / total

        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}")

        # Validate after each epoch
        validate_model(model, val_loader)

# Function to validate our model
def validate_model(model, val_loader):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Generate a classification report
    print(classification_report(all_labels, all_preds, target_names=train_dataset.classes))

def validate_model_with_metrics(model, val_loader):
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            probs = F.softmax(outputs, dim=1)  # Convert logits to probabilities
            _, predicted = torch.max(outputs, 1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # F1 Score, Precision, Recall
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    # AUC (one-vs-rest)
    auc = roc_auc_score(all_labels, all_probs, multi_class='ovr')

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"AUC: {auc:.4f}")

    return precision, recall, f1, auc

In [22]:
# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10)

# Evaluate on the test set
validate_model_with_metrics(model, test_loader)

Epoch 1/10, Loss: 0.7131, Accuracy: 0.7664
              precision    recall  f1-score   support

   cardboard       0.97      0.94      0.95        33
       glass       0.81      0.85      0.83        40
       metal       0.69      0.88      0.77        33
       paper       0.98      0.88      0.92        48
     plastic       0.88      0.74      0.81        39
       trash       0.83      0.91      0.87        11

    accuracy                           0.86       204
   macro avg       0.86      0.87      0.86       204
weighted avg       0.87      0.86      0.86       204

Epoch 2/10, Loss: 0.2914, Accuracy: 0.9014
              precision    recall  f1-score   support

   cardboard       0.94      0.97      0.96        33
       glass       0.94      0.85      0.89        40
       metal       0.73      0.82      0.77        33
       paper       0.94      0.94      0.94        48
     plastic       0.94      0.82      0.88        39
       trash       0.67      0.91      0.77   

(np.float64(0.9158039129726422),
 np.float64(0.9133858267716536),
 np.float64(0.9133725043976313),
 np.float64(0.9905471569238822))

In [26]:
# Save the model
"""
save_dir = "Model_save"
save_path = os.path.join(save_dir, "efficientnet_trash_classifier.pth")
torch.save(model.state_dict(), save_path)

print(f"Model saved to {save_path}")
"""
# Load the model
model.load_state_dict(torch.load("Model_save/efficientnet_trash_classifier.pth"))
model.eval()

  model.load_state_dict(torch.load("Model_save/efficientnet_trash_classifier.pth"))


EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2): Conv2dNormActivat