In [152]:
import os
import random
import shutil
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torchvision import transforms
import kagglehub
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import models, datasets, transforms
from pathlib import Path

In [153]:
path = kagglehub.dataset_download("feyzazkefe/trashnet")
DATA_DIR = Path('C:/Users/murug/.cache/kagglehub/datasets/feyzazkefe/trashnet/versions/1/dataset-resized')
ALL_CLASSES = ['plastic', 'metal', 'glass', 'cardboard', 'paper', 'organic']
TRAIN_SPLIT_RATIO = 0.8 # 80% for training, 20% for validation
IMAGE_SIZE = (224, 224)

In [154]:
def simulate_dataset_creation(base_dir, classes, num_images_per_class=100):
    """
    Creates a dummy dataset with a similar structure to TrashNet for demonstration.
    This function will be skipped if the data directory already exists.
    """
    if base_dir.exists():
        print(f"Dataset already exists at '{base_dir}'. Skipping simulation.")
        return

    print(f"Creating a simulated dataset at '{base_dir}'...")
    os.makedirs(base_dir, exist_ok=True)
    for cls in classes:
        cls_path = base_dir / cls
        os.makedirs(cls_path, exist_ok=True)
        for i in range(num_images_per_class):
            # Create a dummy text file to represent an image
            with open(cls_path / f"image_{i}.txt", "w") as f:
                f.write(f"This is a dummy image for {cls}.")
    print("Simulated dataset created successfully.")

In [155]:
# --- Data Preparation Logic ---
def prepare_data_for_model(data_directory, all_classes):
    """
    Organizes data into train/val splits and returns file paths.
    """
    print(f"\n--- Starting data preparation from directory: {data_directory} ---")

    if not data_directory.exists():
        print(f"Error: Dataset directory '{data_directory}' not found.")
        print("Please download and extract the TrashNet dataset and update the DATA_DIR path.")
        return None, None, None

    # Initialize dictionaries to hold file paths
    all_files = {cls: [] for cls in all_classes}
    valid_classes = []
    
    for cls in all_classes:
        class_path = data_directory / cls
        if not class_path.exists():
            print(f"Warning: Directory '{class_path}' not found. Skipping class '{cls}'.")
            continue
        
        # Only select files that are likely images (ending with common extensions)
        files = [f for f in class_path.glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']]
        
        if not files:
            print(f"Warning: No valid image files found in '{class_path}'. Skipping class '{cls}'.")
            continue
            
        all_files[cls] = files
        valid_classes.append(cls)

    if not valid_classes:
        print("\nError: No valid classes with image data were found. Please check your dataset.")
        return None, None, None
        
    # Print a summary of the loaded files
    print("\nInitial file counts per class (only including classes with data):")
    for cls in valid_classes:
        print(f"  - {cls}: {len(all_files[cls])} images")
    
    # --- Split the data into train and validation sets ---
    print("\nSplitting data into training and validation sets...")
    train_dir = data_directory.parent / 'train'
    val_dir = data_directory.parent / 'validation'
    
    # Clear existing train/val directories to avoid duplication
    if train_dir.exists():
        shutil.rmtree(train_dir)
    if val_dir.exists():
        shutil.rmtree(val_dir)

    for cls in valid_classes:
        files = all_files[cls]
        random.shuffle(files) # Shuffle the files for a good mix
        
        split_index = int(len(files) * TRAIN_SPLIT_RATIO)
        train_files = files[:split_index]
        val_files = files[split_index:]
        
        # Create destination directories
        os.makedirs(train_dir / cls, exist_ok=True)
        os.makedirs(val_dir / cls, exist_ok=True)
        
        # Copy files to the new directories
        for file in train_files:
            shutil.copy(file, train_dir / cls / file.name)
        for file in val_files:
            shutil.copy(file, val_dir / cls / file.name)
    
    print("\nData splitting and organization complete!")

    return train_dir, val_dir, valid_classes

In [156]:
# --- Main Data Preparation Logic ---
def prepare_data_for_model(data_directory, classes):
    """
    Organizes data into train/val splits and returns file paths.
    """
    print(f"\n--- Starting data preparation from directory: {data_directory} ---")

    # Call the simulation function if the data directory is not present
    simulate_dataset_creation(data_directory, classes)
    
    # Initialize dictionaries to hold file paths
    all_files = {cls: [] for cls in classes}
    for cls in classes:
        class_path = data_directory / cls
        if not class_path.exists():
            print(f"Warning: Directory '{class_path}' not found. Skipping class '{cls}'.")
            continue
        all_files[cls] = [f for f in class_path.glob('*') if f.is_file()]
        # Print a summary of the loaded files
    print("\nInitial file counts per class:")
    for cls, files in all_files.items():
        print(f"  - {cls}: {len(files)} images")
    
    # --- Split the data into train and validation sets ---
    print("\nSplitting data into training and validation sets...")
    train_dir = data_directory.parent / 'train'
    val_dir = data_directory.parent / 'validation'
    
    # Clear existing train/val directories to avoid duplication
    if train_dir.exists():
        shutil.rmtree(train_dir)
    if val_dir.exists():
        shutil.rmtree(val_dir)

    for cls in classes:
        files = all_files[cls]
        random.shuffle(files) # Shuffle the files for a good mix
        
        split_index = int(len(files) * TRAIN_SPLIT_RATIO)
        train_files = files[:split_index]
        val_files = files[split_index:]
        
        # Create destination directories
        os.makedirs(train_dir / cls, exist_ok=True)
        os.makedirs(val_dir / cls, exist_ok=True)
        
        # Copy files to the new directories
        for file in train_files:
            shutil.copy(file, train_dir / cls / file.name)
        for file in val_files:
            shutil.copy(file, val_dir / cls / file.name)
    
    print("\nData splitting and organization complete!")

    return train_dir, val_dir

In [157]:
def define_transforms(image_size):
    """
    Defines the image preprocessing and augmentation pipelines.
    """
    # Augmentation for the training set
    train_transforms = transforms.Compose([
        transforms.Resize(image_size),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    # Normalization for the validation set (no augmentation)
    val_transforms = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    print("/nData preprocessing and augmentation pipelines defined.")
    return train_transforms, val_transforms

In [158]:
# --- Exploratory Data Analysis (EDA) Functions ---
def visualize_class_distribution(train_dir, val_dir, classes):
    """
    Creates and displays a bar chart of the class distribution.
    """
    print("/nVisualizing class distribution...")
    train_counts = [len(os.listdir(train_dir / cls)) for cls in classes]
    val_counts = [len(os.listdir(val_dir / cls)) for cls in classes]
    
    x = np.arange(len(classes))
    width = 0.35
    
    fig, ax = plt.subplots(figsize=(10, 6))
    rects1 = ax.bar(x - width/2, train_counts, width, label='Train')
    rects2 = ax.bar(x + width/2, val_counts, width, label='Validation')
    
    ax.set_ylabel('Number of Images')
    ax.set_title('Image Count by Class and Split')
    ax.set_xticks(x)
    ax.set_xticklabels(classes, rotation=45, ha="right")
    ax.legend()
    
    fig.tight_layout()
    plt.show()

In [159]:
def show_sample_images(data_directory, classes):
    """
    Displays one random image from each class.
    """
    print("/nShowing a sample image from each class...")
    fig, axes = plt.subplots(1, len(classes), figsize=(15, 3))
    
    for i, cls in enumerate(classes):
        class_path = data_directory / cls
        files = [f for f in class_path.glob('*') if f.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']]
        if files:
            sample_file = random.choice(files)
            try:
                img = Image.open(sample_file)
                axes[i].imshow(img)
                axes[i].set_title(cls)
                axes[i].axis('off')
            except Exception as e:
                axes[i].set_title(f"{cls} (Error)")
                axes[i].axis('off')
                print(f"Could not open image file {sample_file}: {e}")
        else:
            axes[i].set_title(f"{cls} (No images)")
            axes[i].axis('off')
            
    fig.tight_layout()
    plt.show()

In [160]:
def analyze_pixel_distribution(data_directory, classes):
    """
    Analyzes and plots the pixel intensity distribution for a sample of images.
    """
    print("/nAnalyzing pixel intensity distribution...")
    num_samples = 5 # Number of images to sample from each class for analysis
    all_pixels = {'R': [], 'G': [], 'B': []}
    
    for cls in classes:
        class_path = data_directory / cls
        files = list(class_path.glob('*'))
        if not files:
            continue
            
        sampled_files = random.sample(files, min(num_samples, len(files)))
        
        for file in sampled_files:
            try:
                img = Image.open(file).convert('RGB')
                img_array = np.array(img)
                all_pixels['R'].extend(img_array[:, :, 0].flatten())
                all_pixels['G'].extend(img_array[:, :, 1].flatten())
                all_pixels['B'].extend(img_array[:, :, 2].flatten())
            except Exception as e:
                print(f"Could not process image {file}: {e}")
                
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    
    axes[0].hist(all_pixels['R'], bins=256, color='red', alpha=0.7)
    axes[0].set_title('Red Channel')
    axes[1].hist(all_pixels['G'], bins=256, color='green', alpha=0.7)
    axes[1].set_title('Green Channel')
    axes[2].hist(all_pixels['B'], bins=256, color='blue', alpha=0.7)
    axes[2].set_title('Blue Channel')
    
    for ax in axes:
        ax.set_xlabel('Pixel Intensity')
        ax.set_ylabel('Frequency')
        
    fig.suptitle('Pixel Intensity Distribution Across All Classes')
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

In [161]:
def visualize_class_distribution(train_dir, val_dir, classes):
    """
    Creates and displays a bar chart of the class distribution.
    """
    print("\nVisualizing class distribution...")
    train_counts = [len(os.listdir(train_dir / cls)) for cls in classes]
    val_counts = [len(os.listdir(val_dir / cls)) for cls in classes]
    
    x = np.arange(len(classes))
    width = 0.35
    
    fig, ax = plt.subplots(figsize=(10, 6))
    rects1 = ax.bar(x - width/2, train_counts, width, label='Train')
    rects2 = ax.bar(x + width/2, val_counts, width, label='Validation')
    
    ax.set_ylabel('Number of Images')
    ax.set_title('Image Count by Class and Split')
    ax.set_xticks(x)
    ax.set_xticklabels(classes, rotation=45, ha="right")
    ax.legend()
    
    fig.tight_layout()
    plt.show()

In [162]:
DATA_ROOT = Path('C:/Users/murug/.cache/kagglehub/datasets/feyzazkefe/trashnet/versions/1')
NUM_CLASSES = 6
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 32
LEARNING_RATE = 0.001
NUM_EPOCHS = 10
MODEL_NAME = "resnet50" # Options: resnet50, mobilenet_v2, efficientnet_b0

In [163]:
# --- Data Loading ---
def get_data_loaders(data_root, image_size, batch_size):
    """
    Creates data loaders for the training and validation sets.
    """
    print("\nLoading data from prepared directories...")
    # These transforms should match the ones used in the data prep script for validation
    data_transforms = {
        'train': transforms.Compose([
            transforms.Resize(image_size),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
        'validation': transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ]),
    }
    
    image_datasets = {
        x: datasets.ImageFolder(
            os.path.join(data_root, x), 
            data_transforms[x]
        ) for x in ['train', 'validation']
    }
    
    dataloaders = {
        x: DataLoader(
            image_datasets[x], 
            batch_size=batch_size, 
            shuffle=True, 
            num_workers=4
        ) for x in ['train', 'validation']
    }
    
    dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'validation']}
    class_names = image_datasets['train'].classes
    
    print("Data loaders created successfully.")
    return dataloaders, dataset_sizes, class_names

In [164]:
# --- Model Development (Transfer Learning) ---
def build_model(model_name, num_classes):
    """
    Loads a pre-trained model, freezes base layers, and adds a custom classifier.
    """
    print(f"\nBuilding model with {model_name}...")
    if model_name == "resnet50":
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        # Freeze all layers in the pre-trained model
        for param in model.parameters():
            param.requires_grad = False
        # Replace the final fully connected layer
        num_ftrs = model.fc.in_features
        model.fc = nn.Linear(num_ftrs, num_classes)
    elif model_name == "mobilenet_v2":
        model = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
        for param in model.parameters():
            param.requires_grad = False
        num_ftrs = model.classifier[1].in_features
        model.classifier[1] = nn.Linear(num_ftrs, num_classes)
    elif model_name == "efficientnet_b0":
        model = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
        for param in model.parameters():
            param.requires_grad = False
        num_ftrs = model.classifier[1].in_features
        model.classifier[1] = nn.Linear(num_ftrs, num_classes)
    else:
        raise ValueError("Unsupported model name.")

    print("Model built successfully with a custom classifier.")
    return model

In [165]:
# --- Training and Validation Loop ---
def train_model(model, dataloaders, dataset_sizes, num_epochs, learning_rate):
    """
    The main function for training and validating the model.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.fc.parameters(), lr=learning_rate)
    
    print("\nStarting training...")
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Each epoch has a training and validation phase
        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train() # Set model to training mode
            else:
                model.eval() # Set model to evaluate mode
            
            running_loss = 0.0
            running_corrects = 0
            
            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    
                    # Backward pass and optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

    print("\nTraining complete!")
    return model

In [166]:
def print_final_summary(train_dir, val_dir, classes):
    """
    Prints the final file counts for the train and validation sets.
    """
    print("\n--- Final Data Summary ---")
    print(f"\nTraining set located at: {train_dir}")
    for cls in classes:
        count = len(os.listdir(train_dir / cls))
        print(f"  - {cls}: {count} images")

    print(f"\nValidation set located at: {val_dir}")
    for cls in classes:
        count = len(os.listdir(val_dir / cls))
        print(f"  - {cls}: {count} images")
    
    print("\n--- Next Steps for Class Imbalance ---")
    print("If you notice a significant class imbalance in the visualization, you can address it during the training process by:")
    print("  - Using class weights in your loss function.")
    print("  - Applying data augmentation techniques to the minority classes more aggressively.")
    print("  - Using a different sampling strategy, such as oversampling the minority class.")
    print("\nData is now ready for model training!")

# --- Script Execution ---
if __name__ == "__main__":
    train_data_path, val_data_path, valid_classes = prepare_data_for_model(DATA_DIR, ALL_CLASSES)
    if train_data_path and val_data_path and valid_classes:
        train_transforms, val_transforms = define_transforms(IMAGE_SIZE)
        visualize_class_distribution(train_data_path, val_data_path, valid_classes)
        show_sample_images(train_data_path, val_data_path, valid_classes)
        analyze_pixel_distribution(train_data_path, val_data_path, valid_classes)
        print_final_summary(train_data_path, val_data_path, valid_classes)
    else:
        print("\nSkipping further processing due to data preparation errors. Please resolve the issues above.")

    try:
        dataloaders, dataset_sizes, class_names = get_data_loaders(DATA_ROOT, IMAGE_SIZE, BATCH_SIZE)
        model_ft = build_model(MODEL_NAME, NUM_CLASSES)
        model_ft = train_model(model_ft, dataloaders, dataset_sizes, NUM_EPOCHS, LEARNING_RATE)
        # You can now save the trained model for later use
        # torch.save(model_ft.state_dict(), 'trashnet_model.pth')
        print(f"\nModel training finished. Final model has {NUM_CLASSES} output classes.")
    except Exception as e:
        print(f"An error occurred: {e}")


--- Starting data preparation from directory: C:\Users\murug\.cache\kagglehub\datasets\feyzazkefe\trashnet\versions\1\dataset-resized ---
Dataset already exists at 'C:\Users\murug\.cache\kagglehub\datasets\feyzazkefe\trashnet\versions\1\dataset-resized'. Skipping simulation.

Initial file counts per class:
  - plastic: 482 images
  - metal: 410 images
  - glass: 501 images
  - cardboard: 403 images
  - paper: 594 images
  - organic: 0 images

Splitting data into training and validation sets...

Data splitting and organization complete!


ValueError: not enough values to unpack (expected 3, got 2)

In [None]:
import os
from pathlib import Path

# Set this to the path where your original dataset is located
DATA_DIR = Path('C:/Users/murug/.cache/kagglehub/datasets/feyzazkefe/trashnet/versions/1')

# Check the contents of the original 'organic' folder
print(f"Checking original directory: {DATA_DIR / 'organic'}")
original_files = os.listdir(DATA_DIR / 'organic')
print(f"Files found: {len(original_files)}")

# Check the contents of the new 'train/organic' folder
TRAIN_DIR = DATA_DIR.parent / 'train'
print(f"Checking training directory: {TRAIN_DIR / 'organic'}")
train_files = os.listdir(TRAIN_DIR / 'organic')
print(f"Files found: {len(train_files)}")

Checking original directory: C:\Users\murug\.cache\kagglehub\datasets\feyzazkefe\trashnet\versions\1\organic


FileNotFoundError: [WinError 3] The system cannot find the path specified: 'C:\\Users\\murug\\.cache\\kagglehub\\datasets\\feyzazkefe\\trashnet\\versions\\1\\organic'