In [1]:
!pip install kaggle

from google.colab import files
files.upload()  # Upload file kaggle.json: Kaggle (Account > API > Create New API Token)
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d alistairking/recyclable-and-household-waste-classification
!unzip recyclable-and-household-waste-classification.zip -d /content/dataset



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_1.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_10.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_100.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_101.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_102.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_103.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_104.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_105.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_106.png  
  inflating: /content/dataset/images/images/plastic_shopping_bags/default/Image_107.png  
  inflating: /content/dataset/images/i

# Label

In [2]:
import os
import shutil
import pandas as pd
from pathlib import Path

dataset_path = '/content/dataset/images/images'

category_mapping = {
    # Plastic
    'disposable_plastic_cutlery': 'Plastic',
    'plastic_cup_lids': 'Plastic',
    'plastic_detergent_bottles': 'Plastic',
    'plastic_food_containers': 'Plastic',
    'plastic_shopping_bags': 'Plastic',
    'plastic_soda_bottles': 'Plastic',
    'plastic_straws': 'Plastic',
    'plastic_trash_bags': 'Plastic',
    'plastic_water_bottles': 'Plastic',  # Adjust if the folder name is different

    # Paper/Cardboard
    'cardboard_boxes': 'Paper_Cardboard',
    'cardboard_packaging': 'Paper_Cardboard',
    'magazines': 'Paper_Cardboard',
    'newspaper': 'Paper_Cardboard',
    'office_paper': 'Paper_Cardboard',
    'paper_cups': 'Paper_Cardboard',

    # Organic/Food Waste
    'coffee_grounds': 'Organic_Food_Waste',
    'eggshells': 'Organic_Food_Waste',
    'food_waste': 'Organic_Food_Waste',
    'tea_bags': 'Organic_Food_Waste',

    # Glass
    'glass_beverage_bottles': 'Glass',
    'glass_cosmetic_containers': 'Glass',
    'glass_food_jars': 'Glass',

    # Metal/Other
    'aerosol_cans': 'Metal_Other',
    'aluminum_food_cans': 'Metal_Other',
    'aluminum_soda_cans': 'Metal_Other',
    'steel_food_cans': 'Metal_Other',
    'styrofoam_cups': 'Metal_Other',
    'styrofoam_food_containers': 'Metal_Other',
    'clothing': 'Metal_Other',
    'shoes': 'Metal_Other'
}

def collect_image_data(dataset_path):
    image_data = []

    # Iterate through each category folder in the dataset
    for category in os.listdir(dataset_path):
        category_path = os.path.join(dataset_path, category)

        # Check if it's a directory
        if os.path.isdir(category_path):
            # Check if the category exists in the mapping
            if category in category_mapping:
                main_label = category_mapping[category]
            else:
                print(f"Warning: Category '{category}' not found in mapping. Skipping...")
                continue

            # Iterate through 'default' and 'real_world' subfolders
            for subfolder in ['default', 'real_world']:
                subfolder_path = os.path.join(category_path, subfolder)

                if os.path.exists(subfolder_path):
                    # Iterate through all images in the subfolder
                    for image_name in os.listdir(subfolder_path):
                        if image_name.endswith('.png'):  # Ensure it's a PNG file
                            image_path = os.path.join(subfolder_path, image_name)
                            # Append the image path, original category, main label, and subfolder type
                            image_data.append({
                                'image_path': image_path,
                                'original_category': category,
                                'main_label': main_label,
                                'subfolder': subfolder
                            })
                else:
                    print(f"Subfolder '{subfolder}' not found in category '{category}'")

    return image_data

# Collect the image data
image_data = collect_image_data(dataset_path)

# Convert to a pandas DataFrame
df = pd.DataFrame(image_data)

# Display the first few rows of the DataFrame
print("Dataset Overview:")
print(df.head())
print("\nLabel Distribution:")
print(df['main_label'].value_counts())

# Save the labeled dataset to a CSV file
output_csv_path = '/content/labeled_waste_dataset.csv'  # Path to save the CSV
df.to_csv(output_csv_path, index=False)
print(f"\nLabeled dataset saved to: {output_csv_path}")

# Create a new folder structure based on the main labels while preserving 'default' and 'real_world'
create_new_structure = True  # Set to False if you don't want to create a new folder structure

if create_new_structure:
    output_dataset_path = '/content/labeled_dataset'
    os.makedirs(output_dataset_path, exist_ok=True)

    # Create subfolders for each main label and include 'default' and 'real_world' subfolders
    for label in df['main_label'].unique():
        label_path = os.path.join(output_dataset_path, label)
        os.makedirs(label_path, exist_ok=True)
        # Create 'default' and 'real_world' subfolders under each main label
        os.makedirs(os.path.join(label_path, 'default'), exist_ok=True)
        os.makedirs(os.path.join(label_path, 'real_world'), exist_ok=True)

    # Copy images to the new structure while preserving the 'default' and 'real_world' subfolders
    for idx, row in df.iterrows():
        src_path = row['image_path']
        label = row['main_label']
        subfolder = row['subfolder']  # 'default' or 'real_world'
        image_name = os.path.basename(src_path)
        dst_path = os.path.join(output_dataset_path, label, subfolder, image_name)

        # Handle duplicate filenames by adding a suffix if needed
        base, extension = os.path.splitext(image_name)
        counter = 1
        while os.path.exists(dst_path):
            new_image_name = f"{base}_{counter}{extension}"
            dst_path = os.path.join(output_dataset_path, label, subfolder, new_image_name)
            counter += 1

        shutil.copy(src_path, dst_path)

    print(f"\nNew dataset structure created at: {output_dataset_path}")

Dataset Overview:
                                          image_path      original_category  \
0  /content/dataset/images/images/plastic_water_b...  plastic_water_bottles   
1  /content/dataset/images/images/plastic_water_b...  plastic_water_bottles   
2  /content/dataset/images/images/plastic_water_b...  plastic_water_bottles   
3  /content/dataset/images/images/plastic_water_b...  plastic_water_bottles   
4  /content/dataset/images/images/plastic_water_b...  plastic_water_bottles   

  main_label subfolder  
0    Plastic   default  
1    Plastic   default  
2    Plastic   default  
3    Plastic   default  
4    Plastic   default  

Label Distribution:
main_label
Plastic               4500
Metal_Other           4000
Paper_Cardboard       3000
Organic_Food_Waste    2000
Glass                 1500
Name: count, dtype: int64

Labeled dataset saved to: /content/labeled_waste_dataset.csv

New dataset structure created at: /content/labeled_dataset


# Augmentation data

In [3]:
labeled_dir = "/content/labeled_dataset"
subfolders = ["default", "real_world"]

print("Merging default and real_world in labeled_dataset...")
for label in os.listdir(labeled_dir):
    label_path = os.path.join(labeled_dir, label)
    if not os.path.isdir(label_path):
        continue

    for subfd in subfolders:
        subfd_path = os.path.join(label_path, subfd)
        if not os.path.exists(subfd_path):
            continue

        for img_name in os.listdir(subfd_path):
            src = os.path.join(subfd_path, img_name)
            dst = os.path.join(label_path, img_name)

            base, extension = os.path.splitext(img_name)
            counter = 1
            while os.path.exists(dst):
                new_img_name = f"{base}_{counter}{extension}"
                dst = os.path.join(label_path, new_img_name)
                counter += 1

            shutil.move(src, dst)

        shutil.rmtree(subfd_path)

Merging default and real_world in labeled_dataset...


In [4]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from tensorflow.keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(
    rotation_range=15,
    shear_range=0.15,
    zoom_range=0.1,
    horizontal_flip=True,
    brightness_range=[0.8, 1.2],
    fill_mode='nearest'
)

classes = ["Plastic","Metal_Other", "Paper_Cardboard","Organic_Food_Waste", "Glass"]
source_base = "/content/labeled_dataset"
target_base = "/content/augmented_data"
AUG_PER_IMG = 1
MAX_IMAGES = 1500

for cls in classes:
    src_folder = os.path.join(source_base, cls)
    dst_folder = os.path.join(target_base, cls)
    os.makedirs(dst_folder, exist_ok=True)

    print(f"Augmenting class: {cls}")
    img_list = os.listdir(src_folder)[:MAX_IMAGES]
    processed_count = 0

    for img_name in tqdm(img_list):
        if processed_count >= MAX_IMAGES:  #
            break

        img_path = os.path.join(src_folder, img_name)
        img = cv2.imread(img_path)
        if img is None:
            continue

        img = np.expand_dims(img, 0)
        for i, batch in enumerate(datagen.flow(img, batch_size=1)):
            save_name = f"{img_name.split('.')[0]}_aug.jpg"
            save_path = os.path.join(dst_folder, save_name)
            cv2.imwrite(save_path, batch[0].astype(np.uint8))
            if i + 1 >= AUG_PER_IMG:
                break

        processed_count += 1


Augmenting class: Plastic


100%|██████████| 1500/1500 [00:41<00:00, 36.28it/s]


Augmenting class: Metal_Other


100%|██████████| 1500/1500 [00:37<00:00, 39.48it/s]


Augmenting class: Paper_Cardboard


100%|██████████| 1500/1500 [00:38<00:00, 39.31it/s]


Augmenting class: Organic_Food_Waste


100%|██████████| 1500/1500 [00:37<00:00, 40.23it/s]


Augmenting class: Glass


100%|██████████| 1500/1500 [00:36<00:00, 40.75it/s]


In [5]:
augmented_dir = "/content/augmented_data"
print("\nNumber of images in augmented_dataset:")
for label in os.listdir(augmented_dir):
    label_path = os.path.join(augmented_dir, label)
    if os.path.isdir(label_path):
        num_images = len([f for f in os.listdir(label_path) if f.endswith(('.jpg', '.jpeg', '.png'))])
        print(f"Class {label}: {num_images} pictures")
    else:
        print(f"Class {label}: Folder not found")


Number of images in augmented_dataset:
Class Metal_Other: 1500 pictures
Class Glass: 1500 pictures
Class Organic_Food_Waste: 1500 pictures
Class Paper_Cardboard: 1500 pictures
Class Plastic: 1500 pictures


In [6]:
import os
import shutil

labeled_dir = "/content/labeled_dataset"
augmented_dir = "/content/augmented_data"
combined_dir = "/content/combined_dataset"
classes = ["Plastic","Metal_Other", "Paper_Cardboard","Organic_Food_Waste", "Glass"]
MAX_IMAGES = 1500

def count_image_files(folder):
    valid_extensions = {'.jpg', '.jpeg', '.png'}
    return len([f for f in os.listdir(folder) if os.path.splitext(f)[1].lower() in valid_extensions])

os.makedirs(combined_dir, exist_ok=True)

for label in classes:
    src_label_path = os.path.join(labeled_dir, label)
    aug_label_path = os.path.join(augmented_dir, label)
    dst_label_path = os.path.join(combined_dir, label)

    if not os.path.exists(src_label_path):
        print(f"Folder {src_label_path} not found, skipping...")
        continue
    if not os.path.exists(aug_label_path):
        print(f"Folder {src_label_path} not found, skipping...")
        continue

    os.makedirs(dst_label_path, exist_ok=True)

    # get a list of the first 1500 images
    img_list = [f for f in os.listdir(src_label_path) if os.path.splitext(f)[1].lower() in {'.jpg', '.jpeg', '.png'}][:MAX_IMAGES]

    for img_name in img_list:
        src = os.path.join(src_label_path, img_name)
        dst = os.path.join(dst_label_path, img_name)

        # redundant name files
        base, extension = os.path.splitext(img_name)
        counter = 1
        while os.path.exists(dst):
            new_img_name = f"{base}_{counter}{extension}"
            dst = os.path.join(dst_label_path, new_img_name)
            counter += 1

        shutil.copy(src, dst)

    # copy augment images
    for img_name in img_list:
        aug_img_name = f"{os.path.splitext(img_name)[0]}_aug.jpg"
        aug_src = os.path.join(aug_label_path, aug_img_name)

        if os.path.exists(aug_src):
            dst = os.path.join(dst_label_path, aug_img_name)
            # Xử lý trùng tên file
            base, extension = os.path.splitext(aug_img_name)
            counter = 1
            while os.path.exists(dst):
                new_img_name = f"{base}_{counter}{extension}"
                dst = os.path.join(dst_label_path, new_img_name)
                counter += 1

            shutil.copy(aug_src, dst)
        else:
            print(f"Augment image {aug_img_name} is not exist in {aug_label_path}, skipping...")

# Bước 2: In số lượng ảnh trong combined_dataset để kiểm tra
print("\nNumber of images in combined_dataset:")
for label in classes:
    label_path = os.path.join(combined_dir, label)
    if os.path.exists(label_path) and os.path.isdir(label_path):
        num_images = count_image_files(label_path)
        print(f"Class {label}: {num_images} images")
    else:
        print(f"Class {label}: Folder not exist")


Number of images in combined_dataset:
Class Plastic: 3000 images
Class Metal_Other: 3000 images
Class Paper_Cardboard: 3000 images
Class Organic_Food_Waste: 3000 images
Class Glass: 3000 images
