In [2]:
import os
import json
from sklearn.model_selection import train_test_split

# Paths
dataset_path = "dataset"
images_path = os.path.join(dataset_path, "images")
annotations_path = os.path.join(dataset_path, "annotations", "instances_default.json")

# Load annotations
with open(annotations_path, "r") as f:
    annotations = json.load(f)

# Extract image IDs and file names
image_ids = [img["id"] for img in annotations["images"]]
image_files = [img["file_name"] for img in annotations["images"]]

# Split dataset (80% train, 20% val)
train_ids, val_ids, train_files, val_files = train_test_split(
    image_ids, image_files, test_size=0.2, random_state=42
)

# Create train and val folders
train_path = os.path.join(dataset_path, "train")
val_path = os.path.join(dataset_path, "val")
os.makedirs(os.path.join(train_path, "images"), exist_ok=True)
os.makedirs(os.path.join(train_path, "annotations"), exist_ok=True)
os.makedirs(os.path.join(val_path, "images"), exist_ok=True)
os.makedirs(os.path.join(val_path, "annotations"), exist_ok=True)

# Move images to train and val folders
for img_id, img_file in zip(train_ids, train_files):
    shutil.move(
        os.path.join(images_path, img_file),
        os.path.join(train_path, "images", img_file)
    )

for img_id, img_file in zip(val_ids, val_files):
    shutil.move(
        os.path.join(images_path, img_file),
        os.path.join(val_path, "images", img_file)
    )

# Filter annotations for train and val sets
train_annotations = {
    "images": [img for img in annotations["images"] if img["id"] in train_ids],
    "annotations": [ann for ann in annotations["annotations"] if ann["image_id"] in train_ids],
    "categories": annotations["categories"]
}

val_annotations = {
    "images": [img for img in annotations["images"] if img["id"] in val_ids],
    "annotations": [ann for ann in annotations["annotations"] if ann["image_id"] in val_ids],
    "categories": annotations["categories"]
}

# Save train and val annotations
with open(os.path.join(train_path, "annotations", "train.json"), "w") as f:
    json.dump(train_annotations, f)

with open(os.path.join(val_path, "annotations", "val.json"), "w") as f:
    json.dump(val_annotations, f)

print("Dataset split completed!")

Dataset split completed!


In [3]:
pip install torch torchvision

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install pycocotools

Collecting pycocotools
  Obtaining dependency information for pycocotools from https://files.pythonhosted.org/packages/2e/f5/dfa78dc72e47dfe1ada7b37fedcb338454750470358a6dfcfdfda35fa337/pycocotools-2.0.8-cp311-cp311-win_amd64.whl.metadata
  Downloading pycocotools-2.0.8-cp311-cp311-win_amd64.whl.metadata (1.1 kB)
Downloading pycocotools-2.0.8-cp311-cp311-win_amd64.whl (85 kB)
   ---------------------------------------- 0.0/85.3 kB ? eta -:--:--
   -------------- ------------------------- 30.7/85.3 kB 1.4 MB/s eta 0:00:01
   ------------------------ --------------- 51.2/85.3 kB 525.1 kB/s eta 0:00:01
   --------------------------------- ------ 71.7/85.3 kB 660.6 kB/s eta 0:00:01
   -------------------------------------- - 81.9/85.3 kB 383.3 kB/s eta 0:00:01
   ---------------------------------------- 85.3/85.3 kB 344.3 kB/s eta 0:00:00
Installing collected packages: pycocotools
Successfully installed pycocotools-2.0.8
Note: you may need to restart the kernel to use updated packages.


In [12]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import CocoDetection
from torchvision.transforms import ToTensor
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Define model
backbone = torchvision.models.mobilenet_v2(weights="IMAGENET1K_V1").features  # Updated for latest torchvision
backbone.out_channels = 1280
anchor_generator = AnchorGenerator(
    sizes=((32, 64, 128, 256, 512),),
    aspect_ratios=((0.5, 1.0, 2.0),)
)
model = FasterRCNN(
    backbone,
    num_classes=4,  # Number of classes + 1 (background)
    rpn_anchor_generator=anchor_generator
)
model.to(device)

# Define datasets
train_image_dir = "dataset/train/images"
train_annotation_path = "dataset/train/annotations/train.json"
val_image_dir = "dataset/val/images"
val_annotation_path = "dataset/val/annotations/val.json"

# Define custom transform
from torchvision import transforms
transform = transforms.Compose([
    transforms.ToTensor()  # Keep images in original size (FasterRCNN supports variable sizes)
])

train_dataset = CocoDetection(
    root=train_image_dir,
    annFile=train_annotation_path,
    transform=transform
)
val_dataset = CocoDetection(
    root=val_image_dir,
    annFile=val_annotation_path,
    transform=transform
)

# Define custom collate function to handle variable-sized images
def collate_fn(batch):
    return tuple(zip(*batch))

# Define data loaders with collate_fn
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

# Define optimizer
optimizer = optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)

# Convert targets to required format
def convert_targets(targets):
    converted_targets = []
    for target in targets:
        boxes = []
        labels = []
        for obj in target:
            bbox = obj["bbox"]  # COCO format: [x, y, width, height]
            label = obj["category_id"]
            boxes.append([bbox[0], bbox[1], bbox[0] + bbox[2], bbox[1] + bbox[3]])  # Convert to [x_min, y_min, x_max, y_max]
            labels.append(label)
        converted_targets.append({
            "boxes": torch.tensor(boxes, dtype=torch.float32),
            "labels": torch.tensor(labels, dtype=torch.int64)
        })
    return converted_targets

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        targets = convert_targets(targets)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader):.4f}")


Using device: cpu
loading annotations into memory...
Done (t=0.01s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!
Epoch 1/10, Loss: 1.2118
Epoch 2/10, Loss: 0.9571
Epoch 3/10, Loss: 0.9450
Epoch 4/10, Loss: 0.9137
Epoch 5/10, Loss: 0.9043
Epoch 6/10, Loss: 0.9175
Epoch 7/10, Loss: 0.9102
Epoch 8/10, Loss: 0.9074
Epoch 9/10, Loss: 0.8865
Epoch 10/10, Loss: 0.8525


In [14]:
import torch
from torchvision.ops.boxes import box_iou
from tqdm import tqdm  # Progress bar for easy tracking

# Function to convert model outputs to a readable format
def convert_outputs(outputs):
    results = []
    for output in outputs:
        boxes = output["boxes"].cpu().detach().numpy()  # Convert tensor to NumPy
        labels = output["labels"].cpu().detach().numpy()
        scores = output["scores"].cpu().detach().numpy()
        results.append({"boxes": boxes, "labels": labels, "scores": scores})
    return results

# Function to compute mean Average Precision (mAP)
def compute_map(pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels, iou_threshold=0.5):
    """
    Computes mAP at a given IoU threshold.
    """
    correct_detections = 0
    total_predictions = 0
    total_ground_truths = 0

    for i in range(len(pred_boxes)):  # Iterate through batch
        if len(gt_boxes[i]) == 0:  # No ground truth for this image
            continue

        ious = box_iou(torch.tensor(pred_boxes[i]), torch.tensor(gt_boxes[i]))
        matched = ious.max(dim=1).values > iou_threshold  # Check if IoU > threshold

        correct_detections += matched.sum().item()
        total_predictions += len(pred_boxes[i])
        total_ground_truths += len(gt_boxes[i])

    precision = correct_detections / total_predictions if total_predictions > 0 else 0
    recall = correct_detections / total_ground_truths if total_ground_truths > 0 else 0
    return precision, recall

# Evaluation function
def evaluate_model(model, val_loader, device):
    model.eval()  # Set model to evaluation mode
    total_precision, total_recall = 0, 0
    num_batches = len(val_loader)

    with torch.no_grad():  # Disable gradient calculations
        for images, targets in tqdm(val_loader, desc="Evaluating Model"):
            images = [img.to(device) for img in images]
            outputs = model(images)  # Get predictions

            # Convert model outputs
            pred_results = convert_outputs(outputs)

            # Convert ground truth to same format
            gt_results = convert_targets(targets)

            # Extract necessary information
            pred_boxes = [res["boxes"] for res in pred_results]
            pred_labels = [res["labels"] for res in pred_results]
            pred_scores = [res["scores"] for res in pred_results]
            gt_boxes = [res["boxes"].cpu().numpy() for res in gt_results]
            gt_labels = [res["labels"].cpu().numpy() for res in gt_results]

            # Compute mAP metrics
            precision, recall = compute_map(pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels)
            total_precision += precision
            total_recall += recall

    avg_precision = total_precision / num_batches
    avg_recall = total_recall / num_batches
    print(f"\nEvaluation Results:\n - Precision: {avg_precision:.4f}\n - Recall: {avg_recall:.4f}")

# Call the evaluation function
evaluate_model(model, val_loader, device)


Evaluating Model: 100%|██████████████████████████████████████████████████████████████████| 3/3 [01:05<00:00, 21.85s/it]


Evaluation Results:
 - Precision: 0.0975
 - Recall: 0.8840





In [15]:
torch.save(model.state_dict(), "best.pt")