In [1]:
# Creating a dataset class for coco format
import os
import torch
import torch.utils.data
import torchvision
from PIL import Image
from pycocotools.coco import COCO

class COCO23Dataset(torch.utils.data.Dataset):
    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        coco = self.coco
        img_id = self.ids[index]
        ann_ids = coco.getAnnIds(imgIds=img_id)
        coco_annotation = coco.loadAnns(ann_ids)
        path = coco.loadImgs(img_id)[0]['file_name']
        img = Image.open(os.path.join(self.root, path))

        num_objs = len(coco_annotation)

        # Bounding boxes and their labels
        boxes = []
        labels = []
        for annotation in coco_annotation:
            xmin = annotation['bbox'][0]
            ymin = annotation['bbox'][1]
            xmax = xmin + annotation['bbox'][2]
            ymax = ymin + annotation['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])

            # Assign category_id as label
            labels.append(annotation['category_id'])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        img_id = torch.tensor([img_id])

        # Area and iscrowd fields if needed
        areas = [annotation['area'] for annotation in coco_annotation]
        areas = torch.as_tensor(areas, dtype=torch.float32)
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        # Construct the annotations
        annotations = {
            "boxes": boxes,
            "labels": labels,
            "image_id": img_id,
            "area": areas,
            "iscrowd": iscrowd
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, annotations

    def __len__(self):
        return len(self.ids)


In [2]:
# In my case, just added ToTensor as a transform
def get_transform():
    custom_transforms = []
    custom_transforms.append(torchvision.transforms.ToTensor())
    return torchvision.transforms.Compose(custom_transforms)

In [3]:
# Select GPU if available with cuda else cpu
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [4]:
# This line gives the python code access to the files on the drive. path:"/content/drive/MyDrive/BeeVisionCase_OmerOzer/

from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [5]:
from traitlets.traitlets import validate
# path to train data and coco file
# this path is coming from above line which give access to the drive folder. you can set your own path
train_data_dir = '/content/drive/MyDrive/BeeVisionCase_OmerOzer/training_random_images'
train_coco = '/content/drive/MyDrive/BeeVisionCase_OmerOzer/random_train_626_coco.json'

# path to validation data and coco file
validate_data_dir = '/content/drive/MyDrive/BeeVisionCase_OmerOzer/validation_random_images'
validate_coco = '/content/drive/MyDrive/BeeVisionCase_OmerOzer/validation_random_coco.json'

# Train  Dataset
my_dataset = COCO23Dataset(root=train_data_dir,
                          annotation=train_coco,
                          transforms=get_transform()
                          )
# Validate dataset
validate_dataset = COCO23Dataset(root=validate_data_dir,
                          annotation=validate_coco,
                          transforms=get_transform()
                          )
# collate_fn needs for batch
def collate_fn(batch):
    return tuple(zip(*batch))

# Train Batch size
train_batch_size = 5

# Validate Batch size
validate_batch_size = 5

# Train DataLoader
data_loader = torch.utils.data.DataLoader(my_dataset,
                                          batch_size=train_batch_size,
                                          shuffle=True,
                                          collate_fn=collate_fn)
# validation dataloader
val_data_loader = torch.utils.data.DataLoader(validate_dataset,
                                          batch_size=validate_batch_size,
                                          shuffle=True,
                                          collate_fn=collate_fn)

loading annotations into memory...
Done (t=0.60s)
creating index...
index created!
loading annotations into memory...
Done (t=0.63s)
creating index...
index created!


In [6]:
#CREATE THE MODEL
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pre-trained pre-trained on COCO
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False)
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model


# 24 classes including background
num_classes = 24
model = get_model_instance_segmentation(num_classes)

#if you want to start from pretrained model you can load the parameters here (this line loads my trained model and it does not need training)
model.load_state_dict(torch.load('/content/drive/MyDrive/BeeVisionCase_OmerOzer/model_weights_14epoch_626_0172.pth'))

# move model to the right device
model.to(device)





Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 119MB/s]


FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=1e-05)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=1e-05)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=1e-05)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=1e-05)
          (relu

In [None]:
# Train the model with desired parameters
# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

len_dataloader = len(data_loader)
num_epochs = 1

for epoch in range(num_epochs):
    model.train()
    i = 0
    total_train_loss = 0
    for imgs, annotations in data_loader:
        i += 1
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())
        total_train_loss += losses.item()

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        avg_train_loss = total_train_loss / len(data_loader)
        print(f"Epoch {epoch+1}, Avg Training loss in epoch: {avg_train_loss}")
    # Validation phase
    model.train()
    total_val_loss = 0
    with torch.no_grad():
        for imgs, annotations in val_data_loader:
            imgs = list(img.to(device) for img in imgs)
            annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
            loss_dict = model(imgs, annotations)
            losses = sum(loss for loss in loss_dict.values())
            total_val_loss += losses.item()

    avg_val_loss = total_val_loss / len(val_data_loader)
    print(f"Epoch {epoch+1}, Validation loss: {avg_val_loss}")

Epoch 1, Avg Training loss in epoch: 0.000921867610443206
Epoch 1, Avg Training loss in epoch: 0.0016772403485245174
Epoch 1, Avg Training loss in epoch: 0.002448730348121552
Epoch 1, Avg Training loss in epoch: 0.003959435851327957
Epoch 1, Avg Training loss in epoch: 0.004802066477991286
Epoch 1, Avg Training loss in epoch: 0.0056231215832725405
Epoch 1, Avg Training loss in epoch: 0.0068242209298270086
Epoch 1, Avg Training loss in epoch: 0.007558973535658821
Epoch 1, Avg Training loss in epoch: 0.008366331931144472
Epoch 1, Avg Training loss in epoch: 0.009065949905013282
Epoch 1, Avg Training loss in epoch: 0.009864818659566697
Epoch 1, Avg Training loss in epoch: 0.010399899371559657
Epoch 1, Avg Training loss in epoch: 0.011140652710483187
Epoch 1, Avg Training loss in epoch: 0.012286361247774154
Epoch 1, Avg Training loss in epoch: 0.01317708371650605
Epoch 1, Avg Training loss in epoch: 0.014192750411374229
Epoch 1, Avg Training loss in epoch: 0.015101232048537996
Epoch 1, Avg

In [None]:
# If you want to save the parameters after training
torch.save(model.state_dict(), 'model_weights_14epoch_626_0172.pth')


In [None]:
# Load the saved weights to test manually
model.load_state_dict(torch.load('model_weights_14epoch_626_0172.pth'))


<All keys matched successfully>

In [7]:
#A simple accuracy metric by considering how many bounding boxes predicted by the model match with the
#ground truth boxes (true positives), given a certain Intersection over Union (IoU) threshold.
def calculate_accuracy(model, data_loader, device, iou_threshold=0.5):
    model.eval()

    total_predictions = 0
    correct_predictions = 0

    with torch.no_grad():
        for imgs, annotations in data_loader:
            imgs = list(img.to(device) for img in imgs)
            model_predictions = model(imgs)

            for prediction, ground_truth in zip(model_predictions, annotations):
                pred_boxes = prediction['boxes'].cpu().numpy()
                gt_boxes = ground_truth['boxes'].cpu().numpy()

                for pred_box in pred_boxes:
                    for gt_box in gt_boxes:
                        iou = compute_iou(pred_box, gt_box)
                        if iou > iou_threshold:
                            correct_predictions += 1
                            break  # Only count the predicted box once
                total_predictions += len(pred_boxes)

    accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
    return accuracy

def compute_iou(boxA, boxB):
    # Determine the (x, y)-coordinates of the intersection rectangle
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    # Compute the area of intersection rectangle
    inter_area = max(0, xB - xA + 1) * max(0, yB - yA + 1)

    # Compute the area of both the prediction and ground-truth rectangles
    boxA_area = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxB_area = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)

    # Compute the intersection over union
    iou = inter_area / float(boxA_area + boxB_area - inter_area)

    return iou

# Now call the function
accuracy = calculate_accuracy(model, val_data_loader, device)
print(f"Accuracy: {accuracy:.4f}")


Accuracy: 0.7640


In [8]:
# Jpeg input, Jpeg output with bbox's
import torchvision.transforms as T
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display

categories = [
    { "supercategory": "type", "id": 1, "name": "usps-normal" },
    { "supercategory": "type", "id": 2, "name": "usps-sure" },
    { "supercategory": "type", "id": 3, "name": "usps-first" },
    { "supercategory": "type", "id": 4, "name": "usps-prio" },
    { "supercategory": "type", "id": 5, "name": "yellow" },
    { "supercategory": "type", "id": 6, "name": "id-below" },
    { "supercategory": "type", "id": 7, "name": "lex32" },
    { "supercategory": "type", "id": 8, "name": "dhl" },
    { "supercategory": "type", "id": 9, "name": "ls-small" },
    { "supercategory": "type", "id": 10, "name": "ls" },
    { "supercategory": "type", "id": 11, "name": "fedex-g" },
    { "supercategory": "type", "id": 12, "name": "small" },
    { "supercategory": "type", "id": 13, "name": "ups-ground" },
    { "supercategory": "type", "id": 14, "name": "fragile-warning" },
    { "supercategory": "type", "id": 15, "name": "usps-boxes" },
    { "supercategory": "type", "id": 16, "name": "usps-four-qr" },
    { "supercategory": "type", "id": 17, "name": "fedex-s" },
    { "supercategory": "type", "id": 18, "name": "unknown-f" },
    { "supercategory": "type", "id": 19, "name": "usps-parcel" },
    { "supercategory": "type", "id": 20, "name": "usps-media" },
    { "supercategory": "type", "id": 21, "name": "fedex-h" },
    { "supercategory": "type", "id": 22, "name": "fedex-e" },
    { "supercategory": "type", "id": 23, "name": "usps-3rdparty" }
]

id_to_name = {category["id"]: category["name"] for category in categories}



model.eval()

def predict_image(image_path, model, device, detection_threshold):
    # Load Image
    image = Image.open(image_path).convert("RGB")
    transform = T.Compose([T.ToTensor()])
    image = transform(image)

    # Push to GPU & Add batch dimension
    image = image.to(device).unsqueeze(0)

    # Get predictions
    with torch.no_grad():
        prediction = model(image)

    # Filter out predictions below a certain threshold
    boxes = prediction[0]['boxes'][prediction[0]['scores'] > detection_threshold].cpu().numpy()
    scores = prediction[0]['scores'][prediction[0]['scores'] > detection_threshold].cpu().numpy()
    labels = prediction[0]['labels'][prediction[0]['scores'] > detection_threshold].cpu().numpy()

    return boxes, scores, labels, image.squeeze(0).permute(1, 2, 0).cpu().numpy()

def draw_boxes(boxes, scores, labels, image_np, id_to_name, color='red'):
    image = Image.fromarray((image_np * 255).astype('uint8'))
    draw = ImageDraw.Draw(image)
    font = ImageFont.truetype("/content/drive/MyDrive/BeeVisionCase_OmerOzer/arial.ttf", 50)

    for box, score, label in zip(boxes, scores, labels):
        class_name = id_to_name[label]
        draw.rectangle([(box[0], box[1]), (box[2], box[3])], outline=color, width=6)
        draw.text((box[0] +25, box[1] - 55), text=f"{class_name} {score:.2f}", fill=color, font=font)

    return image



# Test with example images from validation dataset
#image_path = "/content/drive/MyDrive/BeeVisionCase_OmerOzer/21577.jpeg"
image_path = "/content/drive/MyDrive/BeeVisionCase_OmerOzer/21568.jpeg"

boxes, scores, labels, img_np = predict_image(image_path, model, device, 0.4)
image_with_boxes = draw_boxes(boxes, scores, labels, img_np, id_to_name)
display(image_with_boxes)



Output hidden; open in https://colab.research.google.com to view.