In [None]:
# Install the third-party packages imported by the next cell into the kernel's environment.
%pip install pycocotools
%pip install scikit-learn

In [1]:
import json
from pathlib import Path
from sklearn.model_selection import train_test_split
from pycocotools.coco import COCO
import csv

import torch, os, time
import torch.nn.functional as F
import torchvision
from torch import optim
from torch import nn
from torch.utils.data import DataLoader
from tqdm import tqdm
from torchvision import datasets, transforms
from itertools import product
import copy
import os
from pathlib import Path

In [2]:
# All dataset artefacts live under the relative "data" directory.
data_dir = Path("data")
src_json = data_dir.joinpath("annotations.json")         # original 60-class annotations
dst_json = data_dir.joinpath("annotations_10cats.json")  # presumably the remapped annotations; not written by the cells shown here
out_csv = data_dir.joinpath("image_labels.csv")          # per-image label table


In [3]:
# Maps every original category id (0-59) onto one of ten coarse "grand" categories.
old_to_new = {
    0:2, 1:8, 2:8, 3:8, 4:0, 5:0, 6:1, 7:0, 8:2, 9:1, 10:2, 11:2, 12:2,
    13:3, 14:3, 15:3, 16:3, 17:3, 18:3, 19:3, 20:3, 21:0, 22:7, 23:1,
    24:0, 25:5, 26:1, 27:0, 28:2, 29:4, 30:3, 31:3, 32:3, 33:3, 34:3,
    35:3, 36:4, 37:0, 38:0, 39:0, 40:0, 41:0, 42:0, 43:0, 44:0, 45:0,
    46:7, 47:0, 48:0, 49:0, 50:2, 51:6, 52:2, 53:6, 54:0, 55:0, 56:3,
    57:7, 58:9, 59:9
}

# Human-readable name of each grand category; the list index is the grand id.
grand_names = [
    "Plastic containers & bottles",
    "Glass",
    "Metal",
    "Paper & cardboard",
    "Plastic film / wrappers",
    "Food waste / organics",
    "Textiles & misc. items",
    "Polystyrene / foam",
    "Hazardous / special waste",
    "Other / unlabelled"
]

# Every mapped id must have a name, otherwise the lookup below would raise IndexError.
assert set(old_to_new.values()) <= set(range(len(grand_names))), "grand id without a name"

# ----- Load original TACO annotations -----
# JSON is UTF-8; do not rely on the platform's default text encoding.
with open(src_json, encoding="utf-8") as f:
    coco = json.load(f)

# ----- Determine a single grand category per image -----
image_to_cat = {}   # image_id -> grand_id
for ann in coco["annotations"]:
    img_id = ann["image_id"]
    # A KeyError here means the annotation uses a category id missing from old_to_new.
    grand_id = old_to_new[ann["category_id"]]
    # policy: choose the lowest grand_id if multiple present
    image_to_cat[img_id] = min(image_to_cat.get(img_id, grand_id), grand_id)

# ----- Build a simple table: image file name -> grand category -----
# Images without any annotation have no entry in image_to_cat and are skipped.
rows = [
    {
        "file_name": img["file_name"],
        "grand_id": image_to_cat[img["id"]],
        "grand_name": grand_names[image_to_cat[img["id"]]],
    }
    for img in coco["images"]
    if img["id"] in image_to_cat
]

# ----- Save to CSV -----
# Written as UTF-8 explicitly so the file can be read back with pandas' default
# (UTF-8) decoding regardless of the platform this cell runs on.
with open(out_csv, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["file_name", "grand_id", "grand_name"])
    writer.writeheader()
    writer.writerows(rows)

print(f"✓ Wrote {len(rows)} rows to {out_csv}")

✓ Wrote 1500 rows to data/image_labels.csv


In [11]:
# %pip install Pillow
# %pip install torch torchvision

# import pandas as pd
# from sklearn.model_selection import train_test_split
# from PIL import Image
# from torch.utils.data import Dataset, DataLoader
# from torchvision import transforms
# import torch

# data_dir   = Path("data")
# images_dir = data_dir / "images"
# csv_file   = data_dir / "image_labels.csv"

# # ---------- Train / Test split ----------
# df = pd.read_csv(csv_file)
# train_df, test_df = train_test_split(
#     df,
#     test_size=0.2,        # 80% train, 20% test
#     random_state=42,
#     shuffle=True
# )
# print(f"Train: {len(train_df)}  Test: {len(test_df)}")

# # (Optional) save the splits
# train_df.to_csv(data_dir / "train_labels.csv", index=False)
# test_df.to_csv(data_dir / "test_labels.csv", index=False)

# # ---------- Transform: resize shortest side to 1024, keep full image ----------
# train_transform = transforms.Compose([
#     transforms.RandomHorizontalFlip(),
#     transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
#     transforms.Resize(1024),    # shortest side = 1024 px, keep aspect ratio
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                          std=[0.229, 0.224, 0.225])
# ])

# test_transform = transforms.Compose([
#     transforms.Resize(1024),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406],
#                          std=[0.229, 0.224, 0.225])
# ])



# # ---------- Custom Dataset ----------
# class SingleLabelDataset(Dataset):
#     def __init__(self, dataframe, img_root, transform=None):
#         self.df = dataframe.reset_index(drop=True)
#         self.root = Path(img_root)
#         self.transform = transform

#     def __len__(self):
#         return len(self.df)

#     def __getitem__(self, idx):
#         row = self.df.iloc[idx]
#         img = Image.open(self.root / row.file_name).convert("RGB")
#         label = torch.tensor(int(row.grand_id), dtype=torch.long)
#         if self.transform:
#             img = self.transform(img)
#         return img, label

# # ---------- Create Datasets & DataLoaders ----------
# train_ds = SingleLabelDataset(train_df, images_dir, transform=train_transform)
# test_ds  = SingleLabelDataset(test_df,  images_dir, transform=test_transform)

# train_loader = DataLoader(train_ds, batch_size=8, shuffle=True, num_workers=4)
# test_loader  = DataLoader(test_ds,  batch_size=8, shuffle=False, num_workers=4)

# print("✓ DataLoaders ready: 1024-pixel images without center crop.")

from pathlib import Path
import pandas as pd
from sklearn.model_selection import train_test_split
from PIL import Image
import torch
from torch.utils.data import TensorDataset, DataLoader
from torchvision import transforms

# ---------- Paths ----------
data_dir = Path("data")
images_dir = data_dir                                  # file names from the CSV are resolved against this folder
csv_file = data_dir.joinpath("image_labels.csv")

# ---------- Train / Test split ----------
df = pd.read_csv(csv_file)
train_df, test_df = train_test_split(
    df,
    test_size=0.2,      # 80 % train / 20 % test
    random_state=42,    # fixed seed so the split is reproducible
    shuffle=True,
)
print(f"Train: {len(train_df)}  Test: {len(test_df)}")

# ---------- Transforms ----------
# Training pipeline: random flip + colour jitter, then resize, tensor conversion
# and per-channel normalisation.
train_transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.Resize((512, 512)),     # forces exactly 512x512; aspect ratio is NOT preserved
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
# Evaluation pipeline: same resize / normalisation, no random augmentation.
test_transform = transforms.Compose(
    [
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# ---------- Load all images once and wrap in TensorDataset ----------
def make_tensor_dataset(frame, transform, img_root=None):
    """Load every image listed in ``frame`` once and return an in-memory TensorDataset.

    Args:
        frame: DataFrame with a ``file_name`` column (path relative to the image
            root) and a ``grand_id`` column (integer class label).
        transform: Callable applied to each RGB PIL image; it must return tensors
            of identical shape so they can be stacked.
        img_root: Folder the file names are resolved against. Defaults to the
            notebook-level ``images_dir``.

    Returns:
        ``TensorDataset(images, labels)`` where ``labels`` has dtype ``torch.long``.

    Raises:
        ValueError: if ``frame`` has no rows.

    Note:
        ``transform`` runs exactly once per image, here at load time. Any random
        augmentation it contains is therefore frozen into the stored tensors and
        does not vary between epochs.
    """
    root = images_dir if img_root is None else Path(img_root)

    imgs, labels = [], []
    # itertuples avoids building a Series per row, unlike iterrows.
    for row in frame.itertuples(index=False):
        # The context manager closes the underlying file as soon as the pixels
        # have been converted, instead of waiting for garbage collection.
        with Image.open(root / row.file_name) as raw:
            imgs.append(transform(raw.convert("RGB")))
        labels.append(int(row.grand_id))

    if not imgs:
        raise ValueError("make_tensor_dataset: `frame` contains no rows")

    return TensorDataset(torch.stack(imgs),
                         torch.tensor(labels, dtype=torch.long))

# Materialise both splits as tensor datasets, each with its own transform pipeline.
train_ds = make_tensor_dataset(frame=train_df, transform=train_transform)
test_ds = make_tensor_dataset(frame=test_df, transform=test_transform)

# Batches of 8; only the training loader reshuffles between epochs.
train_loader = DataLoader(dataset=train_ds, batch_size=8, shuffle=True)
test_loader = DataLoader(dataset=test_ds, batch_size=8, shuffle=False)

print("✓ DataLoaders ready for training")

Train: 1200  Test: 300
✓ DataLoaders ready for training


In [12]:
class CNN(nn.Module):
    """Small two-convolution image classifier that accepts any input resolution.

    Pipeline: conv(3x3, 8 ch) -> ReLU -> 2x2 max-pool -> conv(3x3, 16 ch) -> ReLU
    -> global average pool -> linear layer producing ``num_classes`` logits.

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Number of output logits (default 10).
    """

    def __init__(self, in_channels: int, num_classes: int = 10):
        super().__init__()
        # Feature extractor.
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=8,
                               kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16,
                               kernel_size=3, stride=1, padding=1)

        # Classifier head: averaging each of the 16 feature maps down to 1x1
        # makes the linear layer independent of the input height / width.
        self.gap = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc1 = nn.Linear(in_features=16, out_features=num_classes)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        features = self.pool(F.relu(self.conv1(x)))
        features = F.relu(self.conv2(features))
        pooled = self.gap(features).flatten(start_dim=1)   # (batch, 16, 1, 1) -> (batch, 16)
        return self.fc1(pooled)

In [13]:
# Pick the best available compute backend: Apple-Silicon GPU (MPS) first,
# then an NVIDIA GPU (CUDA), otherwise the CPU.
# getattr guards against torch builds that do not ship the `mps` backend module.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")   # Use Apple Silicon GPU
elif torch.cuda.is_available():
    device = torch.device("cuda")  # Use NVIDIA GPU
else:
    device = torch.device("cpu")

print(device)

mps


In [15]:
# Fresh 3-channel, 10-class model placed on the selected device.
model = CNN(in_channels=3, num_classes=10)
model = model.to(device)

# Optional extra augmentation pipeline. It is only defined here; the loop below
# does not apply it.
aug_tfms = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    ]
)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()

epochs = 20
for epoch in range(epochs):
    model.train()
    # Per-epoch accumulators: summed sample losses, correct predictions, samples seen.
    running_loss = 0.0
    correct = 0
    total = 0

    # Progress bar over the training batches; it is removed once the epoch ends.
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", leave=False)

    for imgs, labels in progress_bar:
        imgs = imgs.to(device)
        labels = labels.to(device)

        # Standard optimisation step.
        optimizer.zero_grad()
        logits = model(imgs)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean, so weight it by the batch size
        # to obtain a correct per-sample average at the end.
        running_loss += loss.item() * imgs.size(0)
        _, predicted = torch.max(logits, 1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

        # Live running averages shown next to the bar.
        progress_bar.set_postfix({
            "loss": f"{running_loss/total:.4f}",
            "acc":  f"{100*correct/total:.2f}%"
        })

    print(f"Epoch {epoch+1:02d}: "
          f"Train Loss {running_loss/total:.4f}, "
          f"Train Acc {100*correct/total:.2f}%")

                                                                                      

Epoch 01: Train Loss 1.7521, Train Acc 48.67%


                                                                                      

Epoch 02: Train Loss 1.5487, Train Acc 51.50%


                                                                                      

Epoch 03: Train Loss 1.5365, Train Acc 51.50%


                                                                                      

Epoch 04: Train Loss 1.5332, Train Acc 51.50%


                                                                                      

Epoch 05: Train Loss 1.5308, Train Acc 51.50%


                                                                                      

Epoch 06: Train Loss 1.5238, Train Acc 51.50%


                                                                                      

Epoch 07: Train Loss 1.5232, Train Acc 51.50%


                                                                                      

Epoch 08: Train Loss 1.5122, Train Acc 51.50%


                                                                                      

Epoch 09: Train Loss 1.5128, Train Acc 51.50%


                                                                                       

Epoch 10: Train Loss 1.5096, Train Acc 51.50%


                                                                                       

Epoch 11: Train Loss 1.5067, Train Acc 51.58%


                                                                                       

Epoch 12: Train Loss 1.5019, Train Acc 51.67%


                                                                                       

Epoch 13: Train Loss 1.5033, Train Acc 51.67%


                                                                                       

Epoch 14: Train Loss 1.5019, Train Acc 51.75%


                                                                                       

Epoch 15: Train Loss 1.4987, Train Acc 51.75%


                                                                                       

Epoch 16: Train Loss 1.4964, Train Acc 51.58%


                                                                                       

Epoch 17: Train Loss 1.4968, Train Acc 51.75%


                                                                                       

Epoch 18: Train Loss 1.4993, Train Acc 51.67%


                                                                                       

Epoch 19: Train Loss 1.4935, Train Acc 51.42%


                                                                                       

Epoch 20: Train Loss 1.4941, Train Acc 52.00%




In [None]:
# Absolute location of the pre-split image folders (presumably a mounted Google Drive).
# NOTE(review): "dataset_splits" occurs twice in DATA_ROOT
# (.../dataset_splits/dataset_splits) - confirm the folder really is nested.
DRIVE = Path("/content/drive/My Drive/dataset_splits")
DATA_ROOT = DRIVE.joinpath("dataset_splits")

# One sub-folder per split.
TRAIN_DIR, TEST_DIR = (DATA_ROOT / split for split in ("train", "test"))

In [None]:
class CNN(nn.Module):
    """Two-convolution classifier for fixed 28x28 inputs.

    Pipeline: conv(3x3, 8 ch) -> ReLU -> 2x2 max-pool -> conv(3x3, 16 ch) -> ReLU
    -> flatten -> linear layer producing ``num_classes`` logits.

    The linear layer is sized for a 16 x 14 x 14 feature map, i.e. a 28x28 input
    halved once by the pooling layer; other input sizes fail in ``fc1``.

    Note: this definition replaces any ``CNN`` class defined earlier in the notebook.

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Number of output logits (default 10).
    """

    def __init__(self, in_channels, num_classes=10):
        super().__init__()
        # Layer 1: conv2D (padding=1 keeps the spatial size)
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=8, kernel_size=3, stride=1, padding=1)
        # Layer 2: 2x2 max pooling (28x28 -> 14x14)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Layer 3: conv2D
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=1, padding=1)
        # Layer 4: Fully connected layer
        self.fc1 = nn.Linear(16 * 14 * 14, num_classes)  # 3136

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes)."""
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = x.reshape(x.shape[0], -1)   # (batch, 16, 14, 14) -> (batch, 3136)
        x = self.fc1(x)
        return x

In [None]:
# Fixed settings - not meant to be changed.
# (input_size is not referenced by any cell shown in this notebook.)
input_size, num_classes = 784, 10

# Tunable settings.
learning_rate, batch_size, num_epochs = 0.01, 64, 20

In [None]:
# transforms
# ImageFolder's default loader opens every file as a 3-channel RGB image, while the
# models in this part of the notebook are built with in_channels=1. Convert to a
# single channel first so the tensors are (1, 28, 28) and match the model input.
train_tfms = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((28, 28)),
    transforms.RandomHorizontalFlip(),   # augmentation: training only
    transforms.ToTensor(),
])

# Same preprocessing without random augmentation, for validation / test data.
eval_tfms = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((28, 28)),
    transforms.ToTensor(),
])

In [None]:
# Augmented view of the training folder and plain view of the held-out test folder.
# (The test set previously pointed at TRAIN_DIR, so "test" scores were training scores.)
train_dataset = datasets.ImageFolder(TRAIN_DIR, transform=train_tfms)
test_dataset = datasets.ImageFolder(TEST_DIR, transform=eval_tfms)

# create validation set
val_ratio = 0.15
valLen = int(len(train_dataset) * val_ratio)
trainLen = len(train_dataset) - valLen
# Seeded generator so the train / validation split is identical on every run.
train_ds, val_ds = torch.utils.data.random_split(
    train_dataset,
    [trainLen, valLen],
    generator=torch.Generator().manual_seed(42),
)
# Validation images must not be randomly augmented: keep the sampled indices but
# read them through a second view of the same folder that uses eval_tfms.
# ImageFolder lists files in sorted order, so indices line up between both views.
val_ds = torch.utils.data.Subset(
    datasets.ImageFolder(TRAIN_DIR, transform=eval_tfms),
    val_ds.indices,
)

# loaders
# Train on the training subset only (no validation leakage), reshuffled each epoch;
# validation and test loaders keep a fixed order.
train_loader = DataLoader(dataset=train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_ds, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Build the single-channel classifier, then move its parameters to the selected device.
model = CNN(in_channels=1, num_classes=num_classes)
model = model.to(device)

In [None]:
# Plain SGD (no momentum, no weight decay) over all model parameters,
# paired with a cross-entropy loss on the raw logits.
optimizer = optim.SGD(params=model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

In [None]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader`` and return the mean sample loss.

    Args:
        model: Module to train; switched to training mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        optimizer: Optimiser already bound to ``model``'s parameters.
        criterion: Loss returning the batch-mean loss for ``(outputs, labels)``.
        device: Device every batch is moved to before the forward pass.

    Returns:
        Sum of per-sample losses divided by the number of samples seen.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.train()
    total_loss = 0.0
    n_samples = 0

    # Iterate the loader that was passed in, not a notebook-level global.
    for images, labels in loader:
        images, labels = images.to(device), labels.to(device)

        # reset gradients
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        # backpropagate
        loss.backward()
        # update model weights
        optimizer.step()

        # Weight the batch-mean loss by the batch size so a smaller final
        # batch does not skew the epoch average.
        bs = images.size(0)
        total_loss += loss.item() * bs
        n_samples += bs

    if n_samples == 0:
        raise ValueError("train_one_epoch: `loader` produced no samples")
    return total_loss / n_samples

In [None]:
def evaluate(model, loader, device, criterion):
    """Compute mean loss and accuracy of ``model`` over ``loader`` without gradients.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        loader: Iterable yielding ``(images, labels)`` batches.
        device: Device every batch is moved to before the forward pass.
        criterion: Loss returning the batch-mean loss for ``(outputs, labels)``.

    Returns:
        ``(mean_loss, accuracy)`` - the per-sample mean loss and the fraction of
        samples whose arg-max prediction equals the label (a value in [0, 1]).

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.eval()
    total_loss, correct, total = 0.0, 0, 0

    with torch.no_grad():
        # Iterate the loader that was passed in, not a notebook-level global.
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            # Weight the batch-mean loss by the batch size for a true sample average.
            total_loss += loss.item() * images.size(0)
            preds = outputs.argmax(1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    if total == 0:
        raise ValueError("evaluate: `loader` produced no samples")

    mean_loss = total_loss / total
    accuracy = correct / total
    return mean_loss, accuracy

In [None]:
# Search space: every combination of optimiser, learning rate and weight decay is tried.
param_grid = dict(
    optimizer=["sgd", "adam"],
    lr=[0.1, 0.01, 0.001],
    weight_decay=[0.0, 0.0001, 0.001],
)
# Number of training epochs spent on each combination.
search_epochs = 5

In [None]:
# Best result so far; -1.0 guarantees that the first evaluated configuration wins.
best = dict(val_acc=-1.0, params=None, state=None)

In [None]:
# Grid search: train a fresh model for `search_epochs` epochs per configuration and
# keep the parameters and weights of the one with the highest validation accuracy.
for opt_name, lr, wd in product(*(param_grid[key] for key in ("optimizer", "lr", "weight_decay"))):
    # New model each time so configurations do not share weights.
    model = CNN(in_channels=1, num_classes=10).to(device)

    if opt_name != "sgd":
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=wd)
    else:
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=wd)

    for _ in range(search_epochs):
        train_one_epoch(model, train_loader, optimizer, criterion, device)

    val_loss, val_acc = evaluate(model=model, loader=val_loader, device=device, criterion=criterion)
    print(f"[{opt_name}] lr={lr:.0e}, wd={wd:.0e} -> val_acc={val_acc:.4f}, val_loss={val_loss:.4f}")

    # Strict ">" keeps the earliest configuration on ties; the weights are
    # deep-copied because `model` is replaced on the next iteration.
    if val_acc > best["val_acc"]:
        best = dict(
            val_acc=val_acc,
            params=(opt_name, lr, wd),
            state=copy.deepcopy(model.state_dict()),
        )

print("\nBest params:", best["params"], " | best val_acc:", round(best["val_acc"], 4))

Try experimenting with different numbers of layers in the CNN model.
Ways:
    - Fully connected layers
    - More convolutional layers