In [4]:
# Install Kaggle API
!pip install -q kaggle

# # Upload kaggle.json
# from google.colab import files
# files.upload()

# Create Kaggle folder and move kaggle.json there
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the dataset
!kaggle datasets download -d siddharthkumarsah/ships-in-aerial-images

# Unzip the dataset
!unzip ships-in-aerial-images.zip -d dataset

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: dataset/ships-aerial-images/train/labels/x0593_png.rf.33fd03bd0d9896e31b73ac56c1e458c8.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0594_png.rf.1a86349573567a368a653b5b39fff4af.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0594_png.rf.ff6551627101ade7c4ecd6887fa45546.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0595_png.rf.bc03f4a3bc686c5d239d8552ce57b5f5.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0597_png.rf.918ed233995c3645a1ca8d14bf35a81c.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0598_png.rf.94d8be92f98bf3ff36cf9f51b1e17bc9.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0598_png.rf.a5b224c7880ba832c9d6d5aba3123e28.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0599_png.rf.d215436f7b245cf0a244ab1c79118c83.txt  
  inflating: dataset/ships-aerial-images/train/labels/x0600_png.rf.6d245857b874aeb58d67

In [12]:
import numpy as np

In [16]:
import os
import json
import cv2

# COCO "categories" table written into every generated annotation file.
# The dataset has a single foreground class.
categories = [dict(id=1, name="ship")]

def create_coco_annotation(ann_id, image_id, width, height, annotations):
    """Turn the per-image box dicts into COCO annotation records.

    Args:
        ann_id: id assigned to the first produced annotation; each following
            annotation gets the next consecutive id.
        image_id: id of the image all these annotations belong to.
        width: image width (accepted for interface symmetry; not used here).
        height: image height (accepted for interface symmetry; not used here).
        annotations: iterable of dicts with keys ``category_id`` and ``bbox``
            (``bbox`` indexed as ``[_, _, w, h]`` for the area computation).

    Returns:
        ``(records, next_ann_id)`` where ``next_ann_id`` is the first id not
        consumed by this call.
    """
    records = [
        {
            "id": ann_id + offset,
            "image_id": image_id,
            "category_id": entry['category_id'],
            "bbox": entry['bbox'],
            # Area is the plain width * height of the box.
            "area": entry['bbox'][2] * entry['bbox'][3],
            "iscrowd": 0,
        }
        for offset, entry in enumerate(annotations)
    ]
    return records, ann_id + len(records)

def create_coco_image(image_id, file_name, width, height):
    """Build one entry for the COCO ``images`` list.

    Args:
        image_id: identifier stored under ``id``.
        file_name: image file name stored under ``file_name``.
        width: image width stored under ``width``.
        height: image height stored under ``height``.

    Returns:
        A dict with the keys ``id``, ``file_name``, ``width`` and ``height``.
    """
    image_entry = dict(
        id=image_id,
        file_name=file_name,
        width=width,
        height=height,
    )
    return image_entry

# Function to process a single dataset (train, valid, or test)
def _parse_yolo_label_file(annotation_path, width, height):
    """Read one YOLO label file and return its boxes as COCO-style dicts.

    Each usable line is ``class x_center y_center box_width box_height`` with
    the four geometry values expressed as fractions of the image size. Blank
    lines are ignored; lines with fewer than five fields or non-numeric
    fields are reported and skipped instead of aborting the conversion.
    """
    parsed = []
    with open(annotation_path, "r") as file:
        for line_number, line in enumerate(file, start=1):
            parts = line.split()
            if not parts:
                # Blank line (e.g. trailing newline padding) - nothing to parse.
                continue
            if len(parts) < 5:
                print(f"Skipping malformed label line {line_number} in {annotation_path}")
                continue
            try:
                yolo_class_id = int(parts[0])
                x_center = float(parts[1]) * width
                y_center = float(parts[2]) * height
                bbox_width = float(parts[3]) * width
                bbox_height = float(parts[4]) * height
            except ValueError:
                print(f"Skipping malformed label line {line_number} in {annotation_path}")
                continue

            # Convert YOLO (center-based) bbox to COCO (top-left based) bbox
            x_min = x_center - bbox_width / 2
            y_min = y_center - bbox_height / 2

            parsed.append({
                "category_id": yolo_class_id + 1,  # COCO category IDs are 1-based
                "bbox": [x_min, y_min, bbox_width, bbox_height]
            })
    return parsed


# Function to process a single dataset (train, valid, or test)
def convert_dataset_to_coco(dataset_path):
    """Convert one YOLO-format split into a COCO annotation JSON file.

    Reads ``<dataset_path>/images/*.jpg`` and the matching
    ``<dataset_path>/labels/<stem>.txt`` files, and writes
    ``<dataset_path>/annotations_coco/annotations_coco.json``.

    The file-name stem (a string) is used as the COCO image id. Images that
    cannot be decoded are reported and left out. Images without a label file
    (or with an empty one) are listed under ``images`` but contribute no
    annotations.

    Args:
        dataset_path: directory containing the ``images`` and ``labels``
            sub-directories of a single split.
    """
    images_path = os.path.join(dataset_path, "images")
    annotations_path = os.path.join(dataset_path, "labels")
    output_path = os.path.join(dataset_path, "annotations_coco")

    os.makedirs(output_path, exist_ok=True)

    images = []
    annotations = []
    annotation_id = 1

    # Sorted so that annotation ids are assigned in the same order on every run
    # (os.listdir returns entries in arbitrary order).
    for image_filename in sorted(os.listdir(images_path)):
        if not image_filename.endswith(".jpg"):
            continue

        image_id = os.path.splitext(image_filename)[0]
        image_path = os.path.join(images_path, image_filename)
        annotation_path = os.path.join(annotations_path, f"{image_id}.txt")

        # The image is decoded only to learn its pixel dimensions.
        try:
            image = cv2.imread(image_path)
            if image is None:  # cv2.imread signals failure by returning None
                print(f"Error loading image: {image_path}")
                continue
            height, width = image.shape[:2]
        except Exception as e:
            print(f"Error processing image {image_path}: {e}")
            continue

        images.append(create_coco_image(image_id, image_filename, width, height))

        image_annotations = []
        if os.path.exists(annotation_path) and os.path.getsize(annotation_path) > 0:
            image_annotations = _parse_yolo_label_file(annotation_path, width, height)

        if image_annotations:
            coco_image_annotations, annotation_id = create_coco_annotation(
                annotation_id, image_id, width, height, image_annotations
            )
            annotations.extend(coco_image_annotations)

    coco_format = {
        "images": images,
        "annotations": annotations,
        "categories": categories
    }

    output_json_path = os.path.join(output_path, "annotations_coco.json")
    with open(output_json_path, "w") as json_file:
        json.dump(coco_format, json_file, indent=4)

    print(f"COCO format annotations saved to {output_json_path}")


# Generate the COCO annotation file for each split of the dataset.
dataset_base_path = "/content/dataset/ships-aerial-images"
for dataset_type in ("train", "valid", "test"):
    dataset_path = os.path.join(dataset_base_path, dataset_type)
    convert_dataset_to_coco(dataset_path)

COCO format annotations saved to /content/dataset/ships-aerial-images/train/annotations_coco/annotations_coco.json
COCO format annotations saved to /content/dataset/ships-aerial-images/valid/annotations_coco/annotations_coco.json
COCO format annotations saved to /content/dataset/ships-aerial-images/test/annotations_coco/annotations_coco.json


In [17]:
import os
import json
import torch
from PIL import Image
from torchvision.transforms import functional as F
from torch.utils.data import DataLoader
from tqdm import tqdm

class ShipDataset(torch.utils.data.Dataset):
    """Detection dataset backed by an ``images`` folder and a COCO-style JSON file.

    ``root`` must contain ``images/`` and
    ``annotations_coco/annotations_coco.json``. Image ids in the JSON are
    expected to equal the image file name without its extension.

    ``__getitem__`` returns ``(image, target)``. For entries that cannot be
    used (no annotations, unreadable image, or no box with positive width and
    height) it returns ``(None, None)``; consumers must filter those out.
    """

    def __init__(self, root, transforms=None):
        """Index the image folder and load the annotation file.

        Args:
            root: split directory (contains ``images`` and ``annotations_coco``).
            transforms: optional callable applied to the PIL image only.
        """
        self.root = root
        self.transforms = transforms
        self.imgs = list(sorted(os.listdir(os.path.join(root, "images"))))

        # Load COCO annotations
        with open(os.path.join(root, "annotations_coco", "annotations_coco.json"), 'r') as f:
            self.coco_annotations = json.load(f)

        # Mapping from image ID to the list of its annotation dicts
        self.img_id_to_annotations = {}
        for ann in self.coco_annotations["annotations"]:
            self.img_id_to_annotations.setdefault(ann["image_id"], []).append(ann)

        # Mapping from image ID to image info
        self.img_id_to_info = {img["id"]: img for img in self.coco_annotations["images"]}

    def __getitem__(self, idx):
        """Return ``(image, target)`` for position ``idx`` or ``(None, None)``.

        ``target`` holds ``boxes`` (float32, ``[xmin, ymin, xmax, ymax]``),
        ``labels`` (int64), ``image_id`` (tensor containing ``idx``), ``area``
        and ``iscrowd``. ``boxes`` and ``labels`` always have the same length.
        """
        img_filename = self.imgs[idx]
        img_id = os.path.splitext(img_filename)[0]

        annotations = self.img_id_to_annotations.get(img_id)
        if not annotations:
            return None, None

        img_path = os.path.join(self.root, "images", img_filename)
        try:
            img = Image.open(img_path).convert("RGB")
        except IOError:
            print(f"Error loading image: {img_path}")
            return None, None  # Return None for both image and target

        boxes = []
        labels = []
        for ann in annotations:
            # COCO stores [x, y, w, h]; the target uses corner coordinates.
            x, y, w, h = ann["bbox"][0], ann["bbox"][1], ann["bbox"][2], ann["bbox"][3]
            xmin, ymin, xmax, ymax = x, y, x + w, y + h

            # Keep the label only together with its box: a degenerate box is
            # dropped along with its label so both lists stay aligned.
            if xmax > xmin and ymax > ymin:
                boxes.append([xmin, ymin, xmax, ymax])
                labels.append(ann["category_id"])

        if not boxes:
            # If no valid bounding boxes are found, return None
            return None, None

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx]),
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            "iscrowd": torch.zeros((len(boxes),), dtype=torch.int64),
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Number of files found in the ``images`` directory."""
        return len(self.imgs)


def collate_fn(batch):
    """Transpose a list of samples into a tuple of per-field tuples.

    A batch ``[(img0, tgt0), (img1, tgt1)]`` becomes
    ``((img0, img1), (tgt0, tgt1))``; no stacking is performed.
    """
    per_field = zip(*batch)
    return tuple(per_field)


In [18]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a fresh box head.

    Args:
        num_classes: number of output classes of the new box predictor.

    Returns:
        The torchvision detection model with its box predictor replaced by a
        ``FastRCNNPredictor`` sized for ``num_classes``.
    """
    # Load a model pre-trained on COCO. ``weights="DEFAULT"`` replaces the
    # deprecated ``pretrained=True`` flag of newer torchvision releases.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # Get the number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features

    # Replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

# Our dataset has one foreground class (ship) plus the background class,
# hence two classes for the box predictor.
num_classes = 2
# Module-level model instance built by get_model; later cells refer to it by this name.
model = get_model(num_classes)



In [19]:
import torch
from torch.utils.data import DataLoader
import torchvision.transforms as T
# Image preprocessing: only conversion to a tensor
transform = T.Compose([T.ToTensor()])

# Training split wrapped in a shuffling loader
dataset = ShipDataset(
    "/content/dataset/ships-aerial-images/train",
    transforms=transform,
)
data_loader = DataLoader(
    dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn,
)

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Optimise only the parameters that require gradients
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)

# Step schedule: multiply the learning rate by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

# Number of epochs
num_epochs = 1

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    trained_batches = 0  # batches that actually contributed a loss value
    progress_bar = tqdm(data_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch")

    for images, targets in progress_bar:
        # The dataset yields (None, None) for unusable samples. Filter image
        # and target together so the two lists can never get out of step.
        usable = [
            (image, target)
            for image, target in zip(images, targets)
            if image is not None and target is not None
        ]
        if not usable:
            # Every sample in this batch was unusable; the detection model
            # cannot be called with an empty batch.
            continue

        images = [image.to(device) for image, _ in usable]
        targets = [{k: v.to(device) for k, v in target.items()} for _, target in usable]

        # Forward pass: in training mode the model returns a dict of losses
        loss_dict = model(images, targets)

        # Total loss is the sum of all loss components
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # Update running loss
        running_loss += losses.item()
        trained_batches += 1

        # Show the mean loss over the batches trained so far
        progress_bar.set_postfix(loss=running_loss / trained_batches)

    # Update the learning rate
    lr_scheduler.step()

print("Training complete!")

Epoch 1/1: 100%|██████████| 1213/1213 [30:28<00:00,  1.51s/batch, loss=0.295]

Training complete!





In [20]:
# Function to calculate IoU (Intersection over Union)
def calculate_iou(box1, box2):
    """Calculates Intersection over Union (IoU) for two bounding boxes.

    Coordinates are treated as continuous values, so a box's area is
    ``(xmax - xmin) * (ymax - ymin)`` with no ``+1`` pixel offset. Boxes that
    only touch, or do not overlap at all, have an IoU of 0.

    Args:
        box1: [xmin, ymin, xmax, ymax]
        box2: [xmin, ymin, xmax, ymax]

    Returns:
        iou: IoU value (float) in [0, 1]; 0.0 when the union has no area.
    """
    # Extent of the overlap along each axis, clamped at zero for disjoint boxes
    overlap_w = max(0.0, min(box1[2], box2[2]) - max(box1[0], box2[0]))
    overlap_h = max(0.0, min(box1[3], box2[3]) - max(box1[1], box2[1]))
    intersection_area = overlap_w * overlap_h

    # Area of both bounding boxes (inverted boxes count as empty)
    box1_area = max(0.0, box1[2] - box1[0]) * max(0.0, box1[3] - box1[1])
    box2_area = max(0.0, box2[2] - box2[0]) * max(0.0, box2[3] - box2[1])

    union_area = box1_area + box2_area - intersection_area
    if union_area <= 0:
        # Both boxes are degenerate; avoid dividing by zero.
        return 0.0

    return float(intersection_area / union_area)

# Function to evaluate the model
@torch.no_grad()
def evaluate(model, data_loader, device, iou_threshold=0.5, score_threshold=0.5):
    """Run the detector over ``data_loader`` and collect confident predictions.

    Args:
        model: detection model; called with a list of image tensors and
            expected to return one dict with ``boxes``, ``labels`` and
            ``scores`` per image.
        data_loader: iterable of ``(images, targets)`` batches. Samples where
            either element is ``None`` are skipped.
        device: device the images are moved to before inference.
        iou_threshold: accepted for backward compatibility; not used here.
        score_threshold: detections with a score not strictly above this
            value are discarded.

    Returns:
        A list with one dict per evaluated image:
        ``{'image_id': int, 'predictions': [{'bbox', 'category_id', 'score'}]}``
        where ``image_id`` is taken from the sample's target, ``bbox`` is the
        model's box as a list of floats, and ``category_id`` / ``score`` are
        plain Python ``int`` / ``float`` values.
    """
    model.eval()
    all_predictions = []

    for images, targets in tqdm(data_loader, desc="Evaluating"):
        # Filter image/target pairs together so each prediction is attributed
        # to the target of the same sample.
        usable = [
            (image, target)
            for image, target in zip(images, targets)
            if image is not None and target is not None
        ]
        if not usable:
            # Nothing to run the model on for this batch.
            continue

        batch_images = [image.to(device) for image, _ in usable]
        predictions = model(batch_images)

        for (_, target), prediction in zip(usable, predictions):
            confident = prediction['scores'] > score_threshold
            # .tolist() yields built-in Python numbers rather than numpy scalars
            boxes = prediction['boxes'][confident].cpu().tolist()
            labels = prediction['labels'][confident].cpu().tolist()
            scores = prediction['scores'][confident].cpu().tolist()

            image_predictions = [
                {'bbox': box, 'category_id': label, 'score': score}
                for box, label, score in zip(boxes, labels, scores)
            ]

            all_predictions.append({
                'image_id': int(target['image_id'].item()),
                'predictions': image_predictions
            })

    return all_predictions

In [None]:


# Calculate mAP
def calculate_map(predictions, ground_truth, iou_threshold=0.5):
    """Calculates mean Average Precision (mAP) for a single class.

    Args:
        predictions: list of per-image dicts
            ``{'image_id': ..., 'predictions': [{'bbox', 'score', ...}]}``
            where ``bbox`` is ``[xmin, ymin, xmax, ymax]``.
        ground_truth: list of per-image dicts
            ``{'id': ..., 'annotations': [{'bbox', ...}]}`` where ``bbox`` is
            in COCO ``[x, y, width, height]`` form. ``id`` must use the same
            values as ``image_id`` in ``predictions``.
        iou_threshold: minimum IoU for a prediction to match a ground-truth box.

    Returns:
        The average precision over all predictions (category ids are ignored
        because only one class exists). The inputs are not modified.
    """
    # Group ground-truth boxes per image and convert them from COCO
    # [x, y, w, h] to the corner format used by the predictions, so that both
    # sides of the IoU computation use the same representation.
    gt_boxes_by_image = {}
    for gt in ground_truth:
        for ann in gt['annotations']:
            x, y, w, h = ann['bbox'][0], ann['bbox'][1], ann['bbox'][2], ann['bbox'][3]
            gt_boxes_by_image.setdefault(gt['id'], []).append([x, y, x + w, y + h])
    num_gt_boxes = sum(len(boxes) for boxes in gt_boxes_by_image.values())

    # Flatten to one record per predicted box. The image id lives on the
    # per-image wrapper, so it is copied onto every box record here.
    sorted_predictions = sorted(
        (
            {'image_id': pred['image_id'], 'bbox': p['bbox'], 'score': p['score']}
            for pred in predictions
            for p in pred['predictions']
        ),
        key=lambda x: x['score'],
        reverse=True
    )

    tp = np.zeros(len(sorted_predictions))
    fp = np.zeros(len(sorted_predictions))

    # Walk predictions from most to least confident. Each one claims the
    # still-unmatched ground-truth box of its image with the highest IoU,
    # provided that IoU reaches the threshold.
    for i, prediction in enumerate(sorted_predictions):
        candidates = gt_boxes_by_image.get(prediction['image_id'], [])
        best_index = -1
        best_iou = 0.0
        for j, gt_box in enumerate(candidates):
            iou = calculate_iou(prediction['bbox'], gt_box)
            if iou >= iou_threshold and iou > best_iou:
                best_index = j
                best_iou = iou

        if best_index >= 0:
            tp[i] = 1
            candidates.pop(best_index)  # a ground-truth box can be matched only once
        else:
            fp[i] = 1

    # Calculate precision and recall (epsilon avoids division by zero)
    tp_cumulative = np.cumsum(tp)
    fp_cumulative = np.cumsum(fp)
    precision = tp_cumulative / (tp_cumulative + fp_cumulative + 1e-10)
    recall = tp_cumulative / (num_gt_boxes + 1e-10)

    # Calculate AP using the precision-recall curve
    ap = calculate_ap_from_pr(precision, recall)
    return ap

def calculate_ap_from_pr(precision, recall):
    """Calculates Average Precision (AP) from precision and recall values.

    AP is the sum over all points ``n`` of
    ``(recall[n] - recall[n-1]) * precision[n]``, with the recall before the
    first point taken as 0 so that the first prediction contributes too.

    Args:
        precision: sequence of precision values, ordered by decreasing score.
        recall: sequence of recall values of the same length and order.

    Returns:
        The average precision as a float (0.0 for empty input).
    """
    ap = 0.0
    previous_recall = 0.0  # recall before any prediction has been considered
    for p, r in zip(precision, recall):
        ap += (r - previous_recall) * p
        previous_recall = r
    return float(ap)

    # Evaluation
# Load the validation dataset
val_dataset = ShipDataset("/content/dataset/ships-aerial-images/valid", transforms=transform)
val_data_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=0, collate_fn=collate_fn)

# Evaluate the model
predictions = evaluate(model, val_data_loader, device)

# Load ground truth annotations for the validation set (in COCO format)
with open('/content/dataset/ships-aerial-images/valid/annotations_coco/annotations_coco.json', 'r') as f:
    gt_data = json.load(f)

# COCO keeps images and annotations in two separate lists, while calculate_map
# expects every image entry to carry its own 'annotations' list. Build that
# structure here (images without boxes get an empty list).
annotations_by_image = {}
for ann in gt_data['annotations']:
    annotations_by_image.setdefault(ann['image_id'], []).append(ann)
ground_truth = [
    {**img, 'annotations': annotations_by_image.get(img['id'], [])}
    for img in gt_data['images']
]

# evaluate() reports the dataset index as 'image_id', whereas the COCO file
# identifies images by file-name stem. Translate indices to stems so that
# predictions and ground truth share the same ids; the id is also copied onto
# each box record.
# NOTE: the dataset yields nothing for images without annotations, so
# detections on such images are not part of this score.
predictions_by_coco_id = []
for pred in predictions:
    coco_image_id = os.path.splitext(val_dataset.imgs[pred['image_id']])[0]
    predictions_by_coco_id.append({
        'image_id': coco_image_id,
        'predictions': [{**p, 'image_id': coco_image_id} for p in pred['predictions']],
    })

# Calculate and print mAP
map_score = calculate_map(predictions_by_coco_id, ground_truth)
print(f"mAP: {map_score:.4f}")

Evaluating:   4%|▎         | 10/271 [00:06<02:44,  1.59it/s]