# 사전 설정

## 라이브러리 설치

In [1]:
import os
import sys
import math
import glob
import random
import shutil
import json

import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
from PIL import Image, ImageEnhance, ImageFilter

from tqdm import tqdm
from sklearn.metrics import average_precision_score, classification_report

import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.data.sampler import WeightedRandomSampler

import torchvision
import torchvision.models as models
import torchvision.transforms.functional as F2
import torchvision.transforms.v2 as v2
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn

## 구글 드라이브 연결

In [2]:
from google.colab import drive
# Mount Google Drive
drive.mount('/content/drive')
g_path  = "/content/drive/MyDrive/data"
# Create the data folder on Drive if it is not there yet
os.makedirs(g_path, exist_ok=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## GPU 설정

In [3]:
# Select the compute device: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cpu


# 데이터 불러오기 및 탐색

In [None]:
# Install the Kaggle API
# !pip install kaggle

In [None]:
# Download the competition data
# You will need to upload your kaggle.json API token for this step to work.
# See https://github.com/Kaggle/kaggle-api for details.
# !kaggle competitions download -c ai03-level1-project

In [None]:
# 관리자 권한 디렉토리 만들기
# !sudo mkdir -p /root/.config/kaggle

In [None]:
# json 파일 만든 디렉토리에 복사
# !cp "/content/drive/MyDrive/Sprint Project/kaggle.json" /root/.config/kaggle/

In [None]:
# 모드 변경 (Read/Write)
# !chmod 600 /root/.config/kaggle/kaggle.json

In [4]:
# Set the data paths (image folder and annotation folder on Drive)
img_path = "/content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_images"
json_path = "/content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_annotations"

# Echo the configured paths
print(f"Image path: {img_path}")
print(f"Annotation path: {json_path}")

Image path: /content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_images
Annotation path: /content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_annotations


In [None]:
# Get the list of image files
#image_files = [os.path.join(root, f) for root, _, files in os.walk(img_path) for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg'))]

#if not image_files:
#    print("No image files found in the directory.")
#else:
    # Load the first image to get the reference size
#    first_img_path = image_files[0]
#    first_img = cv2.imread(first_img_path)

#    if first_img is None:
#        print(f"Could not read the first image: {first_img_path}")
#    else:
#        first_h, first_w, _ = first_img.shape
#        first_size = (first_w, first_h)
#        print(f"Size of the first image ({os.path.basename(first_img_path)}): {first_size} (Width, Height)")

#        all_same_size = True
        # Check the size of remaining images
#        for img_file in image_files[1:]:
#            img = cv2.imread(img_file)
#           if img is not None:
#                h, w, _ = img.shape
#                current_size = (w, h)
#                if current_size != first_size:
#                    print(f"Image {os.path.basename(img_file)} has a different size: {current_size}")
#                    all_same_size = False
#            else:
#                print(f"Could not read image: {os.path.basename(img_file)}")

#        if all_same_size:
#            print("\nAll images checked have the same size as the first image.")

#print("\nSimple size check complete.")

In [None]:
# 데이터 수량 확인
#def count_files_in_directory(directory):
#   if not os.path.exists(directory):
#       return 0
#   return len([name for name in os.listdir(directory) if os.path.isfile(os.path.join(directory, name))])

#num_image_files = count_files_in_directory(img_path)
#num_annotation_files = count_files_in_directory(json_path)

#print(f"Number of image files in '{img_path}': {num_image_files}")
#print(f"Number of annotation files in '{json_path}': {num_annotation_files}")

### 데이터 폴더를 확인한 결과, train_annotations 안에 하위 폴더들이 있고 그 안에 파일들이 있는 것을 확인.

In [None]:
#def count_files_recursive(directory):
#   if not os.path.exists(directory):
#       return 0
#       count = 0
#       for root, _, files in os.walk(directory):
#        count += len(files)
#       return count

#num_annotation_files_recursive = count_files_recursive(json_path)

#print(f"Total number of annotation files (including subdirectories) in '{json_path}': {num_annotation_files_recursive}")

In [5]:
# Count image files; os.walk descends into sub-folders, so this is a recursive count
img_path = "/content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_images"
num_image_files = sum(len(names) for _, _, names in os.walk(img_path))
print(f"Number of image files in '{img_path}': {num_image_files}")

# Count annotation files the same way (annotations live in nested sub-folders)
json_path = "/content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_annotations"
num_annotation_files_recursive = sum(len(names) for _, _, names in os.walk(json_path))
print(f"Total number of annotation files (including subdirectories) in '{json_path}': {num_annotation_files_recursive}")

Number of image files in '/content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_images': 1489
Total number of annotation files (including subdirectories) in '/content/drive/MyDrive/data/ai03-level1-project.zip (Unzipped Files)/train_annotations': 4526


In [6]:
# Pair image files and annotation file
def get_file_paths(img_dir, json_dir, ids_to_include=None):
    """
    Pairs image files with their corresponding annotation files.

    Both directories are walked recursively. An annotation file is paired with
    the image whose filename (without extension) is identical to its own.
    One tuple is produced per annotation file, so an image that has several
    annotation files with the same name (in different sub-folders) appears in
    several tuples.

    Args:
        img_dir (str): Directory containing image files (.png, .jpg, .jpeg).
        json_dir (str): Directory containing annotation files (can be recursive).
        ids_to_include (iterable, optional): Filenames (without extensions)
                                          to include in the pairing. If None, all
                                          matching files are paired.

    Returns:
        list: A list of tuples, where each tuple is (image_path, annotation_path).
    """
    image_files = [
        os.path.join(root, f)
        for root, _, files in os.walk(img_dir)
        for f in files
        if f.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]
    annotation_files = [
        os.path.join(root, f)
        for root, _, files in os.walk(json_dir)
        for f in files
        if f.lower().endswith('.json')
    ]

    # Map image filename (without extension) -> full path.
    # NOTE: images sharing a stem (different folder or extension) overwrite
    # each other here; the last one walked wins.
    image_dict = {os.path.splitext(os.path.basename(img))[0]: img for img in image_files}

    # Build the filter set once so each membership test is O(1) instead of
    # scanning the whole list for every annotation file.
    allowed_ids = None if ids_to_include is None else set(ids_to_include)

    paired_files = []
    for annotation_path in annotation_files:
        stem = os.path.splitext(os.path.basename(annotation_path))[0]
        if stem not in image_dict:
            continue
        if allowed_ids is not None and stem not in allowed_ids:
            continue
        paired_files.append((image_dict[stem], annotation_path))

    return paired_files

# Get the paired image and annotation files (using all files by default)
paired_files = get_file_paths(img_path, json_path)

print(f"Found {len(paired_files)} paired image and annotation files.")

Found 4526 paired image and annotation files.


In [7]:
import cv2
import json
import matplotlib.pyplot as plt
import os

# Preview up to five image/annotation pairs with their bounding boxes drawn.
# 'paired_files' must already hold (image_path, annotation_path) tuples;
# run the pairing cell first if it does not.
if 'paired_files' not in locals() or not paired_files:
    print("Error: 'paired_files' not found or is empty. Please run the cell that loads and pairs image and annotation files.")
else:
    # Show the first 5 pairs, or fewer if fewer exist
    num_samples_to_show = min(5, len(paired_files))

    for i in range(num_samples_to_show):
        sample_image_path, sample_annotation_path = paired_files[i]

        # Load the image; cv2.imread returns None instead of raising when the
        # file is missing or unreadable, so check before converting.
        image = cv2.imread(sample_image_path)
        if image is None:
            print(f"Warning: Could not load image {sample_image_path}. Skipping.")
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # Convert to RGB for matplotlib

        # Load the annotation file
        with open(sample_annotation_path, 'r', encoding='utf-8') as f:
            annotation_data = json.load(f)

        # Collect the [x, y, w, h] boxes from the 'annotations' list, if present
        bounding_boxes = []
        if 'annotations' in annotation_data:
            for annotation in annotation_data['annotations']:
                if 'bbox' in annotation:
                    x, y, w, h = annotation['bbox']
                    bounding_boxes.append((x, y, w, h))

        # Draw bounding boxes on a copy so the loaded image stays untouched
        image_with_boxes = image.copy()
        for (x, y, w, h) in bounding_boxes:
            # cv2.rectangle needs integer pixel coordinates; annotation values
            # may be floats, so round them first.
            top_left = (int(round(x)), int(round(y)))
            bottom_right = (int(round(x + w)), int(round(y + h)))
            # The image is in RGB order here, so (0, 0, 255) renders as blue.
            cv2.rectangle(image_with_boxes, top_left, bottom_right, (0, 0, 255), 2)

        # Display the image with bounding boxes
        plt.figure(figsize=(10, 10))
        plt.imshow(image_with_boxes)
        plt.title(f"Sample {i+1}: Image with Bounding Boxes: {os.path.basename(sample_image_path)}")
        plt.axis('off') # Hide axes
        plt.show()

        print(f"\nSample {i+1}: Loaded annotations for {os.path.basename(sample_image_path)}:")
        display(annotation_data)

Output hidden; open in https://colab.research.google.com to view.

Here's a custom dataset class for loading your image and annotation data. This class will:

- Inherit from `torch.utils.data.Dataset`.
- Implement `__len__` to return the total number of data samples.
- Implement `__getitem__` to load an image and its corresponding annotations for a given index.
- Include basic image loading and annotation parsing based on the structure of your JSON files.
- Allow for optional data transformations.

In [50]:
from torch.utils.data import Dataset
import os
import json
import cv2
import torch
import numpy as np
import torchvision.transforms.functional as F2 # Import F2 for tensor conversion
from collections import defaultdict # Import defaultdict for COCO format
from tqdm import tqdm # Import tqdm for progress bar


class PillDetectionDataset(Dataset):
    """Object-detection dataset over (image_path, annotation_path) pairs.

    Each sample is an RGB image tensor scaled to [0, 1] together with a target
    dict holding ``boxes`` (float32, [x_min, y_min, x_max, y_max]), ``labels``
    (int64, 1-indexed so that 0 stays free for background) and ``image_id``.
    """

    def __init__(self, paired_files, transforms=None, category_mapping=None):
        """
        Args:
            paired_files (list): List of tuples, where each tuple is (image_path, annotation_path).
            transforms (callable, optional): Optional transform applied to the image tensor only.
            category_mapping (dict, optional): Existing mapping from original
                category_id to 1-indexed label. Pass the mapping of another
                instance (``original_to_coco_category_id``) so that several
                splits share identical labels. When None, the mapping is built
                from the annotation files in ``paired_files``.
        """
        self.paired_files = paired_files
        self.transforms = transforms

        if category_mapping is None:
            # Sorting the original IDs makes the mapping deterministic for a
            # given set of files.
            sorted_original_category_ids = sorted(self._collect_category_ids())
            self.original_to_coco_category_id = {
                original_id: i + 1 for i, original_id in enumerate(sorted_original_category_ids)
            }
        else:
            # Copy so later changes to the caller's dict do not leak in.
            self.original_to_coco_category_id = dict(category_mapping)

        self.num_actual_classes = len(self.original_to_coco_category_id)
        # Largest label a sample may carry (equals num_actual_classes for a
        # mapping built here; taken from the values for a supplied mapping).
        self.expected_max_label = max(self.original_to_coco_category_id.values(), default=0)

        print(f"Created mapping for {len(self.original_to_coco_category_id)} original categories. Actual number of classes: {self.num_actual_classes}")

    @staticmethod
    def _read_json(annotation_path):
        """Load one annotation file as parsed JSON."""
        with open(annotation_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _clip_bbox(bbox, img_width, img_height):
        """Convert [x, y, w, h] to corners clipped to the image.

        Returns (x_min, y_min, x_max, y_max), or None when the clipped box has
        no positive width or height.
        """
        x, y, w, h = bbox
        x_min = max(0, x)
        y_min = max(0, y)
        x_max = min(img_width, x + w)
        y_max = min(img_height, y + h)
        if x_max > x_min and y_max > y_min:
            return x_min, y_min, x_max, y_max
        return None

    def _collect_category_ids(self):
        """Scan every annotation file and return the set of category_id values."""
        unique_original_category_ids = set()
        print("Collecting unique original category IDs for mapping...")
        for _, annotation_path in tqdm(self.paired_files, desc="Collecting Unique Category IDs"):
            try:
                annotation_data = self._read_json(annotation_path)
                if 'annotations' in annotation_data:
                    for annotation in annotation_data['annotations']:
                        if 'category_id' in annotation:
                            unique_original_category_ids.add(annotation['category_id'])
            except Exception as e:
                # Best effort: a broken file must not abort the whole scan.
                print(f"Error processing annotation file {annotation_path}: {e}")
                continue
        return unique_original_category_ids

    def __len__(self):
        return len(self.paired_files)

    def __getitem__(self, idx):
        """Return (image_tensor, target) for ``idx``.

        Returns (None, None) when the image or its annotation file cannot be
        loaded; the DataLoader's collate function has to cope with that.
        """
        img_path, annotation_path = self.paired_files[idx]

        # cv2.imread returns None on failure rather than raising
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Could not load image {img_path}. Skipping.")
            return None, None

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # Convert to RGB
        img_height, img_width, _ = img.shape

        try:
            annotation_data = self._read_json(annotation_path)
        except Exception as e:
            print(f"Error loading annotation file {annotation_path}: {e}. Skipping image.")
            return None, None

        # Extract bounding boxes and labels; a box and its label are always
        # appended together.
        boxes = []
        labels = []
        if 'annotations' in annotation_data:
            for annotation in annotation_data['annotations']:
                if 'bbox' not in annotation or 'category_id' not in annotation:
                    continue
                x, y, w, h = annotation['bbox']

                clipped = self._clip_bbox((x, y, w, h), img_width, img_height)
                if clipped is None:
                    print(f"Warning: Invalid bounding box dimensions [{x}, {y}, {w}, {h}] for image {os.path.basename(img_path)}. Skipping annotation.")
                    continue

                original_label = annotation['category_id']
                if original_label not in self.original_to_coco_category_id:
                    print(f"Warning: Original category ID {original_label} found in {annotation_path} but not in initial mapping. Skipping annotation.")
                    continue

                coco_label = self.original_to_coco_category_id[original_label]
                if 1 <= coco_label <= self.expected_max_label:
                    boxes.append(list(clipped))
                    labels.append(coco_label)
                else:
                    print(f"Error: Mapped COCO label {coco_label} for original ID {original_label} is out of expected range [1, {self.expected_max_label}] in {annotation_path}. Skipping annotation.")

        # reshape(-1, 4) keeps the [N, 4] layout even when no box survived;
        # an empty list would otherwise become a 1-D tensor of shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = torch.tensor([idx]) # Position in paired_files doubles as image id

        # Convert image to a float tensor
        try:
            img = F2.to_image(img)
            img = F2.to_dtype(img, torch.float32, scale=True) # Scale to [0, 1]
        except Exception as e:
            print(f"Error converting image {img_path} to tensor: {e}. Skipping.")
            return None, None

        if self.transforms:
            # Only the image is transformed; boxes are left as they are, so
            # geometric augmentations are not supported here.
            img = self.transforms(img)

        # Defensive check that boxes and labels stayed in step
        if boxes.shape[0] != labels.shape[0]:
            print(f"Warning: Mismatch between number of boxes ({boxes.shape[0]}) and labels ({labels.shape[0]}) in image {os.path.basename(img_path)}. Skipping image.")
            return None, None

        return img, target

    def get_annotations(self):
        """
        Returns annotations in COCO format.

        The result is a small stand-in object exposing a subset of the COCO
        API (getImgIds, loadImgs, loadAnns, getCatIds, loadCats) over the
        images, annotations and categories of this dataset. Image ids are the
        positions in ``paired_files``, matching ``__getitem__``.
        """
        coco_format_data = {
            "images": [],
            "annotations": [],
            "categories": []
        }

        annotation_id_counter = 0
        # One category per mapped original ID, ordered by mapped label
        coco_format_data["categories"] = [
            {"id": coco_id, "name": f"category_{original_id}", "supercategory": "none"}
            for original_id, coco_id in sorted(self.original_to_coco_category_id.items(), key=lambda item: item[1])
        ]
        # Add background category (ID 0) at the front if it is not there
        if not any(cat['id'] == 0 for cat in coco_format_data["categories"]):
            coco_format_data["categories"].insert(0, {"id": 0, "name": "background", "supercategory": "none"})

        for idx, (img_path, annotation_path) in enumerate(tqdm(self.paired_files, desc="Formatting to COCO")):
            # The image is read only to obtain its size
            img = cv2.imread(img_path)
            if img is None:
                continue # Skip if image could not be loaded
            img_height, img_width, _ = img.shape
            coco_format_data["images"].append({
                "id": idx,
                "width": img_width,
                "height": img_height,
                "file_name": os.path.basename(img_path)
            })

            try:
                annotation_data = self._read_json(annotation_path)
            except Exception as e:
                print(f"Error loading annotation file {annotation_path}: {e}. Skipping annotations for this image.")
                continue

            if 'annotations' not in annotation_data:
                continue

            for annotation in annotation_data['annotations']:
                if 'bbox' not in annotation or 'category_id' not in annotation:
                    continue
                original_label = annotation['category_id']
                # Annotations with unmapped categories are skipped silently
                if original_label not in self.original_to_coco_category_id:
                    continue
                coco_label = self.original_to_coco_category_id[original_label]

                clipped = self._clip_bbox(annotation['bbox'], img_width, img_height)
                if clipped is None:
                    print(f"Warning: Invalid or out-of-bounds bounding box after clipping in {os.path.basename(img_path)}. Skipping annotation.")
                    continue

                x_min, y_min, x_max, y_max = clipped
                coco_format_data["annotations"].append({
                    "id": annotation_id_counter,
                    "image_id": idx,
                    "category_id": coco_label,
                    "bbox": [x_min, y_min, x_max - x_min, y_max - y_min], # [x, y, w, h] for COCO
                    "area": (x_max - x_min) * (y_max - y_min),
                    "iscrowd": annotation.get("iscrowd", 0) # Assume 0 if not present
                })
                annotation_id_counter += 1

        # Minimal stand-in for a COCO object built from the formatted data
        class DummyCOCO:
            def __init__(self, dataset):
                self.dataset = dataset
                self.imgToAnns, self.catToImgs = self.createIndex()
                self.imgs = {img['id']: img for img in dataset['images']}
                self.cats = {cat['id']: cat for cat in dataset['categories']}
                self.anns = {ann['id']: ann for ann in dataset['annotations']}

            def createIndex(self):
                # image_id -> annotations, category_id -> image ids
                imgToAnns = defaultdict(list)
                catToImgs = defaultdict(list)
                for ann in self.dataset['annotations']:
                    imgToAnns[ann['image_id']].append(ann)
                    catToImgs[ann['category_id']].append(ann['image_id'])
                return imgToAnns, catToImgs

            def getImgIds(self, imgIds=None, catIds=None):
                # Empty/None filters fall back to "all images" / "all categories"
                imgIds = imgIds or list(self.imgs.keys())
                catIds = catIds or list(self.cats.keys())

                # Images that have at least one annotation in catIds
                img_ids_with_cat = set()
                for ann in self.dataset['annotations']:
                    if ann['category_id'] in catIds:
                        img_ids_with_cat.add(ann['image_id'])

                if imgIds:
                    return list(set(imgIds) & img_ids_with_cat)
                else:
                    return list(img_ids_with_cat)

            def loadImgs(self, ids=None):
                if ids is None:
                    return self.dataset['images']
                else:
                    return [self.imgs[img_id] for img_id in ids if img_id in self.imgs]

            def loadAnns(self, ids=None):
                if ids is None:
                    return self.dataset['annotations']
                else:
                    return [self.anns[ann_id] for ann_id in ids if ann_id in self.anns]

            def getCatIds(self, catNms=None, supNms=None, catIds=None):
                cats = self.dataset['categories']
                cats = cats if catNms is None else [cat for cat in cats if cat['name'] in catNms]
                cats = cats if supNms is None else [cat for cat in cats if cat['supercategory'] in supNms]
                cat_ids = [cat['id'] for cat in cats]

                if catIds:
                    return list(set(cat_ids) & set(catIds))
                else:
                    return cat_ids

            def loadCats(self, ids):
                return [self.cats[cat_id] for cat_id in ids if cat_id in self.cats]

        return DummyCOCO(coco_format_data)

This dataset class takes the list of `paired_files` generated earlier and an optional `transforms` object. The `__getitem__` method loads the image and its corresponding JSON annotation, extracts the bounding boxes and category IDs, converts them to tensors, and returns the image and target dictionary.

**Important Considerations:**

- **Bounding Box Transformations:** If you apply data augmentations that change the image geometry (e.g., resizing, cropping, flipping), you will need to implement corresponding transformations for the bounding boxes. Libraries like `torchvision.transforms.v2` or `albumentations` can help with this. The current code only applies image transformations.
- **Image ID:** A simple sequential image ID is used here. You might want to use a more robust identifier if needed.
- **Other Annotation Data:** The current implementation only extracts bounding boxes and category IDs. You can extend it to include other information from the annotation file (e.g., segmentation masks, keypoints) if your task requires it.

Now that you have the `PillDetectionDataset` class, let's create DataLoaders for training and validation. We'll first split the `paired_files` into training and validation sets.

In [41]:
from torch.utils.data import DataLoader, random_split

# Define the split ratio (e.g., 80% for training, 20% for validation)
train_ratio = 0.8
val_ratio = 0.2

# The ratios must add up to 1; compare with a tolerance because float sums
# are not always exact.
if not math.isclose(train_ratio + val_ratio, 1.0):
    raise ValueError(f"train_ratio + val_ratio must equal 1.0, got {train_ratio + val_ratio}")

# Calculate the number of samples for each set
total_samples = len(paired_files)
train_size = int(train_ratio * total_samples)
val_size = total_samples - train_size # Ensure all samples are used

# Split the paired_files list directly, since the dataset takes this list as input.
# A seeded generator makes the split identical on every run, so results stay
# comparable across sessions.
split_seed = 42
train_paired_files, val_paired_files = random_split(
    paired_files,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

print(f"Total samples: {total_samples}")
print(f"Training samples: {len(train_paired_files)}")
print(f"Validation samples: {len(val_paired_files)}")

Total samples: 4526
Training samples: 3620
Validation samples: 906


Now, we'll create the `PillDetectionDataset` instances for the training and validation sets and then create the DataLoaders.

For object detection, a custom `collate_fn` is usually needed for the DataLoader to handle targets (bounding boxes, labels, etc.) of varying sizes in a batch.

In [42]:
from torchvision.transforms import functional as F

def collate_fn(batch):
    """Group a batch of (image, target) samples into (images, targets) tuples.

    Detection targets differ in size per image, so samples are kept as tuples
    instead of being stacked into one tensor. Samples whose image is None
    (the dataset's marker for an unreadable image or annotation file) are
    dropped so they never reach the model.

    Args:
        batch (list): List of (image, target) pairs.

    Returns:
        tuple: (images, targets), each a tuple with one entry per kept sample;
        two empty tuples when every sample was dropped.
    """
    valid_samples = [sample for sample in batch if sample[0] is not None]
    if not valid_samples:
        return (), ()
    return tuple(zip(*valid_samples))

# Create dataset instances for training and validation
# You can add transforms here if needed, e.g., data augmentation for training
train_dataset = PillDetectionDataset(train_paired_files)
val_dataset = PillDetectionDataset(val_paired_files)

# Each dataset derives its category -> label mapping from its own files, so a
# category missing from one split would shift the labels of the other.
# Reuse the training mapping for validation so both splits share one label space.
val_dataset.original_to_coco_category_id = train_dataset.original_to_coco_category_id
val_dataset.num_actual_classes = train_dataset.num_actual_classes
val_dataset.expected_max_label = train_dataset.expected_max_label

# Define batch size
batch_size = 2 # You can adjust this based on your GPU memory

# Create DataLoaders (only the training data is shuffled)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

print("\nDataLoaders created successfully.")
print(f"Training DataLoader has {len(train_loader)} batches of size {train_loader.batch_size}")
print(f"Validation DataLoader has {len(val_loader)} batches of size {val_loader.batch_size}")


DataLoaders created successfully.
Training DataLoader has 1810 batches of size 2
Validation DataLoader has 453 batches of size 2


In [24]:
# Gather every distinct category_id that occurs in the paired annotation files
unique_category_ids = set()
for _, annotation_path in tqdm(paired_files, desc="Collecting unique category IDs"):
    with open(annotation_path, 'r') as f:
        annotation_data = json.load(f)
    if 'annotations' not in annotation_data:
        continue
    unique_category_ids.update(
        annotation['category_id']
        for annotation in annotation_data['annotations']
        if 'category_id' in annotation
    )

# num_classes counts the object categories only; it does not include a
# background class.
num_classes = len(unique_category_ids)
print(f"\nFound {num_classes} unique category IDs in the dataset.")

Collecting unique category IDs:   0%|          | 13/4526 [00:04<28:34,  2.63it/s]


KeyboardInterrupt: 

In [47]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model_instance_segmentation(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box head.

    Args:
        num_classes (int): Number of object categories, background excluded.

    Returns:
        The model with default pre-trained weights whose box predictor has
        been replaced by one sized ``num_classes + 1``.
    """
    # Start from the pre-trained detector
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # Input width of the existing classification layer; the new head must match it
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features

    # Swap in a new predictor; the extra output is the background class (index 0)
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes + 1)

    return detector

# An earlier run found 73 unique categories in the annotations;
# the helper adds the background class on top of this number.
num_classes = 73

# Build the detector with our helper function
model = get_model_instance_segmentation(num_classes)

# Place the model on the selected device
model = model.to(device)

print("Faster R-CNN model defined and moved to device.")

Faster R-CNN model defined and moved to device.


In [48]:
import torch
from tqdm import tqdm
import numpy as np
from collections import defaultdict

# Intersection over Union (IoU) helper
def calculate_iou(box1, box2):
    """
    Return the IoU of two boxes given as [x_min, y_min, x_max, y_max].

    A small epsilon in the denominator keeps the division defined when both
    boxes have zero area.
    """
    # Overlap extent along each axis, floored at zero for disjoint boxes
    overlap_w = max(0, min(box1[2], box2[2]) - max(box1[0], box2[0]))
    overlap_h = max(0, min(box1[3], box2[3]) - max(box1[1], box2[1]))
    inter_area = overlap_w * overlap_h

    # Individual box areas
    area_a = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_b = (box2[2] - box2[0]) * (box2[3] - box2[1])

    # Union = sum of both areas minus the part counted twice
    union_area = area_a + area_b - inter_area + 1e-6
    return inter_area / union_area

# Helper function to calculate Average Precision (AP) for a single class
def calculate_ap(recall, precision):
    """
    Calculates Average Precision (AP) from precision and recall values.

    Uses 11-point interpolation: for each recall threshold 0.0, 0.1, ..., 1.0
    the highest precision among points whose recall reaches the threshold is
    taken (0 when no point reaches it), and the 11 values are averaged.

    Args:
        recall: Sequence or array of recall values.
        precision: Sequence or array of precision values, same length as ``recall``.

    Returns:
        float: The interpolated average precision; 0.0 for empty input.
    """
    # Accept plain lists as well as arrays; the masks below need arrays
    recall = np.asarray(recall, dtype=float)
    precision = np.asarray(precision, dtype=float)

    ap = 0.0
    # i / 10 gives correctly rounded thresholds. Stepping by 0.1 yields values
    # such as 0.30000000000000004, which a recall of exactly 0.3 fails to reach.
    for t in np.arange(11) / 10.0:
        reached = recall >= t
        if reached.any():
            ap += np.max(precision[reached]) / 11.0
    return float(ap)

# Run the model over the loader and gather host-side copies of its outputs
def _collect_detections(model, data_loader, device):
    """
    Collects predictions and ground truth for every image in `data_loader`.

    Returns:
        tuple: Two parallel per-image lists. Prediction entries hold NumPy
        arrays under "boxes", "scores" and "labels"; ground-truth entries
        hold NumPy arrays under "boxes" and "labels".
    """
    all_predictions = []
    all_ground_truth = []

    with torch.no_grad():  # Disable gradient calculation during evaluation
        for images, targets in tqdm(data_loader, desc="Evaluating"):
            images = [image.to(device) for image in images]
            predictions = model(images)

            # Targets are only read on the host below, so they are not moved
            # to the device first.
            for target, pred in zip(targets, predictions):
                all_predictions.append({
                    "boxes": pred['boxes'].cpu().numpy(),
                    "scores": pred['scores'].cpu().numpy(),
                    "labels": pred['labels'].cpu().numpy(),
                })
                all_ground_truth.append({
                    "boxes": target['boxes'].cpu().numpy(),
                    "labels": target['labels'].cpu().numpy(),
                })

    return all_predictions, all_ground_truth


# Match one class's predictions to ground truth, image by image
def _match_class_detections(all_predictions, all_ground_truth, class_id, iou_threshold):
    """
    Flags every prediction of `class_id` as a true or false positive.

    Within each image, predictions are visited in descending score order.
    A prediction is a true positive when its best-overlapping ground-truth
    box of the same class reaches `iou_threshold` and that box has not been
    claimed by an earlier prediction; otherwise it is a false positive.

    Returns:
        tuple: (scores, is_true_positive) as aligned 1-D NumPy arrays
        (float and bool) covering all images.
    """
    scores = []
    true_positives = []

    for img_pred, img_gt in zip(all_predictions, all_ground_truth):
        pred_idx = np.flatnonzero(img_pred['labels'] == class_id)
        if pred_idx.size == 0:
            continue

        gt_idx = np.flatnonzero(img_gt['labels'] == class_id)
        gt_boxes = img_gt['boxes'][gt_idx]
        pred_boxes = img_pred['boxes'][pred_idx]
        pred_scores = img_pred['scores'][pred_idx]

        # Ground-truth boxes already claimed in this image for this class
        matched = set()

        # Stable sort keeps the model's original order among equal scores
        for k in np.argsort(-pred_scores, kind="stable"):
            best_iou = 0.0
            best_gt = -1
            for j, gt_box in enumerate(gt_boxes):
                iou = calculate_iou(pred_boxes[k], gt_box)
                if iou > best_iou:
                    best_iou = iou
                    best_gt = j

            # best_gt stays -1 when nothing overlaps; such a prediction is
            # never a true positive, even for a non-positive threshold.
            is_tp = (
                best_gt >= 0
                and best_iou >= iou_threshold
                and best_gt not in matched
            )
            if is_tp:
                matched.add(best_gt)

            scores.append(pred_scores[k])
            true_positives.append(is_tp)

    return np.asarray(scores, dtype=float), np.asarray(true_positives, dtype=bool)


# Evaluation function to calculate mAP
def evaluate(model, data_loader, device, iou_threshold=0.5):
    """
    Evaluates the model on the provided data loader and calculates mAP.

    This is a simplified, single-IoU-threshold mAP using 11-point
    interpolated AP per class. Label 0 is treated as background and ignored.
    A class that appears only in predictions (no ground truth) contributes
    an AP of 0.0 to the mean.

    The model is switched to evaluation mode and is left in that mode.

    Args:
        model: The object detection model to evaluate. Called with a list of
            image tensors; expected to return one dict per image with
            'boxes', 'scores' and 'labels' tensors.
        data_loader: Iterable yielding (images, targets) batches, where each
            target is a dict with 'boxes' and 'labels' tensors.
        device: The device to run evaluation on (e.g., 'cuda' or 'cpu').
        iou_threshold (float): The IoU threshold for considering a detection
            as a true positive.

    Returns:
        dict: {"mAP": value}, the mean of the per-class APs (0.0 when no
        class was found).
    """
    model.eval()  # Set the model to evaluation mode

    all_predictions, all_ground_truth = _collect_detections(model, data_loader, device)

    # Number of ground-truth boxes per class over the whole dataset
    gt_counts = {}
    for gt in all_ground_truth:
        for label in gt['labels'].tolist():
            gt_counts[label] = gt_counts.get(label, 0) + 1

    # Every class seen in ground truth or predictions, minus background (0)
    all_labels = set(gt_counts)
    for pred in all_predictions:
        all_labels.update(pred['labels'].tolist())
    all_labels.discard(0)
    unique_classes = sorted(all_labels)

    average_precisions = []

    for class_id in tqdm(unique_classes, desc="Calculating AP per class"):
        num_ground_truths_for_class = gt_counts.get(class_id, 0)
        if num_ground_truths_for_class == 0:
            # Predicted but never annotated: recall is undefined, count as 0 AP
            average_precisions.append(0.0)
            continue

        scores, is_tp = _match_class_detections(
            all_predictions, all_ground_truth, class_id, iou_threshold
        )

        # Rank all detections of this class by confidence (deterministic on ties)
        order = np.argsort(-scores, kind="stable")
        sorted_tp = is_tp[order]

        cumulative_tp = np.cumsum(sorted_tp)
        cumulative_fp = np.cumsum(~sorted_tp)

        # Precision = TP / (TP + FP); Recall = TP / ground truths of the class
        precision = cumulative_tp / (cumulative_tp + cumulative_fp + 1e-6)  # epsilon for stability
        recall = cumulative_tp / num_ground_truths_for_class

        average_precisions.append(calculate_ap(recall, precision))

    # Calculate mAP by averaging the APs for all classes
    if average_precisions:
        mAP = np.mean(average_precisions)
    else:
        mAP = 0.0  # No classes found or evaluated

    print(f"\nCalculated mAP @ IoU={iou_threshold}: {mAP:.4f}")

    return {"mAP": mAP}

In [51]:
import torch
import os

# Prerequisites from earlier cells: model, optimizer, scheduler (optional),
# train_loader, val_loader, device, and the train_one_epoch / evaluate functions.

# How many passes to make over the training data (adjust as needed)
num_epochs = 10

# Checkpoints are written below this directory
model_save_dir = "/content/drive/MyDrive/model_checkpoints"
os.makedirs(model_save_dir, exist_ok=True)

print("Starting training...")

# Highest validation mAP seen so far; a checkpoint is written only when beaten
best_mAP = 0.0

for epoch in range(num_epochs):
    # One full pass over the training set
    train_one_epoch(model, optimizer, train_loader, device, epoch, print_freq=10)

    # Step the learning-rate schedule when a global 'scheduler' exists and is set
    if globals().get('scheduler') is None:
         print("Warning: Learning rate scheduler 'scheduler' not found or is None. Skipping scheduler step.")
    else:
         scheduler.step()

    # Validation pass; evaluate() reports mAP inside a dict
    print("\nEvaluating on validation set...")
    eval_results = evaluate(model, val_loader, device)

    # Without a usable mAP there is nothing to compare, so move on
    if eval_results is None or "mAP" not in eval_results:
        print("Evaluation results or mAP not available. Skipping model saving based on mAP for this epoch.")
        continue

    current_mAP = eval_results["mAP"]
    print(f"Validation mAP: {current_mAP:.4f}")

    if current_mAP > best_mAP:
        # New best: remember it and write a checkpoint tagged with the epoch
        best_mAP = current_mAP
        model_save_path = os.path.join(model_save_dir, f"fasterrcnn_resnet50_fpn_best_mAP_epoch_{epoch:03d}.pth")
        torch.save(model.state_dict(), model_save_path)
        print(f"Saved best model to {model_save_path} with mAP: {best_mAP:.4f}")
    else:
        print(f"Validation mAP ({current_mAP:.4f}) did not improve from best mAP ({best_mAP:.4f})")


print("\nTraining finished.")

# Restoring a saved checkpoint later (replace XXX with the epoch number):
# model = get_model_instance_segmentation(num_classes)  # rebuild the architecture
# model.load_state_dict(torch.load(os.path.join(model_save_dir, "fasterrcnn_resnet50_fpn_best_mAP_epoch_XXX.pth")))
# model.to(device)

Starting training...


IndexError: Target 27733 is out of bounds.