Installing all necessary packages

In [None]:
!pip install matplotlib
!pip install opencv-python
!pip install tensorflow
!pip install keras

Import necessary libraries

In [None]:
import os
import matplotlib.pyplot as plt
import cv2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.utils import class_weight
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import load_model

Renaming and combining datasets

In [None]:
def rename_files_in_category(base_dir, category):
    """Rename the files of one category to ``<category>_NN.jpg``.

    For each of the ``Train`` and ``Test`` sub-folders of
    ``base_dir/category``, the regular, non-hidden files are numbered from 1
    in sorted-name order and renamed to ``<category>_<NN>.jpg``.

    The rename runs in two passes (original name -> unique temporary name ->
    final name) so that a file whose final name already exists in the folder
    never overwrites that other file. This also makes re-running the cell safe.
    Hidden entries (names starting with ``.``, e.g. ``.DS_Store``) and
    sub-directories are left untouched.

    Note: only the file name changes. The ``.jpg`` extension is applied
    regardless of the original extension, and file contents are not converted.

    NOTE(review): later cells read ``<base_path>/<train|test>/<class>``, which
    is a different layout from the ``<base_dir>/<category>/<Train|Test>`` used
    here -- confirm which layout the dataset has when this cell runs.

    Args:
        base_dir: Root directory of the dataset.
        category: Category folder name; also used as the new file-name prefix.

    Raises:
        FileNotFoundError: If a ``Train`` or ``Test`` folder does not exist
            (raised by ``os.listdir``).
    """
    import uuid  # local import: only needed to build unique staging names

    for sub_dir in ['Train', 'Test']:
        path = os.path.join(base_dir, category, sub_dir)
        files = sorted(
            name for name in os.listdir(path)
            if not name.startswith('.') and os.path.isfile(os.path.join(path, name))
        )

        # Pass 1: move every file to a unique temporary name so that no final
        # name can collide with a file that has not been processed yet.
        token = uuid.uuid4().hex
        staged = []
        for i, filename in enumerate(files, start=1):
            old_filepath = os.path.join(path, filename)
            temp_filepath = os.path.join(path, f"__renaming_{token}_{i}")
            new_filepath = os.path.join(path, f"{category}_{i:02d}.jpg")
            os.rename(old_filepath, temp_filepath)
            staged.append((old_filepath, temp_filepath, new_filepath))

        # Pass 2: move the staged files to their final names.
        for old_filepath, temp_filepath, new_filepath in staged:
            os.rename(temp_filepath, new_filepath)
            print(f"Renamed '{old_filepath}' to '{new_filepath}'")

# Category folders to process; each name doubles as the new file-name prefix
categories = ["Compost", "Hazardous", "Recycle", "Trash"]

# Root folder that holds the category folders
base_dir = "./archive 2"

# Rename the Train/Test files of every category in turn
for category in categories:
    rename_files_in_category(base_dir=base_dir, category=category)

Data Preparation and Visualization

In [None]:
base_path = "./archive 2"

# Sub-directories for train and test sets
sub_dirs = ['train', 'test']

# Show one sample image per class folder of each split.
for sub_dir in sub_dirs:
    data_path = os.path.join(base_path, sub_dir)

    # Sorted so the cell produces the same output order on every run
    for class_name in sorted(os.listdir(data_path)):
        class_path = os.path.join(data_path, class_name)

        # Hidden entries such as .DS_Store are not class folders
        if class_name.startswith('.'):
            print(f"Skipping hidden file: {class_name}")
            continue

        # Ignore hidden files inside the class folder too; otherwise the
        # "first image" could be a .DS_Store file that cv2 cannot decode.
        if os.path.isdir(class_path):
            image_names = sorted(
                name for name in os.listdir(class_path) if not name.startswith('.')
            )
        else:
            image_names = []

        if not image_names:
            print(f"No images found in {class_path}")
            continue

        img_name = image_names[0]  # First image in sorted order
        img_path = os.path.join(class_path, img_name)

        # cv2 loads images as BGR; convert to RGB for matplotlib
        img = cv2.imread(img_path)
        if img is not None:
            plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            plt.title(f"{sub_dir} - {class_name}")
            plt.show()
        else:
            print(f"Failed to load image: {img_path}")


In [None]:
def preprocess_image(img):
    """Convert an RGB image tensor to grayscale and resize it to 128x128.

    Note: a second ``preprocess_image(image_path, target_size)`` is defined
    later in this notebook and replaces this one once that cell has run.
    NOTE(review): ``rgb_to_grayscale`` yields a single channel, whereas the
    model in this notebook declares ``input_shape=(128, 128, 3)`` -- confirm
    whether this helper is still needed.

    Args:
        img: Image tensor accepted by ``tf.image.rgb_to_grayscale``.

    Returns:
        The result of ``tf.image.resize`` applied to the grayscale image.
    """
    grayscale = tf.image.rgb_to_grayscale(img)
    return tf.image.resize(grayscale, (128, 128))

Data Augmentation and Splitting

In [None]:
# The 'train' folder feeds training; the 'test' folder is used as the validation set
train_data_path = os.path.join(base_path, 'train')
validation_data_path = os.path.join(base_path, 'test')

# Random augmentation is applied to the training images only
augmentation_options = dict(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest',
)
train_datagen = ImageDataGenerator(rescale=1./255, **augmentation_options)

# Validation images are only rescaled to [0, 1]
validation_datagen = ImageDataGenerator(rescale=1./255)

# Both generators share the same image size, batch size and label encoding
flow_options = dict(
    target_size=(128, 128),
    batch_size=32,
    class_mode='categorical',  # Multi-class classification
)

# Setting up the training data generator
train_generator = train_datagen.flow_from_directory(train_data_path, **flow_options)

# Validation data generator
validation_generator = validation_datagen.flow_from_directory(validation_data_path, **flow_options)

Data Balancing: Class Weights Calculation

In [None]:
# Calculating the class weights to handle class imbalance
present_classes = np.unique(train_generator.classes)
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=present_classes,
    y=train_generator.classes
)

# Key every weight by the class index it was computed for. Enumerating the
# weights instead would shift them onto the wrong classes whenever a class
# index has no training samples (e.g. an empty class folder).
class_weights_dict = {
    int(label): float(weight)
    for label, weight in zip(present_classes, class_weights)
}

# Output class weights for reference
print("Class Weights: ", class_weights_dict)


Handling Problematic Images

In [None]:
def clean_dataset(data_path):
    """Remove ``.DS_Store`` files and report image files that cannot be read.

    Walks ``data_path`` recursively. Every file named ``.DS_Store`` is deleted.
    Every file with a ``.png``, ``.jpg`` or ``.jpeg`` extension (matched
    case-insensitively, so ``.JPG`` is checked too) is opened with
    ``cv2.imread``; files for which it returns ``None`` are collected.
    Unreadable images are only reported, never deleted.

    Args:
        data_path: Root directory to scan.

    Returns:
        List of paths of image files that ``cv2.imread`` could not load.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    problematic_files = []

    for subdir, _dirs, files in os.walk(data_path):
        for file in files:
            filepath = os.path.join(subdir, file)

            # Removing .DS_Store files
            if file == '.DS_Store':
                os.remove(filepath)
                print(f"Removed system file: {filepath}")
                continue

            # Only image files are checked; compare the extension in lower
            # case so upper-case extensions are not silently skipped.
            if not file.lower().endswith(image_extensions):
                continue

            # cv2.imread signals an unreadable file by returning None
            img = cv2.imread(filepath)
            if img is None:
                problematic_files.append(filepath)

    return problematic_files

# Scan the whole dataset folder (both splits)
data_path = './archive 2'
problematic_files = clean_dataset(data_path)

# Report the outcome: one path per line, or a short all-clear message
if not problematic_files:
    print("No problematic files found.")
else:
    print("Problematic files:")
    print(*problematic_files, sep="\n")


Model

In [None]:
# CNN: three conv blocks with a growing number of filters, then a dense head
model = Sequential()

# Conv block 1 -- also declares the input: 128x128 images with 3 channels
model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(128, 128, 3)))
model.add(BatchNormalization())
model.add(MaxPooling2D(2, 2))

# Conv block 2
model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D(2, 2))

# Conv block 3
model.add(Conv2D(256, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D(2, 2))

# Dense head
model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(BatchNormalization())

# Output layer: 4 units with softmax
model.add(Dense(4, activation='softmax'))

Compiling the Model and Setting Up Callbacks

In [None]:
# Compile with Adam at an initial learning rate of 0.001
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# All three callbacks monitor the validation loss:
# stop after 5 epochs without improvement and restore the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)
# save only the best model seen so far to 'best_model.h5'
model_checkpoint = ModelCheckpoint(
    'best_model.h5',
    monitor='val_loss',
    save_best_only=True,
)
# multiply the learning rate by 0.2 after 2 epochs without improvement
lr_schedule = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=2,
    min_lr=0.00001,
)

model.summary()

Training the model

In [None]:
# Up to 100 epochs; EarlyStopping normally ends training sooner.
# Class weights are passed to fit to handle class imbalance.
history = model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=100,
    class_weight=class_weights_dict,
    callbacks=[
        early_stopping,
        model_checkpoint,
        lr_schedule,
    ],
    verbose=1,
)

Plot training & validation accuracy and loss values

In [None]:
# Two side-by-side panels: accuracy on the left, loss on the right
fig, (accuracy_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 4))

panels = (
    (accuracy_ax, 'accuracy', 'Accuracy'),
    (loss_ax, 'loss', 'Loss'),
)

for ax, metric_key, metric_label in panels:
    # Training curve first, then validation, so the legend order is stable
    ax.plot(history.history[metric_key], label=f'Training {metric_label}')
    ax.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_label}')
    ax.set_title(f'{metric_label} over Epochs')
    ax.set_xlabel('Epoch')
    ax.set_ylabel(metric_label)
    ax.legend()

plt.show()

Evaluate the model on the validation set

In [None]:
# evaluate() returns the loss followed by the compiled metrics (accuracy only)
val_loss, val_accuracy = model.evaluate(validation_generator)

accuracy_percent = val_accuracy * 100
print(f"Validation Accuracy: {accuracy_percent:.2f}%")
print(f"Validation Loss: {val_loss:.2f}")

Demo using webcam

In [None]:
def capture_image_from_webcam():
    """Show a live webcam preview and return the last frame that was read.

    Opens camera 0 and displays frames in a window titled ``'frame'`` until
    the user presses ``q`` or the camera stops delivering frames.

    Returns:
        The most recent frame successfully read from the camera, or ``None``
        if the camera could not be opened or no frame was ever read.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        cap.release()
        return None

    last_frame = None
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                break

            # Remember the frame so a later failed read does not discard it
            last_frame = frame

            cv2.imshow('frame', frame)
            # Mask to the low byte: some platforms set higher bits in the key code
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the camera and close the preview, even if an error occurred
        cap.release()
        cv2.destroyAllWindows()

    return last_frame

# Opens the live preview; press 'q' in the preview window to stop it.
# `image` receives the function's return value, which is None when the
# camera could not be opened.
image = capture_image_from_webcam()

In [None]:
# Load 'best_model.h5' (the path passed to ModelCheckpoint earlier in this
# notebook); this rebinds `model` to the model stored in that file.
model = load_model('best_model.h5')

In [None]:
def preprocess_image(image_path, target_size=(128, 128)):
    """Load an image and turn it into a one-item batch for ``model.predict``.

    The image is converted from OpenCV's BGR channel order to RGB, resized,
    scaled to ``[0, 1]`` and given a leading batch dimension. The RGB
    conversion matters because the training images in this notebook are loaded
    by Keras' ``flow_from_directory`` rather than by OpenCV.

    Args:
        image_path: Path of the image file to load. An already-loaded BGR image
            array (for example a frame returned by ``cv2.VideoCapture.read``)
            is also accepted and used directly.
        target_size: ``(height, width)`` of the model input.

    Returns:
        Float array of shape ``(1, height, width, 3)`` with values in ``[0, 1]``.

    Raises:
        ValueError: If the file is missing or cannot be decoded as an image.
    """
    if isinstance(image_path, np.ndarray):
        img = image_path
    else:
        img = cv2.imread(image_path)
        # cv2.imread returns None instead of raising when loading fails
        if img is None:
            raise ValueError(
                f"Could not read image (missing or not a valid image file): {image_path}"
            )

    height, width = target_size
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # cv2.resize expects the size as (width, height)
    img = cv2.resize(img, (width, height))
    img = img / 255.0
    return img.reshape(1, height, width, 3)

# Preprocess the captured image.
# Write the webcam frame to 'image.png' first, since that is the file
# preprocess_image() reads. If no frame was captured (`image` is None), any
# existing 'image.png' on disk is used instead.
if image is not None:
    cv2.imwrite('image.png', image)
preprocessed_image = preprocess_image('image.png')

In [None]:
# Display labels, indexed by the model's output position
# (presumably the same order as the class indices used in training -- TODO confirm)
class_names = ['Compost', 'Hazardous', 'Recyclables', 'Trash']

# Predict the class: take the highest-scoring output for each batch item
prediction = model.predict(preprocessed_image)
class_idx = np.argmax(prediction, axis=1)

# The batch holds a single image, so only the first index is used
predicted_class = class_names[class_idx[0]]
print(f"Predicted Class: {predicted_class}")