In [None]:
!pip install fiftyone

In [None]:
import os
import shutil
import gc
import fiftyone as fo
import fiftyone.zoo as foz
import fiftyone.types as fot


def _replace_dir(src_dir, dst_dir):
    """Rename src_dir to dst_dir, deleting any existing dst_dir first.

    Does nothing when src_dir does not exist.
    """
    if not os.path.exists(src_dir):
        return
    if os.path.exists(dst_dir):
        shutil.rmtree(dst_dir)
    os.rename(src_dir, dst_dir)


# Classes to fetch; each one is exported into its own sub-directory.
classes = ["Dog", "Cat", "Deer", "Bear", "Bird", "Person", "Car", "Truck", "Airplane"]

base_export_dir = "/content/dataset"

if not os.path.exists(base_export_dir):
    os.makedirs(base_export_dir)

for cls in classes:
    print(f"Downloading Open Images subset for class '{cls}'...")

    dataset = foz.load_zoo_dataset(
        "open-images-v6",
        split="train",
        classes=[cls],
        max_samples=500,  # try with fewer samples for testing
        shuffle=True,
    )

    export_dir = os.path.join(base_export_dir, cls)

    # Write the samples in VOC detection format; media files are moved
    # (not copied) into the export directory.
    dataset.export(
        export_dir=export_dir,
        dataset_type=fot.VOCDetectionDataset,
        label_field="detections",
        export_media="move",
    )

    # Rename the exported folders to the images/ + annotations/ layout that
    # the training cell reads.
    for exported_name, wanted_name in (("data", "images"), ("labels", "annotations")):
        _replace_dir(
            os.path.join(export_dir, exported_name),
            os.path.join(export_dir, wanted_name),
        )

    print(f"Exported and organized dataset for '{cls}' at {export_dir}")

    # Drop the FiftyOne dataset and reclaim memory before the next class.
    dataset.delete()
    gc.collect()

print("Dataset processing complete.")



In [None]:
# Import necessary libraries
import json
import os
import random
import xml.etree.ElementTree as ET

import matplotlib.patches as patches
import matplotlib.pyplot as plt
import torch
import torchvision
import torchvision.transforms as T
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

#########################################
# 1. Define the Classes Mapping
#########################################
# Mapping from string class names to integer labels.
# (0 is reserved for background; our classes start at 1.)
# Class names listed in label order. Label 0 is reserved for the background,
# so numbering starts at 1.
classes_map = {
    class_name: class_id
    for class_id, class_name in enumerate(
        ["Dog", "Cat", "Deer", "Bear", "Bird", "Person", "Car", "Truck", "Airplane"],
        start=1,
    )
}
# Inverse lookup used at inference time: integer label -> class name.
reverse_classes_map = dict(zip(classes_map.values(), classes_map.keys()))

#########################################
# 2. Define a Custom Dataset Class to Combine Multiple Datasets
#########################################
class CustomDataset(Dataset):
    """
    Object-detection dataset that concatenates samples from several dataset roots.

    Each root must follow this structure:

        dataset_root/
            images/       -> image files
            annotations/  -> annotation files (XML, JSON, or text)

    Pairing:
      An image is paired with the annotation file that has the same base name
      (``images/abc.jpg`` <-> ``annotations/abc.xml``). Files without a
      counterpart are skipped with a warning. If no base names match at all in
      a root but both folders hold the same number of files, the files are
      paired by sorted order instead (with a warning).

    Annotation parsing:
      - XML: expects an <annotation> element with <object>/<name> and <bndbox>.
      - JSON: expects a dict with key "objects" holding list of objects.
      - Text: each line should be "xmin ymin xmax ymax label".

    For XML and JSON, ``classes_map`` converts string class names into integer
    labels; a name that is not in the map is used as-is when it is an integer
    literal. Objects whose class cannot be resolved, or whose XML box is
    incomplete, are dropped rather than emitted with label 0, because label 0
    means "background" to the detector.

    Every item is returned as ``(image, target, meta)`` where ``meta`` carries
    the image and annotation paths to help track errors.
    """

    def __init__(self, roots, transforms=None, classes_map=None):
        """
        Args:
            roots: a dataset root path, or a list of them.
            transforms: optional callable applied to the PIL image only.
            classes_map: optional dict mapping class name -> integer label.
        """
        # Accept a single path as well as a list of paths.
        if isinstance(roots, str):
            roots = [roots]
        self.samples = []  # list of tuples: (img_path, ann_path)
        self.transforms = transforms
        self.classes_map = classes_map

        for root in roots:
            images_dir = os.path.join(root, "images")
            annotations_dir = os.path.join(root, "annotations")
            if not os.path.isdir(images_dir) or not os.path.isdir(annotations_dir):
                print(f"Warning: Missing 'images' or 'annotations' in {root}. Skipping this root.")
                continue

            imgs = self._list_files(images_dir)
            anns = self._list_files(annotations_dir)
            pairs = self._pair_by_stem(imgs, anns)

            if not pairs and imgs and len(imgs) == len(anns):
                # Naming schemes differ between the two folders; keep the
                # legacy positional pairing so such datasets still load.
                print(f"Warning: No matching base names in {root}. Pairing files by sorted order instead.")
                pairs = list(zip(imgs, anns))
            else:
                unpaired_imgs = len(imgs) - len(pairs)
                unpaired_anns = len(anns) - len({ann for _, ann in pairs})
                if unpaired_imgs or unpaired_anns:
                    print(
                        f"Warning: {root}: skipped {unpaired_imgs} image(s) and "
                        f"{unpaired_anns} annotation file(s) without a same-named counterpart."
                    )

            for img_file, ann_file in pairs:
                self.samples.append(
                    (os.path.join(images_dir, img_file), os.path.join(annotations_dir, ann_file))
                )
        print(f"Total samples loaded: {len(self.samples)}")

    @staticmethod
    def _list_files(directory):
        """Return the sorted names of the regular files directly inside `directory`."""
        return sorted(
            entry for entry in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, entry))
        )

    @staticmethod
    def _pair_by_stem(imgs, anns):
        """Return (img_file, ann_file) pairs whose names match once the extension is removed."""
        ann_by_stem = {}
        for ann_file in anns:
            # On duplicate stems keep the first annotation in sorted order.
            ann_by_stem.setdefault(os.path.splitext(ann_file)[0], ann_file)
        pairs = []
        for img_file in imgs:
            ann_file = ann_by_stem.get(os.path.splitext(img_file)[0])
            if ann_file is not None:
                pairs.append((img_file, ann_file))
        return pairs

    def _resolve_label(self, name):
        """Map a class name to its integer label, or return None when it cannot be resolved."""
        if self.classes_map and name in self.classes_map:
            return self.classes_map[name]
        try:
            return int(name)
        except (TypeError, ValueError):
            return None

    def parse_annotation(self, ann_path):
        """
        Read one annotation file, choosing the parser from the file extension.

        Returns:
            (boxes, labels): ``boxes`` is a list of ``[xmin, ymin, xmax, ymax]``
            float lists and ``labels`` the matching list of integer labels.
        """
        boxes = []
        labels = []
        ext = os.path.splitext(ann_path)[1].lower()
        if ext == '.xml':
            root_elem = ET.parse(ann_path).getroot()
            for obj in root_elem.findall('object'):
                label = self._resolve_label(obj.findtext('name'))
                bndbox = obj.find('bndbox')
                if label is None or bndbox is None:
                    continue
                try:
                    box = [float(bndbox.findtext(tag)) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]
                except (TypeError, ValueError):
                    # A coordinate tag is missing or not numeric.
                    continue
                boxes.append(box)
                labels.append(label)
        elif ext == '.json':
            with open(ann_path, 'r') as f:
                data = json.load(f)
            for obj in data.get("objects", []):
                label = self._resolve_label(obj.get("name", ""))
                if label is None:
                    continue
                bndbox = obj.get("bndbox", {})
                boxes.append([float(bndbox.get(key, 0)) for key in ("xmin", "ymin", "xmax", "ymax")])
                labels.append(label)
        else:
            with open(ann_path, 'r') as f:
                for line in f:
                    parts = line.strip().split()
                    if len(parts) != 5:
                        continue
                    xmin, ymin, xmax, ymax = map(float, parts[:4])
                    label = int(parts[4])
                    boxes.append([xmin, ymin, xmax, ymax])
                    labels.append(label)
        return boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, target, meta)`` for sample `idx`.

        ``target`` holds ``"boxes"`` (float32, shape ``(N, 4)``) and
        ``"labels"`` (int64, shape ``(N,)``); ``meta`` holds the two file paths.
        """
        img_path, ann_path = self.samples[idx]
        # Close the file handle as soon as the pixels have been decoded.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")
        boxes, labels = self.parse_annotation(ann_path)

        # reshape keeps the (N, 4) layout even when there are no boxes;
        # without it an empty list becomes a tensor of shape (0,).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        target = {"boxes": boxes, "labels": labels}

        # Meta info lets callers trace a failing sample back to its files.
        meta = {"img_path": img_path, "ann_path": ann_path}

        if self.transforms is not None:
            img = self.transforms(img)

        return img, target, meta

    def __len__(self):
        """Number of (image, annotation) pairs collected from all roots."""
        return len(self.samples)

#########################################
# 3. Define Data Transforms
#########################################
def get_transform(train):
    """
    Build the image-only preprocessing pipeline.

    Args:
        train: kept for interface compatibility; the same pipeline is
            currently returned for training and evaluation.

    Returns:
        A ``T.Compose`` that converts a PIL image to a tensor.
    """
    # No T.RandomHorizontalFlip here. CustomDataset applies this pipeline to
    # the image only, so a random flip would mirror the pixels while the
    # bounding boxes stay where they were. Flip augmentation has to be done
    # where both the image and its target are available.
    return T.Compose([T.ToTensor()])

#########################################
# 4. Prepare the Combined Dataset and DataLoader
#########################################
# Define dataset roots (one per class). (Assuming on Colab under /content/dataset)
dataset_roots = [
    "/content/dataset/Dog",
    "/content/dataset/Cat",
    "/content/dataset/Deer",
    "/content/dataset/Bear",
    "/content/dataset/Bird",
    "/content/dataset/Person",
    "/content/dataset/Car",
    "/content/dataset/Truck",
    "/content/dataset/Airplane",
]

# One dataset spanning every root, using the training-mode transform pipeline.
dataset = CustomDataset(
    dataset_roots,
    transforms=get_transform(train=True),
    classes_map=classes_map,
)

# Update the collate function to handle the extra meta info.
def collate_fn(batch):
    """Regroup a batch of (image, target, meta) samples into three parallel lists.

    Images are kept in a list instead of being stacked into one tensor.
    """
    images, targets, metas = (list(column) for column in zip(*batch))
    return images, targets, metas

# Shuffled batches of two samples, loaded by four worker processes; collate_fn
# keeps each batch as lists of images, targets and metas.
data_loader = DataLoader(
    dataset,
    batch_size=2,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
)

#########################################
# 5. Build the Object Detection Model
#########################################
def get_model(num_classes):
    """Return a pretrained Faster R-CNN (ResNet-50 FPN) with a new box predictor.

    Args:
        num_classes: number of output classes for the replacement predictor
            (the caller counts the background class in this number).

    Returns:
        The detection model with ``roi_heads.box_predictor`` swapped for a
        fresh ``FastRCNNPredictor``.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    roi_heads = detector.roi_heads
    # Reuse the feature width of the original classification layer for the new head.
    feature_dim = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(feature_dim, num_classes)
    return detector

# Every defined class plus one slot for the background.
num_classes = len(classes_map) + 1
model = get_model(num_classes)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

#########################################
# 6. Define the Optimizer and Learning Rate Scheduler
#########################################
# Only parameters that require gradients are handed to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)
# Multiply the learning rate by 0.1 every 3 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

#########################################
# 7. Training Loop with Exception Handling
#########################################
error_log = []  # To store errors with corresponding file paths.


def _move_batch_to_device(batch_images, batch_targets):
    """Return the images and target dicts with every tensor moved to `device`."""
    moved_images = [img.to(device) for img in batch_images]
    moved_targets = [{k: v.to(device) for k, v in t.items()} for t in batch_targets]
    return moved_images, moved_targets


def _compute_loss(batch_images, batch_targets):
    """Run the model on one batch and return the sum of the losses it reports."""
    images_device, targets_device = _move_batch_to_device(batch_images, batch_targets)
    loss_dict = model(images_device, targets_device)
    return sum(loss_dict.values())


num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    iteration = 0
    for images, targets, metas in data_loader:
        # Attempt to process the entire batch.
        try:
            loss = _compute_loss(images, targets)
        except Exception:
            # The batch failed as a whole: probe each sample on its own to
            # find the ones that can still be trained on.
            valid_images = []
            valid_targets = []
            for sample_img, sample_target, sample_meta in zip(images, targets, metas):
                try:
                    # The probe result is discarded, so skip autograd
                    # bookkeeping for it.
                    with torch.no_grad():
                        _compute_loss([sample_img], [sample_target])
                    valid_images.append(sample_img)
                    valid_targets.append(sample_target)
                except Exception as ex:
                    error_info = {
                        "img_path": sample_meta["img_path"],
                        "ann_path": sample_meta["ann_path"],
                        "error": str(ex)
                    }
                    error_log.append(error_info)
                    print(f"Error processing sample: {error_info}")
            if len(valid_images) == 0:
                print("Skipping entire batch due to errors.")
                continue
            # Recompute the loss, with gradients, on the surviving samples.
            loss = _compute_loss(valid_images, valid_targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Skipped batches do not advance the iteration counter.
        if iteration % 10 == 0:
            print(f"Epoch {epoch}, Iteration {iteration}, Loss: {loss.item():.4f}")
        iteration += 1

    lr_scheduler.step()
    print(f"Epoch {epoch} completed.")

print("Training complete.")
print(f"Total error samples encountered: {len(error_log)}")
if error_log:
    print("Error details:")
    for err in error_log:
        print(err)

# Save the trained model weights.
torch.save(model.state_dict(), "fasterrcnn_model.pth")

#########################################
# 8. Inference: Real-Time Object Detection Demo
#########################################
def predict_and_plot(image_path, model, device, threshold=0.5):
    """Run the detector on one image file and display the confident detections.

    Args:
        image_path: path of the image to load.
        model: detection model; it is switched to evaluation mode.
        device: device the image tensor is moved to before inference.
        threshold: only detections with a score strictly above this are drawn.

    Each drawn detection is a red rectangle with a yellow "<class> <score>"
    caption; labels missing from ``reverse_classes_map`` are shown as "N/A".
    """
    model.eval()  # Inference mode.
    img = Image.open(image_path).convert("RGB")
    to_tensor = T.Compose([T.ToTensor()])
    img_tensor = to_tensor(img).to(device)

    with torch.no_grad():
        prediction = model([img_tensor])
    detections = prediction[0]

    fig, ax = plt.subplots(1, figsize=(12, 8))
    ax.imshow(img)

    # Walk the boxes, scores and labels of the single input image together.
    for box, score_t, label_t in zip(detections['boxes'], detections['scores'], detections['labels']):
        score = score_t.item()
        if not score > threshold:
            continue
        # Bring the box to the CPU as a numpy array before unpacking.
        xmin, ymin, xmax, ymax = box.cpu().numpy()
        ax.add_patch(
            patches.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                              linewidth=2, edgecolor='r', facecolor='none')
        )
        label_str = reverse_classes_map.get(label_t.item(), "N/A")
        ax.text(xmin, ymin, f"{label_str} {score:.2f}", color='yellow', fontsize=12)
    plt.axis('off')
    plt.show()



# Example usage: run inference on randomly chosen dataset images.
# `dataset` is the CustomDataset instance built earlier in this cell, and
# dataset.samples is a list of (img_path, ann_path) tuples.
all_img_paths = [sample[0] for sample in dataset.samples]

# Pick up to 10 images. random.sample raises ValueError when asked for more
# items than the list holds, so cap the request at the number available.
random_img_paths = random.sample(all_img_paths, min(10, len(all_img_paths)))

# Run inference on each selected image
for img_path in random_img_paths:
    print(f"Processing: {img_path}")
    predict_and_plot(img_path, model, device)



In [None]:
import random
import matplotlib.pyplot as plt

# Function to perform inference and plot a single image.
def predict_and_plot(image_path, model, device, threshold=0.5):
    """Detect objects in a single image file and show them on a matplotlib figure.

    NOTE(review): a function with this name and the same behavior is also
    defined in the training cell; this definition replaces it when the cell
    runs. Consider keeping only one.

    Args:
        image_path: path of the image to load.
        model: detection model; it is put into evaluation mode.
        device: device the image tensor is moved to before inference.
        threshold: detections are drawn only when their score is strictly
            greater than this value.
    """
    model.eval()  # Inference mode.
    img = Image.open(image_path).convert("RGB")
    img_tensor = T.Compose([T.ToTensor()])(img).to(device)

    with torch.no_grad():
        prediction = model([img_tensor])
    result = prediction[0]

    fig, ax = plt.subplots(1, figsize=(12, 8))
    ax.imshow(img)

    for idx in range(len(result['boxes'])):
        score = result['scores'][idx].item()
        if not score > threshold:
            continue
        # Copy the box to the CPU as numpy values for matplotlib.
        xmin, ymin, xmax, ymax = result['boxes'][idx].cpu().numpy()
        outline = patches.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                    linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(outline)
        # Labels that are not in the reverse map are captioned "N/A".
        label_str = reverse_classes_map.get(result['labels'][idx].item(), "N/A")
        ax.text(xmin, ymin, f"{label_str} {score:.2f}", color='yellow', fontsize=12)
    plt.axis('off')
    plt.show()

# `dataset` is expected to be the CustomDataset instance created in the
# training cell; dataset.samples is a list of (img_path, ann_path) tuples.
all_img_paths = [sample[0] for sample in dataset.samples]

# Pick up to 10 images. random.sample raises ValueError when asked for more
# items than the list holds, so cap the request at the number available.
random_img_paths = random.sample(all_img_paths, min(10, len(all_img_paths)))

# Run inference on each selected image
for img_path in random_img_paths:
    print(f"Processing: {img_path}")
    predict_and_plot(img_path, model, device)


Output hidden; open in https://colab.research.google.com to view.