<a href="https://colab.research.google.com/github/rediahmds/eco-sort/blob/main/train/build_dataset_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Connect to Google Drive

In [24]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset archive built at the end of
# this notebook is copied into /content/drive/MyDrive/.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Prepare Dataset

### Download Dataset from Kaggle

The images are collected from several datasets published by different Kaggle accounts.

In [25]:
!pip install kagglehub



In [None]:
import kagglehub

def _download_and_report(handle):
    """Download one Kaggle dataset and print the local path it was stored at."""
    local_path = kagglehub.dataset_download(handle)
    print("Path to dataset files:", local_path)
    return local_path


# One variable per source dataset; later cells build their paths from these.
alistair_ds = _download_and_report("alistairking/recyclable-and-household-waste-classification")

mostafa_ds = _download_and_report("mostafaabla/garbage-classification")

joe_ds = _download_and_report("joebeachcapital/realwaste")

glhdamar_ds = _download_and_report("glhdamar/new-trash-classfication-dataset")

Path to dataset files: /kaggle/input/recyclable-and-household-waste-classification
Path to dataset files: /kaggle/input/garbage-classification
Path to dataset files: /kaggle/input/realwaste


### Show Directory Tree

In [None]:
from pathlib import Path

def print_directory_tree(root: Path, prefix: str = ""):
    """
    Print the directory structure below ``root`` as a tree.

    Only folders are listed; files are left out. ``prefix`` is the text
    printed in front of every entry and grows with each nesting level.
    """
    folders = [entry for entry in root.iterdir() if entry.is_dir()]
    folders.sort()
    last_position = len(folders) - 1
    for position, folder in enumerate(folders):
        is_last = position == last_position
        branch = "└── " if is_last else "├── "
        print(f"{prefix}{branch}{folder.name}")
        # Below the last sibling no vertical guide line is needed any more
        padding = "    " if is_last else "│   "
        print_directory_tree(folder, prefix + padding)

In [None]:
# Root folder of each downloaded dataset (the level that holds the class folders)
alistair_path = Path(alistair_ds, "images", "images")
mostafa_path = Path(mostafa_ds, "garbage_classification")
joe_path = Path(joe_ds, "realwaste-main", "RealWaste")
glhdamar_path = Path(glhdamar_ds, "new-dataset-trash-type-v2")

# Print the root folder name followed by its folder tree, one dataset at a time
for dataset_root in (alistair_path, mostafa_path, joe_path, glhdamar_path):
    print(dataset_root.name)
    print_directory_tree(dataset_root)

### Restructure AlistairKing Dataset

In [None]:

from pathlib import Path
import shutil
from tqdm import tqdm

source_root = alistair_path
target_root = Path("dataset/train")
target_root.mkdir(parents=True, exist_ok=True)

# Maps every fine-grained source class folder to the coarse class it is merged into.
class_map = {
    "food_waste": "organic",
    "eggshells": "organic",
    "coffee_grounds": "organic",
    "tea_bags": "organic",
    "plastic_soda_bottles": "plastic",
    "plastic_trash_bags": "plastic",
    "plastic_food_containers": "plastic",
    "plastic_shopping_bags": "plastic",
    "plastic_straws": "plastic",
    "plastic_water_bottles": "plastic",
    "plastic_detergent_bottles": "plastic",
    "plastic_cup_lids": "plastic",
    "glass_food_jars": "glass",
    "glass_beverage_bottles": "glass",
    "glass_cosmetic_containers": "glass",
    "aluminum_soda_cans": "metal",
    "aluminum_food_cans": "metal",
    "steel_food_cans": "metal",
    "aerosol_cans": "metal",
    "cardboard_boxes": "paper",
    "cardboard_packaging": "paper",
    "magazines": "paper",
    "newspaper": "paper",
    "office_paper": "paper",
    "paper_cups": "paper",
    "styrofoam_cups": "styrofoam",
    "styrofoam_food_containers": "styrofoam",
    "clothing": "textiles",
    "shoes": "textiles"
}


print("🚀 Memulai pengelompokan dataset dengan penamaan ulang...\n")

for class_name, parent_class in class_map.items():
    for subset in ["default", "real_world"]:
        class_dir = source_root / class_name / subset
        if not class_dir.exists():
            # Report the gap instead of skipping it silently
            print(f"⚠️ Folder tidak ditemukan, dilewati: {class_dir}")
            continue

        # Sort the listing: the running index below becomes part of the new file
        # name, so a stable order keeps the names identical between runs and a
        # re-run overwrites files instead of adding copies under new names.
        # Directories that happen to match "*.*" are left out.
        img_list = sorted(p for p in class_dir.glob("*.*") if p.is_file())
        print(f"📁 Menyalin {len(img_list)} gambar dari '{class_name}/{subset}' ke '{parent_class}'")
        if not img_list:
            continue

        # The destination only depends on the parent class, so create it once
        # per source folder rather than once per image.
        dest_dir = target_root / parent_class
        dest_dir.mkdir(parents=True, exist_ok=True)

        for i, img in enumerate(tqdm(img_list, desc=f"{class_name}/{subset}", leave=False)):
            # Name format: subset_class_####__original.ext
            ext = img.suffix
            original_name = img.stem.replace(" ", "_")
            new_name = f"{subset}_{class_name}_{i:04d}__{original_name}{ext}"
            shutil.copy(img, dest_dir / new_name)

print("\n✅ Pengelompokan selesai tanpa konflik penamaan.")
print("📂 Dataset tersimpan di:", target_root.resolve())


### Merge All Datasets

Copy a capped number of images per class from the other source datasets into `dataset/train`.

In [None]:
from pathlib import Path
import shutil
import random

def copy_n_files(src_dir, dst_dir, n, randomize=False, seed=None):
    """
    Copy up to ``n`` files from ``src_dir`` into ``dst_dir``.

    Only regular files directly inside ``src_dir`` are considered; sub-folders
    are ignored. ``dst_dir`` is created when it does not exist. A file in
    ``dst_dir`` with the same name as a copied file is overwritten.

    Args:
        src_dir: Folder to copy from.
        dst_dir: Folder to copy into.
        n: Maximum number of files to copy. Values larger than the number of
            available files copy everything; values below zero copy nothing.
        randomize: When True a random subset is copied, otherwise the first
            ``n`` files in sorted order.
        seed: Optional seed for the random subset. With a seed the same files
            are chosen on every call for the same source listing; without one
            the module-level ``random`` generator is used.

    Raises:
        FileNotFoundError: If ``src_dir`` does not exist.
    """
    src_path = Path(src_dir)
    dst_path = Path(dst_dir)

    # Create the destination folder if it does not exist yet
    dst_path.mkdir(parents=True, exist_ok=True)

    # iterdir() yields entries in arbitrary order; sorting first gives both the
    # "first n" branch and the seeded sampling a stable starting point.
    all_files = sorted(f for f in src_path.iterdir() if f.is_file())

    # Keep n inside [0, number of available files]
    n = max(0, min(n, len(all_files)))

    # Decide which files are copied
    if randomize:
        rng = random if seed is None else random.Random(seed)
        files_to_copy = rng.sample(all_files, n)
    else:
        files_to_copy = all_files[:n]

    # Copy the files one by one
    for file in files_to_copy:
        shutil.copy(file, dst_path)
        print(f"Copied: {file.name}")

    print(f"\nTotal {n} files copied from '{src_dir}' to '{dst_dir}' (random: {randomize}).")

In [None]:
# Copies a capped, random subset of each extra source folder into dataset/train.
# The global RNG is seeded first so that, for the same source listing, the same
# files are picked every time this cell runs.
random.seed(42)

mostafa_root = f"{mostafa_ds}/garbage_classification"

# (source folder, destination class folder, maximum number of images to copy)
copy_plan = [
    (f"{mostafa_root}/paper", "dataset/train/paper", 500),
    (f"{mostafa_root}/cardboard", "dataset/train/paper", 500),

    (f"{mostafa_root}/white-glass", "dataset/train/glass", 600),
    (f"{mostafa_root}/brown-glass", "dataset/train/glass", 600),
    (f"{mostafa_root}/green-glass", "dataset/train/glass", 600),

    (f"{mostafa_root}/clothes", "dataset/train/textiles", 1500),
    (f"{mostafa_root}/shoes", "dataset/train/textiles", 1500),

    (f"{mostafa_root}/metal", "dataset/train/metal", 750),
    (f"{joe_ds}/realwaste-main/RealWaste/Metal", "dataset/train/metal", 750),

    (f"{mostafa_root}/biological", "dataset/train/organic", 980),
    (f"{glhdamar_ds}/new-dataset-trash-type-v2/organic", "dataset/train/organic", 960),
]

for source_dir, target_dir, max_images in copy_plan:
    copy_n_files(source_dir, target_dir, max_images, randomize=True)

### Check for Duplicates

#### Helper: Auto Reconnect

In [None]:
#@title Time Out Preventer (Advanced)
%%capture
# NOTE(review): IPython normally recognises a cell magic such as %%capture only on
# the first line of a cell - confirm it still takes effect after the #@title line.
AUTO_RECONNECT = True #@param {type:"boolean"}
#@markdown **Run this code to prevent Google Colab from Timeout**
from os import makedirs
# NOTE(review): this rclone config folder is not used anywhere else in the visible
# notebook - presumably carried over with the snippet; confirm it is still needed.
makedirs("/root/.config/rclone", exist_ok = True)
if AUTO_RECONNECT:
  import IPython
  from google.colab import output

  # Inject JavaScript into the notebook page: every 60000 ms it clicks the
  # <colab-connect-button> element and the element with id "ok" when they exist.
  display(IPython.display.Javascript('''
  function ClickConnect(){
    btn = document.querySelector("colab-connect-button")
    if (btn != null){
      console.log("Click colab-connect-button");
      btn.click()
      }

    btn = document.getElementById('ok')
    if (btn != null){
      console.log("Click reconnect");
      btn.click()
      }
    }

  setInterval(ClickConnect,60000)
  '''))

In [None]:
!pip install imagehash

In [None]:
from pathlib import Path
from PIL import Image
import imagehash
import torch
import torch.nn.functional as F
from torchvision import models, transforms
from collections import defaultdict
import matplotlib.pyplot as plt

def get_resnet_embedding(img: Image.Image, model, transform, device):
    """
    Run a single image through ``model`` and return the result on the CPU.

    ``transform`` turns the image into a tensor; a batch dimension is added
    before the forward pass and all size-1 dimensions are squeezed out of the
    output. Gradients are not tracked.
    """
    batch = transform(img).unsqueeze(0)
    batch = batch.to(device)
    with torch.no_grad():
        output = model(batch)
        embedding = output.squeeze()
    return embedding.cpu()


def find_duplicates_images_with_end_deletion(
    dataset_dir: Path | str,
    hash_threshold: int = 5,
    sim_threshold: float = 0.98,
    delete: bool = False,
    show_preview: bool = True
):
    """
    Detect near-duplicate images under ``dataset_dir`` and, if requested, delete
    them once the whole scan has finished.

    Detection has two stages. An average hash (``imagehash.average_hash``) picks
    previously scanned images whose hash differs by at most ``hash_threshold``.
    Each candidate is then checked with the cosine similarity of ResNet-18
    feature embeddings; the pair counts as a duplicate when the similarity is at
    least ``sim_threshold``. Only ``.jpg``, ``.jpeg`` and ``.png`` files are
    examined, in sorted path order, and the image visited later is the one
    flagged as the duplicate.

    Args:
        dataset_dir: Root folder that is scanned recursively.
        hash_threshold: Largest hash difference that still makes two images
            candidates for the embedding check.
        sim_threshold: Smallest cosine similarity that confirms a duplicate.
        delete: When True, every flagged image is removed from disk after the
            scan.
        show_preview: When True, each confirmed pair is plotted side by side.

    Returns:
        list[Path]: The flagged duplicate images, in scan order.
    """
    dataset_dir = Path(dataset_dir)
    print(f"🔍 Mendeteksi duplikat di dalam: {dataset_dir.resolve()}")

    # hash -> paths already scanned with that hash; path -> cached embedding
    hash_dict = defaultdict(list)
    embeddings = {}
    duplicates = []

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # ResNet-18 without its last child module is used as a feature extractor
    cnn_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
    cnn_model = torch.nn.Sequential(*list(cnn_model.children())[:-1])
    cnn_model.eval().to(device)

    # NOTE(review): no mean/std normalisation is applied before the pretrained
    # network; adding it would shift the similarity scores, so confirm
    # sim_threshold before changing this.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    def load_rgb(path):
        # Release the file handle right away; convert() returns an independent copy
        with Image.open(path) as raw:
            return raw.convert("RGB")

    def embedding_for(path, image=None):
        # Every image is embedded at most once, however many candidates it meets
        if path not in embeddings:
            if image is None:
                image = load_rgb(path)
            embeddings[path] = get_resnet_embedding(image, cnn_model, transform, device)
        return embeddings[path]

    counter = 0
    image_suffixes = {".jpg", ".jpeg", ".png"}

    # Sorted traversal makes it deterministic which image of a pair is kept
    for img_path in sorted(dataset_dir.rglob("*.*")):
        if img_path.suffix.lower() not in image_suffixes:
            continue
        try:
            img = load_rgb(img_path)
            h = imagehash.average_hash(img)

            # Look for the first already-scanned image that passes both stages
            match = None
            for existing_hash, existing_paths in hash_dict.items():
                if h - existing_hash > hash_threshold:
                    continue
                for existing_path in existing_paths:
                    try:
                        emb1 = embedding_for(existing_path)
                        emb2 = embedding_for(img_path, img)
                        sim = F.cosine_similarity(emb1, emb2, dim=0).item()
                    except Exception as e:
                        print(f"❌ Error validasi CNN: {e}")
                        continue
                    if sim >= sim_threshold:
                        match = (existing_path, sim)
                        break  # one confirmed match is enough
                if match is not None:
                    break

            # The file stays on disk until the end, so it remains a valid candidate
            hash_dict[h].append(img_path)

            if match is not None:
                existing_path, sim = match
                counter += 1
                duplicates.append(img_path)
                print(f"🔢 Kode duplikat: {counter}")
                print("⚠️ Duplikat terdeteksi:")
                print(f"  Original : {existing_path.name}")
                print(f"  Duplicate: {img_path.name}")
                print(f"  🔗 Cosine Similarity: {sim:.4f}")

                if show_preview:
                    img1 = load_rgb(existing_path)
                    fig, ax = plt.subplots(1, 2, figsize=(8, 4))
                    ax[0].imshow(img1)
                    ax[0].set_title(f"Original: {existing_path.name}")
                    ax[1].imshow(img)
                    ax[1].set_title(f"Duplicate: {img_path.name}")
                    for a in ax:
                        a.axis("off")
                    plt.tight_layout()
                    plt.show()
                    # Free the figure; many pairs would otherwise pile up in memory
                    plt.close(fig)

                print("\n")

        except Exception as e:
            print(f"❌ Gagal membuka {img_path}: {e}")

    # === Delete if requested
    if delete:
        for dup in duplicates:
            try:
                dup.unlink()
                print(f"🗑️ Menghapus: {dup}")
            except Exception as e:
                print(f"❌ Gagal menghapus {dup}: {e}")

    print(f"\n✅ Total duplikat terverifikasi: {len(duplicates)}")
    return duplicates


def find_duplicates_images_with_immediate_deletion(
    dataset_dir: Path | str,
    hash_threshold: int = 5,
    sim_threshold: float = 0.98,
    delete: bool = False,
    show_preview: bool = True
):
    """
    Detect near-duplicate images under ``dataset_dir`` and, if requested, delete
    each one as soon as it is confirmed.

    Detection has two stages. An average hash (``imagehash.average_hash``) picks
    previously scanned images whose hash differs by at most ``hash_threshold``.
    Each candidate is then checked with the cosine similarity of ResNet-18
    feature embeddings; the pair counts as a duplicate when the similarity is at
    least ``sim_threshold``. Only ``.jpg``, ``.jpeg`` and ``.png`` files are
    examined, in sorted path order, and the image visited later is the one
    flagged as the duplicate.

    A file that was deleted is not kept as a comparison candidate, so later
    images are only ever compared against files that still exist.

    Args:
        dataset_dir: Root folder that is scanned recursively.
        hash_threshold: Largest hash difference that still makes two images
            candidates for the embedding check.
        sim_threshold: Smallest cosine similarity that confirms a duplicate.
        delete: When True, a flagged image is removed from disk immediately.
        show_preview: When True, each confirmed pair is plotted side by side.

    Returns:
        list[Path]: The flagged duplicate images in scan order, whether or not
        they were deleted.
    """
    dataset_dir = Path(dataset_dir)
    print(f"🔍 Mendeteksi duplikat di dalam: {dataset_dir.resolve()}")

    # hash -> paths already scanned with that hash; path -> cached embedding
    hash_dict = defaultdict(list)
    embeddings = {}
    duplicates = []

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # ResNet-18 without its last child module is used as a feature extractor
    cnn_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
    cnn_model = torch.nn.Sequential(*list(cnn_model.children())[:-1])
    cnn_model.eval().to(device)

    # NOTE(review): no mean/std normalisation is applied before the pretrained
    # network; adding it would shift the similarity scores, so confirm
    # sim_threshold before changing this.
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    def load_rgb(path):
        # Release the file handle right away; convert() returns an independent copy
        with Image.open(path) as raw:
            return raw.convert("RGB")

    def embedding_for(path, image=None):
        # Every image is embedded at most once, however many candidates it meets
        if path not in embeddings:
            if image is None:
                image = load_rgb(path)
            embeddings[path] = get_resnet_embedding(image, cnn_model, transform, device)
        return embeddings[path]

    counter = 0
    image_suffixes = {".jpg", ".jpeg", ".png"}

    # Sorted traversal makes it deterministic which image of a pair is kept
    for img_path in sorted(dataset_dir.rglob("*.*")):
        if img_path.suffix.lower() not in image_suffixes:
            continue
        try:
            img = load_rgb(img_path)
            h = imagehash.average_hash(img)

            # Look for the first already-scanned image that passes both stages
            match = None
            for existing_hash, existing_paths in hash_dict.items():
                if h - existing_hash > hash_threshold:
                    continue
                for existing_path in existing_paths:
                    try:
                        emb1 = embedding_for(existing_path)
                        emb2 = embedding_for(img_path, img)
                        sim = F.cosine_similarity(emb1, emb2, dim=0).item()
                    except Exception as e:
                        print(f"❌ Error validasi CNN: {e}")
                        continue
                    if sim >= sim_threshold:
                        match = (existing_path, sim)
                        break  # the first match is enough
                if match is not None:
                    break

            removed = False
            if match is not None:
                existing_path, sim = match
                counter += 1
                duplicates.append(img_path)
                print(f"🔢 Kode duplikat: {counter}")
                print("⚠️ Duplikat terdeteksi:")
                print(f"  Original : {existing_path.name}")
                print(f"  Duplicate: {img_path.name}")
                print(f"  🔗 Cosine Similarity: {sim:.4f}")

                if show_preview:
                    img1 = load_rgb(existing_path)
                    fig, ax = plt.subplots(1, 2, figsize=(8, 4))
                    ax[0].imshow(img1)
                    ax[0].set_title(f"Original: {existing_path.name}")
                    ax[1].imshow(img)
                    ax[1].set_title(f"Duplicate: {img_path.name}")
                    for a in ax:
                        a.axis("off")
                    plt.tight_layout()
                    plt.show()
                    # Free the figure; many pairs would otherwise pile up in memory
                    plt.close(fig)

                if delete:
                    try:
                        img_path.unlink()
                        removed = True
                        print(f"🗑️ Duplikat dihapus: {img_path}")
                    except Exception as e:
                        print(f"❌ Gagal menghapus {img_path}: {e}")

                print("\n")

            if removed:
                # The file is gone: it must not serve as a comparison candidate
                embeddings.pop(img_path, None)
            else:
                hash_dict[h].append(img_path)

        except Exception as e:
            print(f"❌ Gagal membuka {img_path}: {e}")

    print(f"\n✅ Total duplikat terverifikasi: {counter}")
    return duplicates

In [None]:
# Scan dataset/train for near-duplicates and, because delete=True, remove every
# flagged file from disk after the scan. Side-by-side previews are switched off.
find_duplicates_images_with_end_deletion(
    "dataset/train",
    hash_threshold=6,
    delete=True,
    show_preview=False,
)

### Check Dataset Distribution

In [None]:
from collections import Counter
from torchvision.datasets import ImageFolder

train_dataset = ImageFolder("dataset/train")
# ImageFolder already keeps one class index per sample in `targets`; counting
# those avoids loading and decoding every image just to read its label.
label_counts = Counter(train_dataset.targets)
print("Label mapping:", train_dataset.class_to_idx)
print("Distribusi kelas:", label_counts)

## Archive Dataset and Save to Google Drive

In [None]:
# Pack the dataset/ folder into dataset.7z in the current working directory.
# NOTE(review): if dataset.7z is left over from an earlier run, `7z a` adds to that
# archive rather than starting fresh - confirm stale entries are not a concern.
!7z a dataset.7z dataset/

In [None]:
# Copy the archive into the MyDrive folder of the Google Drive mounted at /content/drive.
!cp dataset.7z /content/drive/MyDrive/