# **Waste Material Segregation for Improving Waste Management**

## **Objective**

The objective of this project is to implement an effective waste material segregation system using convolutional neural networks (CNNs) that categorises waste into distinct groups. This process enhances recycling efficiency, minimises environmental pollution, and promotes sustainable waste management practices.

The key goals are:

* Accurately classify waste materials into categories like cardboard, glass, paper, and plastic.
* Improve waste segregation efficiency to support recycling and reduce landfill waste.
* Understand the properties of different waste materials to optimise sorting methods for sustainability.

## **Data Understanding**

The Dataset consists of images of some common waste materials.

1. Food Waste
2. Metal
3. Paper
4. Plastic
5. Other
6. Cardboard
7. Glass


**Data Description**

* The dataset consists of multiple folders, each representing a specific class, such as `Cardboard`, `Food_Waste`, and `Metal`.
* Within each folder, there are images of objects that belong to that category.
* However, these items are not further subcategorised. <br> For instance, the `Food_Waste` folder may contain images of items like coffee grounds, teabags, and fruit peels, without explicitly stating that they are actually coffee grounds or teabags.

## **1. Load the data**

Load and unzip the dataset zip file.

**Import Necessary Libraries**

In [1]:
# Recommended versions:

# numpy version: 1.26.4
# pandas version: 2.2.2
# seaborn version: 0.13.2
# matplotlib version: 3.10.0
# PIL version: 11.1.0
# tensorflow version: 2.18.0
# keras version: 3.8.0
# sklearn version: 1.6.1

In [None]:
# Import essential libraries

import warnings
warnings.filterwarnings('ignore') 
import os
os.environ['IF_ENABLE_ONEDNN_OPTS'] = '1'

import zipfile
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array, ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
from PIL import Image # For image operations

Load the dataset.

In [2]:
# Load and unzip the dataset
import os
from zipfile import ZipFile

zip_file_path = r'Dataset_Waste_Segregation.zip'
unzip_destination = r'Dataset_Waste_Segregation'

# Unzip the dataset
try:
    with ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(unzip_destination)
    print(f"Dataset unzipped successfully to: {unzip_destination}")
except FileNotFoundError:
    print(f"Error: Zip file not found at {zip_file_path}. Please check the path.")
except zipfile.BadZipFile:
    print(f"Error: The file at {zip_file_path} is not a valid zip file.")
except Exception as e:
    print(f"An unexpected error occurred during unzipping: {e}")


Dataset unzipped successfully to: Dataset_Waste_Segregation


## **2. Data Preparation** <font color=red> [25 marks] </font><br>


### **2.1 Load and Preprocess Images** <font color=red> [8 marks] </font><br>

Let us create a function to load the images first. We can then directly use this function while loading images of the different categories to load and crop them in a single step.

#### **2.1.1** <font color=red> [3 marks] </font><br>
Create a function to load the images.

In [3]:
# Create a function to load the raw images

def load_and_preprocess_image(image_path, target_size=(128, 128)):
    try:
        img = load_img(image_path, target_size=target_size)

        # Convert the PIL Image to a NumPy array
        img_array = img_to_array(img)

        # Normalize pixel values to [0, 1]
        img_array = img_array / 255.0

        return img_array

    except Exception as e:
        print(f"Error loading or preprocessing image {image_path}: {e}")
        return None

#### **2.1.2** <font color=red> [5 marks] </font><br>
Load images and labels.

Load the images from the dataset directory. Labels of images are present in the subdirectories.

Verify if the images and labels are loaded correctly.

In [5]:
# Get the images and their labels
dataset_base_dir = os.path.join(unzip_destination)

IMG_WIDTH, IMG_HEIGHT = 128, 128
TARGET_SIZE = (IMG_WIDTH, IMG_HEIGHT)

all_images = []
all_labels = []
class_names = [] # To store the names of the classes

print(f"\nLoading images from: {dataset_base_dir}")

# Iterating through each subdirectory
for class_name in os.listdir(dataset_base_dir):
    class_dir = os.path.join(dataset_base_dir, class_name)

    # Checking that its a directory and no files are processeed if present in parent folder.
    if os.path.isdir(class_dir):
        class_names.append(class_name)
        print(f"Processing class: {class_name}")

        # Iterating through each image file in the class directory
        for image_filename in os.listdir(class_dir):
            image_path = os.path.join(class_dir, image_filename)

            # Load and preprocess the image using our defined function
            processed_img = load_and_preprocess_image(image_path, target_size=TARGET_SIZE)

            if processed_img is not None:
                all_images.append(processed_img)
                all_labels.append(class_name)

# Convert lists to NumPy arrays
X = np.array(all_images)
y_raw = np.array(all_labels)

# Verifing the images and labels are loaded correctly
print("\n--- Verification ---")
print(f"Total images loaded: {len(all_images)}")
print(f"Total labels loaded: {len(all_labels)}")
print(f"Shape of image data (X): {X.shape}")
print(f"Shape of label data (y_raw): {y_raw.shape}")
print(f"Detected classes: {class_names}")

if len(class_names) == 0:
    print("No classes or images found. Please check 'dataset_base_dir' path and dataset structure.")
    exit()



Loading images from: Dataset_Waste_Segregation
Processing class: Dataset_Waste_Segregation
Error loading or preprocessing image Dataset_Waste_Segregation/Dataset_Waste_Segregation/data.zip: name 'load_img' is not defined


NameError: name 'np' is not defined

Perform any operations, if needed, on the images and labels to get them into the desired format.

### **2.2 Data Visualisation** <font color=red> [9 marks] </font><br>

#### **2.2.1** <font color=red> [3 marks] </font><br>
Create a bar plot to display the class distribution

In [None]:
# Visualise Data Distribution
plt.figure(figsize=(10, 6))
sns.countplot(x=y_raw, order=class_names)
plt.title('Distribution of Waste Categories')
plt.xlabel('Waste Category')
plt.ylabel('Number of Images')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()


#### **2.2.2** <font color=red> [3 marks] </font><br>
Visualise some sample images

In [None]:
# Visualise Sample Images (across different labels)

num_samples_per_class = 3
plt.figure(figsize=(15, 10))

for i, class_name in enumerate(class_names):
    # Get indices of images belonging to the current class
    class_indices = np.where(y_raw == class_name)[0]
    np.random.shuffle(class_indices)
    sample_indices = class_indices[:num_samples_per_class]

    for j, idx in enumerate(sample_indices):
        plt.subplot(len(class_names), num_samples_per_class, i * num_samples_per_class + j + 1)
        plt.imshow(X[idx])
        plt.title(f"{class_name}")
        plt.axis('off')
plt.tight_layout()
plt.show()

#### **2.2.3** <font color=red> [3 marks] </font><br>
Based on the smallest and largest image dimensions, resize the images.

In [None]:
# Find the smallest and largest image dimensions from the data set
original_widths = []
original_heights = []

# Re-iterate through dataset to get original dimensions (without preprocessing)
for class_name in os.listdir(dataset_base_dir):
    class_dir = os.path.join(dataset_base_dir, class_name)
    if os.path.isdir(class_dir):
        for image_filename in os.listdir(class_dir):
            image_path = os.path.join(class_dir, image_filename)
            try:
                with Image.open(image_path) as img:
                    original_widths.append(img.width)
                    original_heights.append(img.height)
            except Exception as e:
                print(f"Could not open image {image_path} for dimension analysis: {e}")



In [None]:
# Resize the image dimensions
if original_widths and original_heights:
    min_width = np.min(original_widths)
    max_width = np.max(original_widths)
    min_height = np.min(original_heights)
    max_height = np.max(original_heights)

    print(f"Smallest original width: {min_width} pixels")
    print(f"Largest original width: {max_width} pixels")
    print(f"Smallest original height: {min_height} pixels")
    print(f"Largest original height: {max_height} pixels")
    print(f"Chosen target size for CNN: {TARGET_SIZE} (width, height)")
    print("Images have been resized to this target size during loading.")
else:
    print("No images found for dimension analysis.")


### **2.3 Encoding the classes** <font color=red> [3 marks] </font><br>

There are seven classes present in the data.

We have extracted the images and their labels, and visualised their distribution. Now, we need to perform encoding on the labels. Encode the labels suitably.

####**2.3.1** <font color=red> [3 marks] </font><br>
Encode the target class labels.

In [None]:
# Encode the labels suitably

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y_raw)

# Convert encoded labels to one-hot encoding
num_classes = len(class_names)
y_one_hot = tf.keras.utils.to_categorical(y_encoded, num_classes=num_classes)

print(f"Original labels (first 5): {y_raw[:5]}")
print(f"Encoded labels (first 5): {y_encoded[:5]}")
print(f"One-Hot Encoded labels (first 5 rows):\n{y_one_hot[:5]}")
print(f"Classes mapping: {list(label_encoder.classes_)}")
print(f"Number of classes: {num_classes}")
print("Labels encoded successfully.")


### **2.4 Data Splitting** <font color=red> [5 marks] </font><br>

#### **2.4.1** <font color=red> [5 marks] </font><br>
Split the dataset into training and validation sets

In [None]:
# Assign specified parts of the dataset to train and validation sets
# Splitting data into training and testing sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X, y_one_hot, test_size=0.2, random_state=42, stratify=y_encoded)

print(f"Training images shape: {X_train.shape}")
print(f"Testing images shape: {X_test.shape}")
print(f"Training labels shape: {y_train.shape}")
print(f"Testing labels shape: {y_test.shape}")
print("Dataset split into training and testing sets.")


## **3. Model Building and Evaluation** <font color=red> [20 marks] </font><br>

### **3.1 Model building and training** <font color=red> [15 marks] </font><br>

#### **3.1.1** <font color=red> [10 marks] </font><br>
Build and compile the model. Use 3 convolutional layers. Add suitable normalisation, dropout, and fully connected layers to the model.

Test out different configurations and report the results in conclusions.

In [None]:
# Build and compile the model
model = Sequential([
    # First Convolutional Block
    Conv2D(32, (3, 3), activation='relu', input_shape=(IMG_WIDTH, IMG_HEIGHT, 3)),
    BatchNormalization(), # Normalize activations
    MaxPooling2D((2, 2)),
    Dropout(0.25), # Regularization to prevent overfitting

    # Second Convolutional Block
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Third Convolutional Block
    Conv2D(128, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Flatten the output for the fully connected layers
    Flatten(),

    # Fully Connected Layers
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.5), # More aggressive dropout for dense layers
    Dense(num_classes, activation='softmax') # Output layer with softmax for multi-class classification
])


#### **3.1.2** <font color=red> [5 marks] </font><br>
Train the model.

Use appropriate metrics and callbacks as needed.

In [None]:
# Training
# ModelCheckpoint to save the best model during training
checkpoint_filepath = 'best_model.h5' # Save the model with the best validation accuracy
model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=False, # Save the entire model
    monitor='val_accuracy',
    mode='max',
    save_best_only=True, # Only save when validation accuracy improves
    verbose=1
)

# EarlyStopping to stop training if validation accuracy doesn't improve
early_stopping_callback = EarlyStopping(
    monitor='val_accuracy',
    patience=10, # Number of epochs with no improvement after which training will be stopped.
    mode='max',
    restore_best_weights=True, # Restore model weights from the epoch with the best value of the monitored quantity.
    verbose=1
)

# Train the model
EPOCHS = 20 # Started with higher and going lower due to memory issue.
BATCH_SIZE = 18

history = model.fit(
    X_train, y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_test, y_test), # Use the test set as validation data
    callbacks=[model_checkpoint_callback, early_stopping_callback],
    verbose=1
)

print("\nModel training complete.")

# Plot training history
plt.figure(figsize=(12, 5))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.tight_layout()
plt.show()


### **3.2 Model Testing and Evaluation** <font color=red> [5 marks] </font><br>

#### **3.2.1** <font color=red> [5 marks] </font><br>
Evaluate the model on test dataset. Derive appropriate metrics.

In [None]:
# Evaluate on the test set; display suitable metrics
# Loading the best model saved by ModelCheckpoint
best_model = tf.keras.models.load_model(checkpoint_filepath)

# Evaluating the best model on the test set
loss, accuracy = best_model.evaluate(X_test, y_test, verbose=0)
print(f"Test Loss: {loss:.4f}")
print(f"Test Accuracy: {accuracy:.4f}")

# Predict probabilities for the test set
y_pred_probs = best_model.predict(X_test)

# Convert probabilities to class labels
y_pred = np.argmax(y_pred_probs, axis=1)

# Convert one-hot encoded true labels back to single integer labels
y_true = np.argmax(y_test, axis=1)

# Generate Classification Report
print("\nClassification Report:")
print(classification_report(y_true, y_pred, target_names=label_encoder.classes_))

# Generate Confusion Matrix
print("\nConfusion Matrix:")
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()
print("Model evaluation complete. Metrics displayed.")


## **4. Data Augmentation** <font color=red> [optional] </font><br>

#### **4.1 Create a Data Augmentation Pipeline**

##### **4.1.1**
Define augmentation steps for the datasets.

In [None]:
# Define augmentation steps to augment images
# Different augmentation techniques for image classification
datagen = ImageDataGenerator(
    rotation_range=20,        # Rotate images by a random angle (0-20 degrees)
    width_shift_range=0.1,    # Shift images horizontally by 10% of total width
    height_shift_range=0.1,   # Shift images vertically by 10% of total height
    shear_range=0.1,          # Apply shear transformation
    zoom_range=0.1,           # Zoom in/out by 10%
    horizontal_flip=True,     # Randomly flip images horizontally
    fill_mode='nearest'       # Strategy for filling in newly created pixels
)


datagen.fit(X_train)
print("Data augmentation pipeline defined and fitted on training data.")


Augment and resample the images.
In case of class imbalance, you can also perform adequate undersampling on the majority class and augment those images to ensure consistency in the input datasets for both classes.

Augment the images.

In [None]:
# Create a function to augment the images


print("\nCreating augmented training dataset generator.")
augmented_train_generator = datagen.flow(X_train, y_train, batch_size=BATCH_SIZE)
print("Augmented training dataset created.")


In [None]:
# Create the augmented training dataset



##### **4.1.2**

Train the model on the new augmented dataset.

In [None]:
# Train the model using augmented images
augmented_model = Sequential([
    # First Convolutional Block
    Conv2D(32, (3, 3), activation='relu', input_shape=(IMG_WIDTH, IMG_HEIGHT, 3)),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Second Convolutional Block
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Third Convolutional Block
    Conv2D(128, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D((2, 2)),
    Dropout(0.25),

    # Flatten the output for the fully connected layers
    Flatten(),

    # Fully Connected Layers
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])

augmented_model.compile(optimizer='adam',
                        loss='categorical_crossentropy',
                        metrics=['accuracy'])

augmented_model.summary()
print("New model initialized and compiled for augmented training.")

# Define callbacks for augmented training
checkpoint_filepath_aug = 'best_augmented_model.h5'
model_checkpoint_callback_aug = ModelCheckpoint(
    filepath=checkpoint_filepath_aug,
    save_weights_only=False,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True,
    verbose=1
)

early_stopping_callback_aug = EarlyStopping(
    monitor='val_accuracy',
    patience=15, # Increased patience for augmented data as it might take longer to converge
    mode='max',
    restore_best_weights=True,
    verbose=1
)

# Train the model using the augmented data generator
# Use validation_data for evaluation on the test set
history_aug = augmented_model.fit(
    augmented_train_generator,
    steps_per_epoch=len(X_train) // BATCH_SIZE, # Number of batches per epoch
    epochs=EPOCHS,
    validation_data=(X_test, y_test),
    callbacks=[model_checkpoint_callback_aug, early_stopping_callback_aug],
    verbose=1
)

print("\nModel training with augmented data complete.")

# Plot training history for augmented model
plt.figure(figsize=(12, 5))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history_aug.history['accuracy'])
plt.plot(history_aug.history['val_accuracy'])
plt.title('Augmented Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history_aug.history['loss'])
plt.plot(history_aug.history['val_loss'])
plt.title('Augmented Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.tight_layout()
plt.show()

# Evaluate the augmented model
print("\n--- Evaluating Augmented Model on Test Set ---")
best_augmented_model = tf.keras.models.load_model(checkpoint_filepath_aug)
loss_aug, accuracy_aug = best_augmented_model.evaluate(X_test, y_test, verbose=0)
print(f"Augmented Model Test Loss: {loss_aug:.4f}")
print(f"Augmented Model Test Accuracy: {accuracy_aug:.4f}")

y_pred_probs_aug = best_augmented_model.predict(X_test)
y_pred_aug = np.argmax(y_pred_probs_aug, axis=1)
print("\nClassification Report (Augmented Model):")
print(classification_report(y_true, y_pred_aug, target_names=label_encoder.classes_))


## **5. Conclusions** <font color = red> [5 marks]</font>

#### **5.1 Conclude with outcomes and insights gained** <font color =red> [5 marks] </font>

* Report your findings about the data
* Report model training results

There are many materials which are of same type and create issue in the classification.
There are many items which if tried to be caught then creates a lot of noise.
High iteration and sample sizes are good for getting best resutls but should be optimized due to resource contraints.
It is best to go for iterative learning to find the best segregation of the garbage.