In [1]:
import os
import torch
from torch.utils.data import Dataset
import rasterio

class YOLOTifDataset(Dataset):
    """Detection dataset pairing `.tif` images with YOLO-style polygon label files.

    Expected layout: ``root_dir/images/<name>.tif`` with an optional
    ``root_dir/labels/<name>.txt``. Every non-blank label line is parsed as
    ``class_id x1 y1 x2 y2 ...``; the coordinates are scaled by the image width
    and height (i.e. they are assumed to be normalised) and each polygon is
    reduced to its axis-aligned bounding box.
    """

    def __init__(self, root_dir, transforms=None):
        """
        Args:
            root_dir (str): Directory containing `images/` and `labels/` subdirectories.
            transforms (callable, optional): A function/transform to apply to the images.
                It is called with the image tensor only; the boxes are not passed to it.
        """
        self.image_dir = os.path.join(root_dir, "images")
        self.label_dir = os.path.join(root_dir, "labels")
        self.transforms = transforms
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable across runs and machines.
        self.image_filenames = sorted(
            f for f in os.listdir(self.image_dir) if f.endswith('.tif')
        )

    def __len__(self):
        """Return the number of `.tif` images found in the image directory."""
        return len(self.image_filenames)

    @staticmethod
    def _read_targets(label_path, width, height):
        """Parse one label file into pixel-space boxes and class ids.

        Args:
            label_path (str): Path of the label file; a missing file means "no objects".
            width (int): Image width in pixels, used to scale x coordinates.
            height (int): Image height in pixels, used to scale y coordinates.

        Returns:
            dict: ``{"boxes": FloatTensor[N, 4], "labels": Int64Tensor[N]}`` with boxes
            as ``[x_min, y_min, x_max, y_max]``. ``N`` may be 0, in which case the
            boxes tensor still has shape ``(0, 4)``.

        Raises:
            ValueError: If a line has no coordinates or an odd number of them.
        """
        boxes = []
        labels = []

        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                for line_no, line in enumerate(f, start=1):
                    fields = line.split()
                    if not fields:
                        # Blank lines (e.g. a trailing newline) carry no annotation
                        continue

                    values = [float(v) for v in fields]
                    class_id = int(values[0])
                    points = values[1:]  # Remaining are x, y pairs for the polygon
                    if not points or len(points) % 2:
                        raise ValueError(
                            f"{label_path}:{line_no}: expected 'class x1 y1 x2 y2 ...' "
                            f"with an even number of coordinates, got {len(points)}"
                        )

                    # Scale to pixels and collapse the polygon to its bounding box
                    x_coords = [x * width for x in points[0::2]]
                    y_coords = [y * height for y in points[1::2]]
                    boxes.append([min(x_coords), min(y_coords), max(x_coords), max(y_coords)])
                    labels.append(class_id)

        # reshape keeps the (N, 4) contract even when there are no boxes;
        # torch.tensor([]) alone would produce shape (0,).
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.tensor(labels, dtype=torch.int64)
        return {"boxes": boxes, "labels": labels}

    def __getitem__(self, idx):
        """Load the image at `idx` and its annotations.

        Returns:
            tuple: ``(img, target)`` where ``img`` is a float tensor of the raster
            bands divided by 255 and ``target`` is the dict built by `_read_targets`.
        """
        # Load image
        img_filename = self.image_filenames[idx]
        img_path = os.path.join(self.image_dir, img_filename)

        with rasterio.open(img_path) as src:
            pixels = src.read()  # Read all channels

        # Cast in NumPy first so raster dtypes that torch may not ingest directly
        # (e.g. uint16 on some torch versions) are still accepted.
        # NOTE(review): dividing by 255 assumes 8-bit imagery - confirm for this dataset.
        img = torch.from_numpy(pixels.astype("float32")) / 255.0

        if img.ndimension() == 2:  # Handle single-channel images
            img = img.unsqueeze(0)

        # Swap only the final extension; str.replace would rewrite every '.tif'
        # occurring anywhere in the file name.
        label_stem = os.path.splitext(img_filename)[0]
        label_path = os.path.join(self.label_dir, label_stem + '.txt')

        _, height, width = img.shape
        target = self._read_targets(label_path, width, height)

        if self.transforms:
            img = self.transforms(img)

        return img, target


In [2]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2

# Load the pre-trained model
model = fasterrcnn_resnet50_fpn_v2(weights="DEFAULT")

# Modify the first convolutional layer for 4-channel input.
# A freshly constructed Conv2d is randomly initialised, which would throw away the
# pre-trained stem filters; copy the RGB filters into the new layer instead.
rgb_conv1 = model.backbone.body.conv1
conv1_4ch = torch.nn.Conv2d(4, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
with torch.no_grad():
    conv1_4ch.weight[:, :3] = rgb_conv1.weight
    # No pre-trained filter exists for the extra band; start it at the mean RGB filter
    conv1_4ch.weight[:, 3:] = rgb_conv1.weight.mean(dim=1, keepdim=True)
model.backbone.body.conv1 = conv1_4ch

# Per-channel normalisation statistics used by the model's built-in transform.
# NOTE(review): the 4th entries duplicate the 3rd-channel values as a placeholder -
# replace them with statistics measured on the actual fourth band if available.
model.transform.image_mean = [0.485, 0.456, 0.406, 0.406]
model.transform.image_std = [0.229, 0.224, 0.225, 0.225]


In [3]:
from torch.utils.data import DataLoader

# Locations of the training and validation splits
train_dir, valid_dir = "datasets/01m-All-1/train", "datasets/01m-All-1/valid"

# Build one dataset object per split
train_dataset = YOLOTifDataset(root_dir=train_dir)
valid_dataset = YOLOTifDataset(root_dir=valid_dir)

# Define DataLoader with appropriate batch size and collate function
def collate_fn(batch):
    """Regroup a list of samples into one tuple per field.

    A batch ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``,
    so the items are kept as separate objects instead of being stacked.
    """
    per_field = zip(*batch)
    return tuple(per_field)

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

if len(train_loader) == 0:
    # Fail before training rather than with a ZeroDivisionError when averaging the loss
    raise ValueError(f"No training images found under {train_dir!r}")

# Choose the device once (it does not change between batches) and fall back to
# the CPU so the cell still runs on a machine without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Training loop
optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)
num_epochs = 10
model.train()

for epoch in range(num_epochs):
    total_loss = 0
    for images, targets in train_loader:
        # Move data to the same device as the model
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass: in training mode the model returns a dict of loss terms
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        # Backpropagation
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

    # Mean of the summed loss over the batches of this epoch
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader)}")

  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


Epoch 1, Loss: 0.2383308510024902
Epoch 2, Loss: 0.16398950927902625
Epoch 3, Loss: 0.12231005060993418
Epoch 4, Loss: 0.08927100112873919
Epoch 5, Loss: 0.07185531217498775
Epoch 6, Loss: 0.05955997153032053
Epoch 7, Loss: 0.04961862116153175
Epoch 8, Loss: 0.03981745494175938
Epoch 9, Loss: 0.03453000266947843
Epoch 10, Loss: 0.03228045784009184


In [4]:
!pip install torchmetrics

Collecting torchmetrics
  Downloading torchmetrics-1.6.1-py3-none-any.whl.metadata (21 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.11.9-py3-none-any.whl.metadata (5.2 kB)
Downloading torchmetrics-1.6.1-py3-none-any.whl (927 kB)
   ---------------------------------------- 0.0/927.3 kB ? eta -:--:--
   -- ------------------------------------- 61.4/927.3 kB 1.7 MB/s eta 0:00:01
   ----------- ---------------------------- 276.5/927.3 kB 3.4 MB/s eta 0:00:01
   ----------------------------- ---------- 686.1/927.3 kB 5.4 MB/s eta 0:00:01
   ---------------------------------------- 927.3/927.3 kB 5.9 MB/s eta 0:00:00
Downloading lightning_utilities-0.11.9-py3-none-any.whl (28 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.11.9 torchmetrics-1.6.1


In [10]:
# Persist the learned parameters (a state_dict), not the model object itself
trained_weights = model.state_dict()
torch.save(trained_weights, "fasterrcnn.pth")

In [None]:
# Test directory
test_dir = "datasets/01m-All-1/test"

# Initialize test dataset and dataloader
test_dataset = YOLOTifDataset(test_dir)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

from torchmetrics.detection import MeanAveragePrecision

# Leave iou_thresholds at its default (0.50:0.05:0.95) so that `map` really is
# mAP@0.5:0.95; class_metrics=True additionally returns per-class AP / AR.
metric = MeanAveragePrecision(class_metrics=True)

# Predictions below this confidence are ignored for the centre-distance statistic only
DISTANCE_SCORE_THRESHOLD = 0.5

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# "fasterrcnn.pth" holds a state_dict, so it must be loaded INTO the model built
# earlier in the notebook; torch.load alone returns a plain dict with no .eval().
model.load_state_dict(torch.load("fasterrcnn.pth", map_location=device))
model = model.to(device)
model.eval()

all_euclidean_distances = []

with torch.no_grad():
    for images, targets in test_loader:
        images = [img.to(device) for img in images]

        # Get predictions from the model (eval mode: one dict per image)
        outputs = model(images)

        # Convert predictions and targets to torchmetrics format
        preds = []
        gts = []

        for output, target in zip(outputs, targets):
            pred_boxes = output['boxes'].cpu()
            pred_scores = output['scores'].cpu()
            pred_labels = output['labels'].cpu()

            target_boxes = target['boxes'].cpu()
            target_labels = target['labels'].cpu()

            # Append predictions and targets for torchmetrics
            preds.append({
                "boxes": pred_boxes,
                "scores": pred_scores,
                "labels": pred_labels
            })
            gts.append({
                "boxes": target_boxes,
                "labels": target_labels
            })

            # Distance from every ground-truth box centre to the nearest confident
            # predicted centre. Pairing by position (zip) would compare unrelated
            # boxes, because predictions are ordered by score, not by target.
            confident_boxes = pred_boxes[pred_scores >= DISTANCE_SCORE_THRESHOLD]
            if len(confident_boxes) and len(target_boxes):
                pred_centers = (confident_boxes[:, :2] + confident_boxes[:, 2:]) / 2
                target_centers = (target_boxes[:, :2] + target_boxes[:, 2:]) / 2
                nearest = torch.cdist(target_centers, pred_centers).min(dim=1).values
                all_euclidean_distances.extend(nearest.tolist())

        # Update the metric
        metric.update(preds, gts)

# Compute final metrics
final_metrics = metric.compute()
print("Test Set Evaluation Metrics:")
print(f"mAP@0.5: {final_metrics['map_50']:.4f}")
print(f"mAP@0.5:0.95: {final_metrics['map']:.4f}")

# Recall and per-class breakdown (the metric exposes no plain precision/recall keys)
print(f"mAR@100: {final_metrics['mar_100']:.4f}")
print(f"Classes: {final_metrics['classes'].tolist()}")
print(f"AP per class: {final_metrics['map_per_class'].tolist()}")
print(f"AR@100 per class: {final_metrics['mar_100_per_class'].tolist()}")

# Average Euclidean Distance between Midpoints
if all_euclidean_distances:
    avg_euclidean_distance = sum(all_euclidean_distances) / len(all_euclidean_distances)
    print(f"Average Euclidean Distance between Midpoints: {avg_euclidean_distance:.4f}")


  dataset = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)


ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [9]:
# Re-evaluate the metric object; as the last expression of the cell, the returned
# value is shown as the cell output.
metric.compute()

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject