# Garbage Classification

In [None]:
import os
import wandb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from scipy.signal import savgol_filter

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim

import torchvision
from torchvision import transforms, datasets, models

from transformers import BertTokenizer
from transformers import BertModel

# Prefer the GPU when CUDA is available; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

### Define Data Directories

In [2]:
# Root folder that contains the three CVPR 2024 garbage-dataset splits.
# Set the GARBAGE_DATA_DIR environment variable to point at a different
# location (for example a local checkout such as '<repo>/data/garbage_data')
# instead of editing machine-specific absolute paths into the notebook.
data_root = os.environ.get('GARBAGE_DATA_DIR', 'data/enel645_2024f/garbage_data')

trainset_dir = os.path.join(data_root, 'CVPR_2024_dataset_Train')
valset_dir = os.path.join(data_root, 'CVPR_2024_dataset_Val')
testset_dir = os.path.join(data_root, 'CVPR_2024_dataset_Test')

### Define Garbage Dataset Class

In [3]:
class GarbageDataset(Dataset):
    """Dataset pairing each garbage image with its tokenized text and label.

    Every row of ``dataframe`` provides an ``image_path``, a
    ``text_description`` and a string ``label``. ``class_to_idx`` maps that
    label to the integer class index returned to the caller.
    """

    def __init__(self, dataframe, image_transform=None, max_len=32, class_to_idx=None):
        self.class_to_idx = class_to_idx  # label name -> integer class index
        self.max_len = max_len            # fixed token length per description
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.image_transform = image_transform
        self.dataframe = dataframe

    def __len__(self):
        """Return the number of rows (samples) in the underlying dataframe."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return a dict with 'image', 'input_ids', 'attention_mask', 'label'."""
        row = self.dataframe.iloc[idx]
        img_path = row['image_path']
        text_desc = row['text_description']
        label = row['label']

        # Decode the picture as RGB, then apply the optional transform
        image = Image.open(img_path).convert("RGB")
        if self.image_transform:
            image = self.image_transform(image)

        # Pad or truncate the description to exactly max_len tokens
        encoded = self.tokenizer(
            text_desc,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors="pt",
        )

        # Translate the string label into its class index
        numeric_label = self.class_to_idx[label]

        # The tokenizer output carries a leading batch axis of size 1; drop it
        input_ids = encoded['input_ids'].squeeze(0)
        attention_mask = encoded['attention_mask'].squeeze(0)
        label_tensor = torch.tensor(numeric_label, dtype=torch.long)

        return {
            'image': image,
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label': label_tensor,
        }

### Define Multimodal Model Class

In [4]:
class CombinedClassifier(nn.Module):
    """Late-fusion classifier over concatenated image and text features.

    ``image_model`` maps an image batch to a 2-D feature tensor, and
    ``text_model`` must return an object exposing ``pooler_output``. The two
    feature tensors are joined along dim 1 and fed to one linear layer that
    takes ``combined_feature_size`` inputs and emits ``num_classes`` logits.
    """

    def __init__(self, image_model, text_model, combined_feature_size, num_classes):
        super().__init__()
        self.image_model = image_model  # image feature extractor
        self.text_model = text_model    # text feature extractor
        self.fc = nn.Linear(combined_feature_size, num_classes)  # fusion head

    def forward(self, image, text_input_ids, text_attention_mask):
        """Return class logits for a batch of images and tokenized texts."""
        image_features = self.image_model(image)

        text_outputs = self.text_model(
            input_ids=text_input_ids,
            attention_mask=text_attention_mask,
        )
        text_features = text_outputs.pooler_output

        # Fuse both modalities side by side, then classify
        fused = torch.cat((image_features, text_features), dim=1)
        return self.fc(fused)

### Define Methods

In [5]:
# Extract images, labels, and text descriptions from a given folder
def extract_data_from_folders(base_dir):
    """Collect image paths, filename-derived text and labels under ``base_dir``.

    ``base_dir`` is expected to contain one sub-folder per class; the folder
    name becomes the label and each image's filename (without its extension)
    becomes the text description. Entries of ``base_dir`` that are not
    directories, and files without an image extension, are ignored.

    Folders and files are visited in sorted order so the resulting row order
    does not depend on the platform's directory listing order. Extensions are
    matched case-insensitively so files such as ``photo.JPG`` are kept.

    Returns:
        pandas.DataFrame with columns ``image_path``, ``text_description``
        and ``label``. The columns are present even when no image is found.
    """
    image_extensions = ('.jpg', '.png', '.jpeg')
    columns = ['image_path', 'text_description', 'label']
    records = []

    for label_folder in sorted(os.listdir(base_dir)):
        folder_path = os.path.join(base_dir, label_folder)
        if not os.path.isdir(folder_path):
            continue

        for filename in sorted(os.listdir(folder_path)):
            if not filename.lower().endswith(image_extensions):
                continue

            records.append({
                'image_path': os.path.join(folder_path, filename),
                # The filename without its extension doubles as the caption
                'text_description': os.path.splitext(filename)[0],
                # The sub-folder name represents the label (bin)
                'label': label_folder,
            })

    # Passing the columns explicitly keeps the schema stable for empty input
    return pd.DataFrame(records, columns=columns)

In [6]:
def get_dataset_stats(dataloader):
    """Compute the per-channel mean and standard deviation of a dataset.

    Each batch yielded by ``dataloader`` must be a mapping whose ``'image'``
    entry is a tensor with the batch on dim 0 and the channels on dim 1.

    The statistics are taken over every pixel of every image (population
    standard deviation). Averaging the per-image standard deviations instead
    would ignore the variation between images and under-estimate the spread.

    Returns:
        (mean, std): two float32 tensors with one entry per channel.

    Raises:
        ValueError: if the dataloader yields no pixels at all.
    """
    channel_sum = None
    channel_sq_sum = None
    pixels_per_channel = 0

    for batch in dataloader:
        images = batch['image']
        batch_samples = images.size(0)
        # Flatten the spatial dims; float64 keeps the running sums accurate
        flat = images.reshape(batch_samples, images.size(1), -1).double()

        batch_sum = flat.sum(dim=(0, 2))
        batch_sq_sum = (flat * flat).sum(dim=(0, 2))
        if channel_sum is None:
            channel_sum, channel_sq_sum = batch_sum, batch_sq_sum
        else:
            channel_sum = channel_sum + batch_sum
            channel_sq_sum = channel_sq_sum + batch_sq_sum
        pixels_per_channel += batch_samples * flat.size(2)

    if pixels_per_channel == 0:
        raise ValueError("Cannot compute dataset statistics: the dataloader yielded no images.")

    mean = channel_sum / pixels_per_channel
    # Var[x] = E[x^2] - E[x]^2; clamp guards against tiny negative round-off
    variance = (channel_sq_sum / pixels_per_channel - mean * mean).clamp(min=0)
    return mean.float(), variance.sqrt().float()

In [7]:
# Display an image
def imshow(img, stats):
    """Undo normalization on a (C, H, W) image tensor and display it.

    Args:
        img: normalized image tensor with channels first; it may live on any
            device and is copied to the CPU before plotting.
        stats: pair ``(mean, std)`` that broadcasts against ``img``, i.e. the
            values that were used to normalize it.
    """
    def _to_cpu(value):
        # Tensors are detached and moved to the CPU; other values pass through
        return value.detach().cpu() if hasattr(value, 'detach') else value

    mean, std = _to_cpu(stats[0]), _to_cpu(stats[1])
    img = _to_cpu(img) * std + mean     # unnormalize

    # Round-off can push values slightly outside [0, 1], the range matplotlib
    # accepts for float RGB data, so clip before plotting.
    npimg = np.clip(img.numpy(), 0.0, 1.0)
    plt.imshow(np.transpose(npimg, (1, 2, 0)))  # CHW -> HWC for matplotlib
    plt.show()

In [None]:
# Start a Weights & Biases run in the 'garbage-collection' project; the
# returned run handle is kept in `run`.
run = wandb.init(project='garbage-collection')

In [8]:
# Bin classes; the position in this list fixes each class's integer index
class_names = ['Green', 'Blue', 'Black', 'TTR']
idx_to_class = dict(enumerate(class_names))
class_to_idx = {name: index for index, name in idx_to_class.items()}

In [None]:
# Build one DataFrame per split from its folder tree
trainset_df = extract_data_from_folders(trainset_dir)
valset_df = extract_data_from_folders(valset_dir)
testset_df = extract_data_from_folders(testset_dir)

# Preview the last few rows of each split
for split_df in (trainset_df, valset_df, testset_df):
    print(split_df.tail())

In [None]:
# Create the training data set and retrieve its statistics
batch_size = 256

# Resize before batching: the default collate function stacks the images of a
# batch into one tensor, which requires them all to have the same size. Using
# the same 224x224 size as the training transform also means the statistics
# are measured on the pixels the model will actually see.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
    ])

trainset_temp = GarbageDataset(trainset_df, image_transform=transform, class_to_idx=class_to_idx)
# The statistics do not depend on sample order, so no shuffling is needed
trainloader = DataLoader(trainset_temp, batch_size=batch_size, shuffle=False)

stats = get_dataset_stats(trainloader)

print('Train Stats:', stats)

In [None]:
# Shared preprocessing for all three splits: resize, convert to a tensor, then
# normalize with the statistics measured on the training set
normalize = transforms.Normalize(stats[0], stats[1])
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    normalize,
])

trainset, valset, testset = (
    GarbageDataset(split_df, image_transform=transform, class_to_idx=class_to_idx)
    for split_df in (trainset_df, valset_df, testset_df)
)

# Sanity check: inspect the first training sample
sample = trainset[0]
print(sample['image'].shape)  # (3, 224, 224)
print(sample['input_ids'].shape)  # (32,) - max_len of 32 tokens
print(sample['label'])  # The label as a tensor

# One loader per split, all built with the same options
loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=2)
trainloader = DataLoader(trainset, **loader_options)
valloader = DataLoader(valset, **loader_options)
testloader = DataLoader(testset, **loader_options)

In [None]:
dataiter = iter(trainloader)
batch = next(dataiter)

# Extract images and labels from the batch dictionary. They stay on the CPU:
# this cell only plots them, and both the normalization stats and the numpy
# conversion used for plotting are CPU-side.
images = batch['image']
labels = batch['label']

# Show at most 8 samples (fewer if the batch is smaller)
num_shown = min(8, images.size(0))

# show images
imshow(torchvision.utils.make_grid(images[:num_shown]), (stats[0][:, None, None], stats[1][:, None, None]))
# print labels: map each integer class index back to its class name, since the
# 's' format code only works on strings
print(' '.join(f'{idx_to_class[labels[j].item()]:5s}' for j in range(num_shown)))

In [None]:
# Load pre-trained models for image and text feature extraction
image_model = models.resnet18(pretrained=True)
text_model = BertModel.from_pretrained('bert-base-uncased')

# Freeze the ResNet except layer4, the last convolutional block, which is
# left trainable so it can be fine-tuned.
for name, param in image_model.named_parameters():
    param.requires_grad = 'layer4' in name

for param in text_model.parameters():
    param.requires_grad = True # not sure which layers to freeze

# Turn the ResNet into a pure feature extractor: replacing its final fully
# connected layer with an identity makes it output the pooled feature vector
# (num_image_features wide) instead of class scores. The fusion layer of
# CombinedClassifier is sized for exactly that width plus the text width.
num_image_features = image_model.fc.in_features
image_model.fc = nn.Identity()

# Create the combined image and text model; the text feature width is read
# from the BERT configuration rather than hard-coded.
combined_feature_size = num_image_features + text_model.config.hidden_size
combined_model = CombinedClassifier(image_model, text_model, combined_feature_size, len(class_names)).to(device)

# Moving combined_model to the device moved its sub-modules in place, so
# image_model needs no separate .to(device) call.
print(image_model)

In [13]:
criterion = nn.CrossEntropyLoss()

# Give the optimizer every parameter that is left trainable, not only the
# fusion layer: parameters with requires_grad=True that are missing from the
# optimizer still receive gradients but are never updated.
head_params = list(combined_model.fc.parameters())
head_param_ids = {id(p) for p in head_params}
backbone_params = [
    p for p in combined_model.parameters()
    if p.requires_grad and id(p) not in head_param_ids
]

# The freshly initialised head keeps the original learning rate; the
# pre-trained layers get a much smaller one so fine-tuning does not wipe out
# their pre-trained weights.
param_groups = [{'params': head_params, 'lr': 0.001}]
if backbone_params:
    param_groups.append({'params': backbone_params, 'lr': 2e-5})

optimizer = torch.optim.Adam(param_groups)

In [None]:
num_epochs = 10

# Record the hyperparameters on the active run. Assigning a plain dict to
# wandb.config would replace the config object instead of updating it.
wandb.config.update(
    {"epochs": num_epochs, "batch_size": batch_size, "learning_rate": 0.001},
    allow_val_change=True,  # lets this cell be re-run with changed values
)

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')
    print('-' * 10)

    # Training phase
    combined_model.train()
    running_loss = 0.0
    running_corrects = 0

    for batch in trainloader:
        images = batch['image'].to(device)
        text_input_ids = batch['input_ids'].to(device)
        text_attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()

        # Forward pass
        outputs = combined_model(images, text_input_ids, text_attention_mask)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        # Backward pass and optimization step
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean, so weight it by the batch size
        # to get a correct per-sample average at the end of the epoch.
        running_loss += loss.item() * images.size(0)
        running_corrects += torch.sum(preds == labels).item()

    epoch_loss = running_loss / len(trainset)
    epoch_acc = running_corrects / len(trainset)

    print(f'Training Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

    # Validation phase
    combined_model.eval()
    val_running_loss = 0.0
    val_running_corrects = 0

    with torch.no_grad():
        for batch in valloader:
            images = batch['image'].to(device)
            text_input_ids = batch['input_ids'].to(device)
            text_attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            # Forward pass; arguments are passed positionally because the
            # model's forward() names them text_input_ids / text_attention_mask
            outputs = combined_model(images, text_input_ids, text_attention_mask)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # Statistics
            val_running_loss += loss.item() * images.size(0)
            val_running_corrects += torch.sum(preds == labels).item()

    val_loss = val_running_loss / len(valset)
    val_acc = val_running_corrects / len(valset)

    print(f'Validation Loss: {val_loss:.4f} Acc: {val_acc:.4f}')

    # Log all metrics of the epoch in one call so training and validation
    # values share the same step, and log plain Python floats.
    wandb.log({
        "Training Loss": epoch_loss,
        "Training Accuracy": epoch_acc,
        "Validation Loss": val_loss,
        "Validation Accuracy": val_acc,
    })

In [None]:
# Mark the Weights & Biases run as finished now that training is done.
wandb.finish()