In [1]:
# Import necessary libraries
import os
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import json
from tqdm import tqdm

# Custom Dataset Class

In [2]:
class TACODataset(torch.utils.data.Dataset):
    """Object-detection dataset backed by a COCO-format annotation file (e.g. TACO).

    Each item is ``(image, target)``. ``image`` is an RGB PIL image (or whatever
    ``transforms`` turns it into). ``target`` is a dict with:

    * ``boxes``    - float32 tensor of shape (N, 4) as ``[x_min, y_min, x_max, y_max]``
    * ``labels``   - int64 tensor of shape (N,), 1-based (0 is left for background)
    * ``image_id`` - tensor holding the dataset index of the item
    * ``area``     - float32 tensor of shape (N,), bounding-box area (width * height)
    * ``iscrowd``  - int64 tensor of shape (N,), taken from the annotation (0 if absent)

    Args:
        root: Directory that the ``file_name`` entries of the annotation file
            are relative to.
        annotation_file: Path to the COCO-style JSON annotation file.
        transforms: Optional callable ``(image, target) -> (image, target)``.
    """

    def __init__(self, root, annotation_file, transforms=None):
        self.root = root  # Path to 'TACO/data' directory
        self.transforms = transforms
        # Read as UTF-8 explicitly so the result does not depend on the platform's
        # default text encoding.
        with open(annotation_file, encoding='utf-8') as f:
            self.coco = json.load(f)
        self.image_info = {img['id']: img for img in self.coco['images']}
        self.annotations = self.coco['annotations']
        # Map category IDs to contiguous labels starting at 1
        self.cat_id_to_label = {cat['id']: idx + 1 for idx, cat in enumerate(self.coco['categories'])}
        self.num_classes = len(self.cat_id_to_label) + 1  # Including background
        # Group annotations by image
        self.img_id_to_annots = {}
        for ann in self.annotations:
            self.img_id_to_annots.setdefault(ann['image_id'], []).append(ann)
        self.img_ids = list(self.image_info.keys())

    def __getitem__(self, idx):
        img_id = self.img_ids[idx]
        img_info = self.image_info[img_id]
        # The 'file_name' includes the batch directory, e.g., 'batch_1/00001.jpg'
        img_path = os.path.join(self.root, img_info['file_name'])
        # convert() returns a new, fully loaded image, so the file can be closed right away
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")

        boxes, labels, areas, iscrowd = [], [], [], []
        for ann in self.img_id_to_annots.get(img_id, []):
            x_min, y_min, width, height = ann['bbox']
            # Skip degenerate boxes: torchvision detection models refuse targets
            # whose boxes do not have positive width and height.
            if width <= 0 or height <= 0:
                continue
            boxes.append([x_min, y_min, x_min + width, y_min + height])
            labels.append(self.cat_id_to_label[ann['category_id']])
            areas.append(width * height)
            iscrowd.append(int(ann.get('iscrowd', 0)))

        target = {
            # reshape keeps the (0, 4) shape for images without usable annotations
            'boxes': torch.as_tensor(boxes, dtype=torch.float32).reshape(len(boxes), 4),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
            'image_id': torch.tensor([idx]),
            'area': torch.as_tensor(areas, dtype=torch.float32),
            'iscrowd': torch.as_tensor(iscrowd, dtype=torch.int64),
        }
        if self.transforms:
            img, target = self.transforms(img, target)
        return img, target

    def __len__(self):
        return len(self.img_ids)


# Transformation to 224 x 224 pixels

In [3]:
import torchvision.transforms as T
import torchvision.transforms.functional as F

class Compose(object):
    """Chain several ``(image, target) -> (image, target)`` transforms.

    Args:
        transforms: Sequence of callables applied in order.
    """

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        sample = (image, target)
        for stage in self.transforms:
            # Unpacking enforces that every stage returns exactly (image, target)
            staged_image, staged_target = stage(*sample)
            sample = (staged_image, staged_target)
        return sample

class Resize(object):
    """Resize a PIL image to a fixed size and rescale its target to match.

    The target dict is updated in place: ``boxes`` are scaled to the new
    resolution, and ``area`` (when present) is scaled by the same factors so
    that it stays consistent with the resized boxes.

    Args:
        size: Output size as ``(height, width)``.
    """

    def __init__(self, size):
        self.size = size  # (height, width)

    def __call__(self, image, target):
        # PIL reports size as (width, height)
        orig_width, orig_height = image.size
        image = F.resize(image, self.size)
        # Per-axis scale factors between the new and the original resolution
        scale_x = self.size[1] / orig_width
        scale_y = self.size[0] / orig_height
        boxes = target['boxes']
        target['boxes'] = boxes * torch.tensor([scale_x, scale_y, scale_x, scale_y])
        # Areas were computed in original pixels; rescale them so they match
        # the boxes they describe.
        if 'area' in target:
            target['area'] = target['area'] * (scale_x * scale_y)
        return image, target

class ToTensor(object):
    """Convert the image with ``F.to_tensor``; the target is passed through unchanged."""

    def __call__(self, image, target):
        return F.to_tensor(image), target

def get_transform(train, size=(224, 224)):
    """Build the preprocessing pipeline applied to every ``(image, target)`` pair.

    Args:
        train: Whether the pipeline is for the training split. Currently
            unused: no augmentation is applied yet, so both splits get the
            same pipeline. Training-only augmentation would be added here.
        size: Output image size as ``(height, width)``. Defaults to 224x224.

    Returns:
        A ``Compose`` that resizes the image (and its boxes) and then converts
        the image to a tensor.
    """
    transforms = [
        Resize(size),
        ToTensor(),
    ]
    return Compose(transforms)


# Preparing the model

In [4]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def get_model(num_classes):
    """Create a Faster R-CNN (ResNet-50 FPN) detector with a new box-prediction head.

    The detector is loaded with torchvision's default pre-trained weights; its
    box predictor is then replaced by a fresh ``FastRCNNPredictor`` that
    outputs ``num_classes`` classes.

    Args:
        num_classes: Number of output classes for the new head.

    Returns:
        The detector with the replaced head.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='DEFAULT')
    roi_heads = detector.roi_heads
    # The new head must accept the same feature width the old classifier consumed
    head_in_features = roi_heads.box_predictor.cls_score.in_features
    roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector


# Training and evaluation

In [5]:
from torch.utils.data import DataLoader
import utils
import engine
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

# Paths to data and annotations
dataset_root = 'TACO-master/data'  # Point to 'TACO/data' directory
annotation_file = 'TACO-master/data/annotations.json'

# Create the dataset twice so each split can carry its own transform pipeline
dataset = TACODataset(dataset_root, annotation_file, transforms=get_transform(train=True))
dataset_test = TACODataset(dataset_root, annotation_file, transforms=get_transform(train=False))

# Split the dataset into train and test set (fixed seed keeps the split reproducible)
torch.manual_seed(1)
indices = torch.randperm(len(dataset)).tolist()
test_size = int(len(indices) * 0.2)
# Slice at an explicit boundary: `indices[:-test_size]` is empty when test_size == 0,
# which would leave no training data and put every image in the test split.
split_at = len(indices) - test_size
dataset = torch.utils.data.Subset(dataset, indices[:split_at])
dataset_test = torch.utils.data.Subset(dataset_test, indices[split_at:])

# Define training and validation data loaders
# NOTE(review): with num_workers > 0 the dataset is handed to worker processes; on
# platforms that spawn workers (e.g. Windows) a dataset class defined inside the
# notebook may not be loadable there - try num_workers=0 if loading stalls. Confirm.
data_loader = DataLoader(dataset, batch_size=2, shuffle=True, num_workers=4, collate_fn=utils.collate_fn)
data_loader_test = DataLoader(dataset_test, batch_size=1, shuffle=False, num_workers=4, collate_fn=utils.collate_fn)

# Get the number of classes (read from the TACODataset wrapped by the Subset)
num_classes = dataset.dataset.num_classes

# Initialize the model
model = get_model(num_classes)

# Move model to the right device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Construct an optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Learning rate scheduler: multiply the learning rate by 0.1 every 3 scheduler steps
lr_scheduler = StepLR(optimizer, step_size=3, gamma=0.1)


In [6]:
import torch
# Query CUDA once and report both its availability and the name of device 0
cuda_available = torch.cuda.is_available()
gpu_name = torch.cuda.get_device_name(0) if cuda_available else "No GPU found"
print("Is CUDA available:", cuda_available)
print("CUDA device name:", gpu_name)


Is CUDA available: True
CUDA device name: NVIDIA GeForce GTX 1050


In [None]:
# Number of epochs (full passes over the training loader)
num_epochs = 10

for epoch in range(num_epochs):
    # Epochs are 0-based internally; the printed counter is 1-based
    print(f"\nEpoch {epoch + 1}/{num_epochs}")
    
    # Training one epoch with tqdm progress bar
    # NOTE(review): `pbar` is passed to the local `engine` module as a keyword;
    # presumably engine.py was adapted to accept and advance it - confirm.
    with tqdm(total=len(data_loader), desc="Training", unit="batch") as pbar:
        engine.train_one_epoch(
            model,
            optimizer,
            data_loader,
            device,
            epoch,
            print_freq=10,
            pbar=pbar,  # Pass tqdm progress bar
        )
    
    # Update the learning rate (the scheduler is stepped once per epoch)
    lr_scheduler.step()
    
    # Evaluate on the test dataset with tqdm progress bar
    print("\nEvaluating...")
    with tqdm(total=len(data_loader_test), desc="Evaluating", unit="batch") as pbar:
        engine.evaluate(model, data_loader_test, device=device, pbar=pbar)

# Save the trained model (weights only, via state_dict) once all epochs have finished
torch.save(model.state_dict(), 'taco_faster_rcnn.pth')
print("\nTraining completed and model saved.")


Epoch 1/10


Training:   0%|          | 0/600 [00:00<?, ?batch/s]

# Visualize the results

In [None]:
def visualize_predictions(model, dataset, device, num_images=5, score_threshold=0.5):
    """Plot the model's detections on a few randomly drawn images.

    One figure is shown per image, with a red rectangle and a
    ``"<category name>: <score>"`` caption for every detection whose score is
    above ``score_threshold``. The model is switched to eval mode and left
    there.

    Args:
        model: Detection model. It is called with a list of image tensors and
            must return one dict per image with 'boxes', 'scores' and 'labels'.
        dataset: A dataset exposing ``coco`` and ``cat_id_to_label``, or a
            wrapper (such as ``Subset``) exposing one through ``.dataset``.
        device: Device the images are moved to before the forward pass.
        num_images: Maximum number of images to plot.
        score_threshold: Detections scoring at or below this value are skipped.
    """
    # Accept both a Subset (which has `.dataset`) and the underlying dataset itself
    base_dataset = getattr(dataset, 'dataset', dataset)
    # Build the label -> category-name lookup once instead of scanning per box
    id_to_name = {cat['id']: cat['name'] for cat in base_dataset.coco['categories']}
    label_to_name = {
        label: id_to_name[cat_id]
        for cat_id, label in base_dataset.cat_id_to_label.items()
    }

    model.eval()
    data_loader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=utils.collate_fn)
    images_so_far = 0
    # Inference only: skip autograd bookkeeping to save memory
    with torch.no_grad():
        for imgs, _targets in data_loader:
            if images_so_far >= num_images:
                break
            imgs = [img.to(device) for img in imgs]
            outputs = model(imgs)
            for img, output in zip(imgs, outputs):
                images_so_far += 1
                fig, ax = plt.subplots(figsize=(8, 8))
                # Reorder from channels-first (C, H, W) to channels-last for imshow
                ax.imshow(img.cpu().permute(1, 2, 0).numpy())
                detections = zip(
                    output['boxes'].tolist(),
                    output['scores'].tolist(),
                    output['labels'].tolist(),
                )
                for box, score, label in detections:
                    if score <= score_threshold:
                        continue
                    xmin, ymin, xmax, ymax = box
                    ax.add_patch(
                        plt.Rectangle(
                            (xmin, ymin), xmax - xmin, ymax - ymin,
                            fill=False, color='red', linewidth=2,
                        )
                    )
                    # Fall back to the raw label if the model emits an unmapped class
                    category_name = label_to_name.get(label, str(label))
                    ax.text(xmin, ymin - 10, f'{category_name}: {score:.2f}', color='red', fontsize=12)
                ax.axis('off')
                plt.show()
                # Release the figure so repeated calls do not accumulate open figures
                plt.close(fig)


In [None]:
# Visualize the predictions: up to 5 randomly drawn images from the held-out test
# split, showing only detections with a score above 0.5
visualize_predictions(model, dataset_test, device, num_images=5, score_threshold=0.5)