## Unspoiled - AI Model Training
#### Ben Cobb

Spring 2024

## THINGS TO DO FIRST!

#### Uncomment and run the ones you need (Shift+Enter is how I run individual cells)

In [4]:
# Do this if you don't already have YOLOv6 cloned on your system!
# !git clone https://github.com/meituan/YOLOv6.git

In [5]:
# When YOLOv6 is on your system, be sure to download data.zip from the GDrive link in our repo. 
# Replace the /YOLOv6/data folder with what's inside the zip!

In [6]:
# Do this if you haven't already installed Ultralytics!
# !pip install -U ultralytics

In [7]:
# Be sure to change the yamlPath variable to match your system's path setup

## Imports and Libraries

In [None]:
import cv2 as cv
import os
import time
import datetime
import requests
from PIL import Image
from ultralytics import YOLO

In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.models.detection as detection
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops import RoIAlign
from torch.utils.data import Dataset
from torchvision.io import read_image

# Global Variables

In [None]:
# Change the start of this path to the location your YOLOv6 folder is in!
yamlPath = "/Users/molte/YOLOv6/data/dataset.yaml"

# Change this when we have a real model! (for now, you can ignore)
modelPath = "/Users/molte/OneDrive/Desktop/UAFS/~Spring 2024/Capstone/Unspoiled/AIModel.yaml"

deptNames = ["COLD", "PRODUCE", "PACKAGED"]
classes = ["MILK", "EGGCARTON", "CREAMER", "APPLE", "BANANA", "PEAR", "COUGHDROPS", "CHEEZIT", "SODA"]
'''
classIDs:
0 = "milk"
1 = "eggcarton"
2 = "creamer"
3 = "apple"
4 = "banana"
5 = "pear"
6 = "coughdrops"
7 = "cheezit"
8 = "soda"
'''

cmap = "gray"

# Methods

In [14]:
# Takes a photo
def doStuff():

    print("hello")

# Class

In [3]:
# Class for our Dataset
class UnspoiledData(Dataset):
    
    def __init__(self, filepath, folder, transform=None):
        self.filepath = filepath
        self.transform = transform
        self.folder = folder
        self.imagePath = filepath + "images/" + folder
        self.labelPath = filepath + "labels/" + folder

        # Lists all image files in the images directory
        self.image_files = [f for f in os.listdir(self.imagePath) if f.endswith('.png')]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_name = self.image_files[idx]
        img_path = os.path.join(self.imagePath, img_name)
        annotation_path = os.path.join(self.labelPath, img_name.replace('.png', '.txt'))

        # Reads image
        image = read_image(img_path)

        # Reads bounding box annotations from text file
        boxes = []
        with open(annotation_path, 'r') as f:
            lines = f.readlines()
            for line in lines:
                centerX, centerY, width, height = map(float, line.strip().split())
                
                # Converts centerX, centerY, width, height to x_min, y_min, x_max, y_max
                x_min = (centerX - width / 2) * image.shape[2]
                y_min = (centerY - height / 2) * image.shape[1]
                x_max = (centerX + width / 2) * image.shape[2]
                y_max = (centerY + height / 2) * image.shape[1]
                boxes.append([x_min, y_min, x_max, y_max])

        boxes = torch.tensor(boxes, dtype=torch.float32)

        if self.transform:
            image = self.transform(image)

        return image, boxes

In [4]:
# Defines the Object Detection and Classification Model
class ObjectDetectionClassifier(nn.Module):
    def __init__(self, num_classes):
        super(ObjectDetectionClassifier, self).__init__()

        # Defines the custom backbone network
        # We can modify this section to define a different backbone architecture
        self.backbone = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        # Defines the anchor generator for the Region Proposal Network (RPN)
        anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256),),
                                           aspect_ratios=((0.5, 1.0, 2.0),))

        # Defines the Region Proposal Network (RPN)
        self.rpn = nn.Conv2d(256, anchor_generator.num_anchors_per_location()[0] * 4, kernel_size=3, stride=1, padding=1)

        # Defines the ROI Pooling module (using RoIAlign)
        self.roi_pooler = RoIAlign(output_size=(7, 7), spatial_scale=1.0, sampling_ratio=2)

        # Defines the final classifier
        self.classifier = nn.Sequential(
            nn.Linear(256 * 7 * 7, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, num_classes)
        )

    def forward(self, images, targets=None):
        # Backbone feature extraction
        features = self.backbone(images)

        # Region Proposal Network (RPN)
        proposals = self.rpn(features)

        # ROI Pooling (RoIAlign)
        box_features = self.roi_pooler(features, [proposals])

        # Flattens features
        box_features = box_features.view(box_features.size(0), -1)

        # Classifier
        class_logits = self.classifier(box_features)

        if self.training:
            return class_logits
        else:
            return class_logits

In [5]:
class UnspoiledLoss(nn.Module):
    def __init__(self, weight_bbox=1.0, weight_cls=1.0):
        super(YourCustomLossFunction, self).__init__()
        self.weight_bbox = weight_bbox
        self.weight_cls = weight_cls
        self.bbox_loss_fn = nn.SmoothL1Loss(reduction='mean')
        self.cls_loss_fn = nn.CrossEntropyLoss()

    def forward(self, predicted_boxes, predicted_scores, target_boxes, target_labels):
        
        # Computes bounding box regression loss
        bbox_loss = self.bbox_loss_fn(predicted_boxes, target_boxes)

        # Computes classification loss
        cls_loss = self.cls_loss_fn(predicted_scores, target_labels)

        # Combines the losses (weighted sum)
        total_loss = (self.weight_bbox * bbox_loss) + (self.weight_cls * cls_loss)

        return total_loss

# Model Things

In [23]:
# Defines the number of classes for our dataset
num_classes = 9  # Adjust based on our number of classes

# Instantiates our custom object detection and classification model
model = ObjectDetectionClassifier(num_classes)

# Loads pre-trained weights if available (optional)
# Replace 'path_to_pretrained_weights.pth' with the path to your pretrained weights file
# For example:
# model.load_state_dict(torch.load('path_to_pretrained_weights.pth'))

# Sets the model to evaluation mode
model.eval()

ObjectDetectionClassifier(
  (backbone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (rpn): Conv2d(256, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (roi_pooler): RoIAlign(output_size=(7, 7), spatial_scale=1.0, sampling_ratio=2, aligned=False)
  (classifier): Sequential(
    (0): Linear(in_features=12544, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=1024, out_features=9, bias=True)
  )
)

In [21]:
# Example normalized bounding boxes (centerX, centerY, width, height)
# We can replace these with our actual normalized bounding boxes
boxes_normalized = torch.tensor([[0.5, 0.5, 0.4, 0.3], [0.3, 0.4, 0.2, 0.5]])  # Example: Two normalized bounding boxes

# Converts normalized bounding boxes to absolute coordinates (x_min, y_min, x_max, y_max)
# centerX = boxes_normalized[:, 0], centerY = boxes_normalized[:, 1]
# width = boxes_normalized[:, 2], height = boxes_normalized[:, 3]
xCenter = boxes_normalized[:, 0] * 320
yCenter = boxes_normalized[:, 1] * 320
width = boxes_normalized[:, 2] * 320
height = boxes_normalized[:, 3] * 320

xMin = xCenter - (width / 2)
yMin = yCenter - (height / 2)
xMax = xCenter + (width / 2)
yMax = yCenter + (height / 2)

# Combines into bounding boxes tensor [N, 4] (x_min, y_min, x_max, y_max)
boxes_abs = torch.stack([x_min, y_min, x_max, y_max], dim=1)

# Ensure that the boxes tensor has the correct shape [N, 4]
assert boxes_abs.dim() == 2 and boxes_abs.size(1) == 4, "Boxes tensor should have shape [N, 4]"

# Convert the bounding box tensor to a list of tensors (for RoIAlign)
boxes_list = [boxes_abs]

# Example input tensor (batch size = 1, 3 channels, height = 320, width = 320)
images = torch.randn(1, 3, 320, 320)  # Assuming 320x320 RGB image

# Initialize RoIAlign module
roi_align = RoIAlign(output_size=(7, 7), spatial_scale=1.0, sampling_ratio=2)

# Apply RoIAlign with the prepared boxes list on the input images
with torch.no_grad():
    roi_features = roi_align(images, boxes_list)

# Print the shape of the RoI-aligned features
print("RoI-Aligned Features Shape:", roi_features.shape)

RoI-Aligned Features Shape: torch.Size([2, 3, 7, 7])


In [22]:
from torch.utils.data import DataLoader

# train/test/val
folder = "train"
filepath = "/Users/molte/OneDrive/Desktop/UAFS/~Spring 2024/Capstone/Unspoiled/AIModelTraining/dataset/"

# Create instances of your custom dataset (train and validation sets)
trainData = UnspoiledData(filepath, "train", transform=None)
valData = UnspoiledData(filepath, "val", transform=None)


batchSize = 3
numWorkers = 2

# Create data loaders
trainLoader = DataLoader(trainData, batch_size=batchSize, shuffle=True, num_workers=numWorkers)
valLoader = DataLoader(valData, batch_size=batchSize, shuffle=False, num_workers=numWorkers)

print("data loaded")

data loaded


In [20]:
import torch.optim as optim

# Define loss function
criterion = UnspoiledLoss()

# Define optimizer
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-4)


NameError: name 'YourCustomLossFunction' is not defined

In [None]:
num_epochs = 2

# Move model to device (GPU if available)
print("before device")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("before model.to")
model.to(device)
print("before loop")

# Training loop
for epoch in range(num_epochs):
    print("train started")
    model.train()
    running_loss = 0.0
    print("train done")
    for images, targets in trainLoader:
        images, targets = images.to(device), targets.to(device)

        # Forward pass
        outputs = model(images, targets)

        # Compute loss
        loss = criterion(outputs, targets)
        print("Loss:", loss)
        
        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Print training loss for each epoch
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')

    # Validation (optional)
    model.eval()
    # Perform validation if needed

# Save trained model weights
torch.save(model.state_dict(), 'path_to_save_model.pth')


In [39]:
for result in results:
    boxes = result.boxes
    masks = result.masks
    keypoints = result.keypoints
    probs = result.probs
    
    imgArray = result.plot()  # plot a BGR numpy array of predictions
    img = Image.fromarray(imgArray[..., ::-1])  # RGB PIL image
    img.show()  # show image
    img.save('./images_OLD/results.jpg')  # save image