In [4]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
import matplotlib.pyplot as plt
import random

## Custom Dataset Class

We define a custom `WasteDataset` class that inherits from PyTorch's `Dataset` class. This class is responsible for loading and preprocessing the images from the dataset.

### Initialization

The `__init__` method takes the following parameters:
- `root_dir`: The root directory containing the dataset images.
- `split`: The dataset split (train, validation, or test).
- `transform`: Optional image transformations to be applied.

Inside the `__init__` method, we:
1. Store the `root_dir`, `transform`, and `split` parameters.
2. Get the list of class names by listing the directories in `root_dir`.
3. Initialize empty lists for `image_paths` and `labels`.
4. Iterate over each class directory and its subfolders ('default' and 'real_world').
5. Shuffle the image names in each subfolder.
6. Based on the `split` parameter, select a portion of the images (60% for train, 20% for validation, 20% for test).
7. Append the image paths and corresponding labels to the respective lists.

### Length and Item Retrieval

The `__len__` method returns the total number of images in the dataset.

The `__getitem__` method takes an `index` and returns the image and its corresponding label at that index. It:
1. Retrieves the image path and label using the provided index.
2. Opens the image using `Image.open()` and converts it to RGB format.
3. Applies the specified image transformations, if any.
4. Returns the transformed image and its label.

This custom dataset class allows us to easily load and preprocess the waste images for training, validation, and testing.

In [5]:
# Define the dataset class (modified to include a split parameter)
class WasteDataset(Dataset):
    def __init__(self, root_dir, split, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['aluminum_soda_cans','cardboard_boxes','paper_cups','plastic_water_bottles'] #sorted(os.listdir(root_dir)) #['aluminum_soda_cans','cardboard_boxes','paper_cups','plastic_water_bottles'] 
        self.image_paths = []
        self.labels = []
        
        for i, class_name in enumerate(self.classes):
                class_dir = os.path.join(root_dir, class_name)
                image_names = os.listdir(class_dir)
                random.shuffle(image_names)
                
                if split == 'train':
                    image_names = image_names[:int(0.6 * len(image_names))]
                elif split == 'val':
                    image_names = image_names[int(0.6 * len(image_names)):int(0.8 * len(image_names))]
                else:  # split == 'test'
                    image_names = image_names[int(0.8 * len(image_names)):]
                
                for image_name in image_names:
                    self.image_paths.append(os.path.join(class_dir, image_name))
                    self.labels.append(i)
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, index):
        image_path = self.image_paths[index]
        label = self.labels[index]
        image = Image.open(image_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

## Dataset Path and Hyperparameters

We set the following dataset path and hyperparameters:
- `dataset_path`: The path to the directory containing the dataset images.
- `batch_size`: The number of samples per batch during training and evaluation.
- `num_epochs`: The number of epochs to train the model.
- `learning_rate`: The learning rate for the optimizer.

These hyperparameters can be adjusted based on the specific requirements and available computational resources.

In [6]:
# Set the dataset path and hyperparameters
dataset_path = 'output_images'
batch_size = 32
num_epochs = 5
learning_rate = 0.001

## Data Preprocessing and Loaders

We define a composition of image transformations using `transforms.Compose`:
1. `transforms.Resize((224, 224))`: Resizes the images to a fixed size of (224, 224) pixels.
2. `transforms.ToTensor()`: Converts the images to PyTorch tensors.
3. `transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])`: Normalizes the image tensors using the specified mean and standard deviation values.

These transformations ensure that the images are preprocessed consistently before being fed into the model.

We create instances of the `WasteDataset` class for the train, validation, and test splits, passing the `dataset_path`, `split`, and `transform` parameters. This allows us to load the dataset images with the specified transformations for each split.

Finally, we create data loaders for each dataset using `DataLoader`:
- `train_dataloader`: Loads the training data in batches of size `batch_size` and shuffles the samples.
- `val_dataloader`: Loads the validation data in batches of size `batch_size` without shuffling.
- `test_dataloader`: Loads the test data in batches of size `batch_size` without shuffling.

The data loaders provide an efficient way to iterate over the dataset during training and evaluation, handling batching and shuffling as specified.

In [7]:
from PIL import Image
from rembg import remove

def rotate_and_pad_dynamic(image, angle, background_color=(0, 0, 0), min_size=400):
    # Step 1: 旋轉並展開
    # image = remove(image)
    rotated = image.rotate(angle, expand=True, fillcolor=background_color)

    # Step 2: 動態計算正方形背景尺寸（取最大邊長，與 min_size 比較）
    side = max(min_size, rotated.width, rotated.height)
    
    # Step 3: 建立正方形背景並貼上圖片
    background = Image.new("RGB", (side, side), background_color)
    paste_x = (side - rotated.width) // 2
    paste_y = (side - rotated.height) // 2
    background.paste(rotated, (paste_x, paste_y))

    return background

In [8]:
# Create the datasets and data loaders
transform = transforms.Compose([
    transforms.Lambda(lambda img: rotate_and_pad_dynamic(img, angle=random.randint(-180, 180))),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
train_dataset = WasteDataset(dataset_path, split='train', transform=transform)
val_dataset = WasteDataset(dataset_path, split='val', transform=transform)
test_dataset = WasteDataset(dataset_path, split='test', transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [19]:
# import numpy as np
# import random
# mean = np.array([0.485, 0.456, 0.406])
# std = np.array([0.229, 0.224, 0.225])

# def unnormalize(img):
#     """反轉標準化，將圖片還原到可視化範圍"""
#     img = img.numpy().transpose((1, 2, 0))  # 將張量轉換為 NumPy 格式
#     img = std * img + mean  # 反轉標準化
#     img = np.clip(img, 0, 1)  # 限制範圍在 [0, 1]
#     return img

# imagelist = os.listdir(dataset_path)

# train_image_dir = os.path.join(dataset_path,imagelist[random.randint(0, len(imagelist)-1)])
# imagelist = os.listdir(train_image_dir)
# random.shuffle(imagelist)
# # 顯示前 10 張轉換後圖片
# fig, axes = plt.subplots(2, 5, figsize=(15, 6))
# axes = axes.flatten()
# count = 0

# for filename in imagelist:
#     if filename.lower().endswith(('.jpg', '.png', '.jpeg')):
#         image_path = os.path.join(train_image_dir, filename)
#         image = Image.open(image_path).convert('RGB')
#         transformed_tensor = transform(image)
#         img_np = unnormalize(transformed_tensor)

#         axes[count].imshow(img_np)
#         axes[count].set_title(filename.split('.')[0])
#         axes[count].axis('off')
#         count += 1
#         if count >= 10:
#             break

# plt.tight_layout()
# plt.show()

# Model Initialization

In [20]:
from torchvision import models, datasets, transforms
import torchvision
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# Create the model, loss function, and optimizer
num_classes = len(train_dataset.classes)

model = models.resnet18(weights=torchvision.models.ResNet18_Weights.DEFAULT)


# 凍結所有參數（如要 fine-tune 可改為 False）
for param in model.parameters():
    param.requires_grad = False
    
model.fc = nn.Linear(model.fc.in_features, num_classes)
model = model.to('cuda')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training


In [21]:
# Lists to store the training and validation losses
train_losses = []
val_losses = []

# Training loop
for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0.0
    for images, labels in train_dataloader:
        images = images.to('cuda')
        labels = labels.to('cuda')
        
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item() * images.size(0)
    
    train_loss /= len(train_dataset)
    train_losses.append(train_loss)
    
    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in val_dataloader:
            images = images.to('cuda')
            labels = labels.to('cuda')
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item() * images.size(0)
    
    val_loss /= len(val_dataset)
    val_losses.append(val_loss)
    
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

print("Training completed!")

Epoch [1/5], Train Loss: 1.2947, Val Loss: 0.9846
Epoch [2/5], Train Loss: 0.8602, Val Loss: 0.7261
Epoch [3/5], Train Loss: 0.6886, Val Loss: 0.6058
Epoch [4/5], Train Loss: 0.6053, Val Loss: 0.5300
Epoch [5/5], Train Loss: 0.5317, Val Loss: 0.4935
Training completed!


In [22]:
torch.save(model, 'waste_classification_model_resnet.pth')
print("Model saved successfully!")

Model saved successfully!


# 載入並測試模型

In [23]:
# # Perform sample inferences on random test images with different labels
# model.eval()
# with torch.no_grad():
#     indices = list(range(len(test_dataset)))
#     random.shuffle(indices)
    
#     selected_images = []
#     selected_labels = []
#     selected_predicted = []
    
#     for index in indices:
#         image, label = test_dataset[index]
#         image = image.unsqueeze(0).to('cuda')
        
#         output = model(image)
#         _, predicted = torch.max(output, 1)
        
#         if label not in selected_labels:
#             selected_images.append(image)
#             selected_labels.append(label)
#             selected_predicted.append(predicted.item())
        
#         if len(selected_labels) == 9:
#             break
    
#     fig, axes = plt.subplots(2, 4, figsize=(12, 12))
#     axes = axes.flatten()
    
#     for i in range(8):
#         axes[i].imshow(selected_images[i].squeeze().cpu().permute(1, 2, 0))
#         axes[i].set_title(f"True: {train_dataset.classes[selected_labels[i]]}\nPredicted: {train_dataset.classes[selected_predicted[i]]}")
#         axes[i].axis('off')
    
#     plt.tight_layout()
#     plt.show()

In [24]:
device = 'cuda'
testmodel = torch.load('waste_classification_model_resnet.pth') 
testmodel.eval()  # 切換到評估模式
correct = 0
total = 0

with torch.no_grad():  # 評估時不計算梯度
    for images, labels in test_dataloader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = testmodel(images)             # 前向推論，shape=(batch_size, num_classes)
        _, preds = torch.max(outputs, 1)    # 取每列最大值的索引作為預測

        correct += (preds == labels).sum().item()
        total += labels.size(0)

accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")


  testmodel = torch.load('waste_classification_model_resnet.pth')


Test Accuracy: 84.09%


In [2]:
import os
from torchvision.transforms.functional import to_pil_image
from torchvision import transforms

# 建立反標準化的 transform（ImageNet mean/std）
unnormalize = transforms.Normalize(
    mean=[-m/s for m, s in zip([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])],
    std=[1/s for s in [0.229, 0.224, 0.225]]
)

# 儲存處理過的圖片
def save_tensor_as_image(tensor, filename):
    img = unnormalize(tensor.squeeze(0)).clamp(0, 1)  # 去 batch 維 + 限定 0~1
    img_pil = to_pil_image(img)
    os.makedirs('rotated_image', exist_ok=True)
    img_pil.save(os.path.join('rotated_image', filename))


In [None]:
# train_dataset.classes

['aluminum_soda_cans',
 'cardboard_boxes',
 'paper_cups',
 'plastic_water_bottles']

In [9]:
import os
from PIL import Image
import torch
from torchvision import transforms
from rembg import remove

# 設定測試圖片資料夾
test_image_dir = 'custom_test/'

# 模型輸入所需的轉換（依照你訓練時使用的 transform）
transform = transforms.Compose([
    transforms.Lambda(lambda img: rotate_and_pad_dynamic(img, angle=random.randint(-180, 180))),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# 載入你訓練好的模型
testmodel = torch.load('waste_classification_model_resnet.pth')
testmodel.eval()
testmodel.to('cuda')
# 類別標籤（順序需與訓練時相符）
class_names = train_dataset.classes

# 預測每一張圖片
for filename in os.listdir(test_image_dir):
    if filename.lower().endswith(('.jpg', '.png', '.jpeg')):
        image_path = os.path.join(test_image_dir, filename)
        image = Image.open(image_path).convert('RGB')
        image = remove(image)
        
        input_tensor = transform(image).unsqueeze(0)  # 增加 batch dimension
        input_tensor = input_tensor.to('cuda')
        save_tensor_as_image(input_tensor, filename)  # 儲存處理過的圖片
        
        with torch.no_grad():
            output = testmodel(input_tensor)
            probabilities = torch.nn.functional.softmax(output, dim=1)
            print(probabilities)
            predicted_class = class_names[output.argmax(1).item()]
        
        if predicted_class.startswith('aluminum_') or predicted_class.startswith('steel'):
            print(f"{filename} → 預測為: 鐵鋁罐")
        elif predicted_class.startswith('cardboard_'):
            print(f"{filename} → 預測為: 紙箱/紙板")
        elif predicted_class.startswith('plastic'):
            print(f"{filename} → 預測為: 寶特瓶")
        elif predicted_class.startswith('paper_'):
            print(f"{filename} → 預測為: 紙杯")

  testmodel = torch.load('waste_classification_model_resnet.pth')


tensor([[0.5526, 0.0223, 0.3653, 0.0599]], device='cuda:0')
image.png → 預測為: 鐵鋁罐
tensor([[0.3897, 0.1047, 0.3555, 0.1501]], device='cuda:0')
image1.png → 預測為: 鐵鋁罐
tensor([[0.0061, 0.8758, 0.1165, 0.0016]], device='cuda:0')
image2.png → 預測為: 紙箱/紙板
tensor([[0.1938, 0.1065, 0.6203, 0.0794]], device='cuda:0')
image3.png → 預測為: 紙杯
tensor([[0.0218, 0.0015, 0.0282, 0.9485]], device='cuda:0')
image4.png → 預測為: 寶特瓶
