In [1]:
from detr_dataset import COCODataset
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader

In [None]:
import json
from datasets import Dataset, DatasetDict
from PIL import Image
import os
from sklearn.model_selection import train_test_split

# Define DataLoader function
def load_coco_dataset(json_path, image_dir):
    """Load a COCO-format annotation file into a train/test ``DatasetDict``.

    Annotations are grouped per image: every record holds one RGB image, its
    id and size, and parallel lists (``id``, ``area``, ``bbox``, ``category``)
    describing all objects annotated on it. Images that have no annotation
    never produce a record, and images that cannot be opened are skipped
    together with all of their annotations.

    Args:
        json_path: Path to the COCO JSON file; it must contain the
            ``images``, ``annotations`` and ``categories`` sections.
        image_dir: Directory that each image's ``file_name`` is joined onto.

    Returns:
        A ``DatasetDict`` with ``train`` and ``test`` splits
        (``test_size=0.15``, ``random_state=42``).

    Raises:
        ValueError: If no annotated image could be loaded.
        KeyError: If an annotation references an unknown image or category id.
    """
    print("Loading COCO JSON file...")
    with open(json_path, 'r') as f:
        coco_data = json.load(f)
    print("Loaded COCO JSON file successfully.")

    # Extract annotations and images information
    annotations = coco_data['annotations']
    images = {img['id']: img for img in coco_data['images']}
    print(f"Found {len(images)} images and {len(annotations)} annotations.")

    # Normalise category names (currently: lowercase only)
    for category in coco_data['categories']:
        if 'name' in category:
            category['name'] = category['name'].lower()

    # Resolve categories through their declared id. Indexing the list with
    # `category_id - 1` only works for contiguous 1-based ids and silently
    # returns the wrong (last) entry when ids start at 0.
    categories_by_id = {cat['id']: cat for cat in coco_data['categories']}

    # image_id -> record; dicts keep insertion order, so records appear in the
    # order in which their first annotation is encountered.
    records = {}
    # Images that failed to open: skip their remaining annotations without
    # retrying the load for each one.
    unreadable_ids = set()

    for i, ann in enumerate(annotations):
        if i % 100 == 0:
            print(f"Processing annotation {i}/{len(annotations)}...")
        img_info = images[ann['image_id']]
        image_id = img_info['id']
        if image_id in unreadable_ids:
            continue

        bbox = ann['bbox']  # Format: [x_min, y_min, width, height]
        calculated_area = bbox[2] * bbox[3]  # area taken from the box itself
        category_id = ann['category_id']
        category_name = categories_by_id[category_id]['name']

        record = records.get(image_id)
        if record is None:
            # First annotation of this image: open the file exactly once.
            img_path = os.path.join(image_dir, img_info['file_name'])
            try:
                with Image.open(img_path) as raw_image:
                    image = raw_image.convert("RGB")
            except Exception as e:
                # Best effort: report the problem and carry on with the rest.
                print(f"Error loading image {img_path}: {e}")
                unreadable_ids.add(image_id)
                continue
            record = {
                'image': image,
                'image_id': image_id,
                'width': img_info['width'],
                'height': img_info['height'],
                'objects': {'id': [], 'area': [], 'bbox': [], 'category': []},
            }
            records[image_id] = record

        objects = record['objects']
        objects['id'].append(category_id)
        objects['area'].append(calculated_area)
        objects['bbox'].append(bbox)
        objects['category'].append(category_name)

    dataset = list(records.values())
    if not dataset:
        raise ValueError(
            f"No annotated images could be loaded from {json_path!r} / {image_dir!r}."
        )

    print("Splitting dataset into train and test...")
    train_data, test_data = train_test_split(dataset, test_size=0.15, random_state=42)

    print("Formatting datasets...")
    # Format the data using Hugging Face Dataset
    train_dataset = Dataset.from_list(train_data)
    test_dataset = Dataset.from_list(test_data)

    print("Datasets formatted successfully.")
    return DatasetDict({'train': train_dataset, 'test': test_dataset})

# Example usage: build the train/test DatasetDict from the exported COCO folder.
# Both paths are relative to the current working directory.
json_path = "./0115_T0_dataset_coco/result.json"
image_dir = "./0115_T0_dataset_coco/images"
print("Starting dataset loading...")
dataset = load_coco_dataset(json_path, image_dir)
print("Dataset loading complete.")

In [None]:
# View dataset structure
#print("Dataset structure:")
#print(dataset)

# Show the first training record; it is the cell's last expression, so it is
# what the notebook renders as the cell output.
print("Sample train data:")
dataset['train'][0]

#print("Sample test data:")
#dataset['test'][0]


In [4]:
# First, extract out the train and test set
# (the following cells read these two names directly)

train_dataset = dataset["train"]
test_dataset = dataset["test"]

In [None]:
import numpy as np
from PIL import Image, ImageDraw


def draw_image_from_idx(dataset, idx):
    """Return a copy of sample ``idx`` with its ground-truth boxes drawn on it.

    Args:
        dataset: Indexable collection whose items provide ``image`` (PIL
            image), ``width``, ``height`` and ``objects`` (with parallel
            ``bbox`` and ``category`` lists).
        idx: Index of the sample to render.

    Returns:
        A new PIL image with a red rectangle and a white category label per
        object. The image stored in the sample is left untouched.
    """
    sample = dataset[idx]
    # Draw on a copy so the image object held by the sample is never modified.
    image = sample["image"].copy()
    annotations = sample["objects"]
    draw = ImageDraw.Draw(image)
    width, height = sample["width"], sample["height"]

    for box, category in zip(annotations["bbox"], annotations["category"]):
        x, y, w, h = box
        if max(box) > 1.0:
            # Any value above 1 means the box is already in pixel units.
            x1, y1 = int(x), int(y)
            x2, y2 = int(x + w), int(y + h)
        else:
            # Otherwise treat the box as normalised and scale to pixels.
            x1 = int(x * width)
            y1 = int(y * height)
            x2 = int((x + w) * width)
            y2 = int((y + h) * height)
        draw.rectangle((x1, y1, x2, y2), outline="red", width=1)
        draw.text((x1, y1), category, fill="white")
    return image


# Preview the ground-truth boxes of training sample 8.
draw_image_from_idx(dataset=train_dataset, idx=8)

In [None]:
import matplotlib.pyplot as plt


def plot_images(dataset, indices):
    """
    Plot images and their ground-truth annotations in a 3-column grid.

    Args:
        dataset: Dataset passed on to ``draw_image_from_idx``.
        indices: Iterable of sample indices to show. Any count is accepted;
            the last row is padded with blank cells when needed.
    """
    indices = list(indices)
    if not indices:
        # Nothing to draw; a zero-row grid cannot be created.
        return

    num_cols = 3
    # Ceiling division so a partially filled last row still gets its axes.
    num_rows = -(-len(indices) // num_cols)
    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 10), squeeze=False)

    # Hide every frame up front so unused cells stay blank.
    for ax in axes.flat:
        ax.axis("off")

    # axes.flat walks the grid row by row, matching the order of `indices`.
    for ax, idx in zip(axes.flat, indices):
        ax.imshow(draw_image_from_idx(dataset, idx))

    plt.tight_layout()
    plt.show()


# Now use the function to plot images
# (the first nine training samples, i.e. a 3x3 grid)

plot_images(train_dataset, range(9))

In [None]:
from transformers import AutoImageProcessor

# Checkpoint name shared by the image processor created here and the model
# loaded further down.
checkpoint = "facebook/detr-resnet-50-dc5"
image_processor = AutoImageProcessor.from_pretrained(checkpoint)

#### Preprocessing the Dataset

Before passing the images to the image_processor, let’s also apply different types of augmentations to the images along with their corresponding bounding boxes.

In simple terms, augmentations are a set of random transformations such as rotations, resizing, etc. They are applied to obtain more varied samples and to make the vision model more robust to different image conditions. We will use the albumentations library to achieve this. It lets you create random transformations of the images so that the effective sample size for training increases.

In [8]:
import albumentations
import numpy as np
import torch

# Augmentation pipeline applied to each image together with its bounding boxes.
# NOTE(review): Resize(700, 600) — presumably (height, width); confirm against the
# installed albumentations version.
# NOTE(review): HorizontalFlip and RandomBrightnessContrast both use p=1.0, so they
# presumably fire on every sample rather than randomly — confirm this is intended.
transform = albumentations.Compose(
    [
        albumentations.Resize(700, 600),
        albumentations.HorizontalFlip(p=1.0),
        albumentations.RandomBrightnessContrast(p=1.0),
    ],
    # Boxes are passed in COCO layout; "category" is the keyword under which the
    # per-box labels are supplied when the pipeline is called.
    bbox_params=albumentations.BboxParams(format="coco", label_fields=["category"]),
)

Once all the transformations are initialized, we need a function that formats the annotations and returns a list of annotations in a very specific format.

This is because the image_processor expects the annotations to be in the following format: {'image_id': int, 'annotations': List[Dict]}, where each dictionary is a COCO object annotation.

In [9]:
def formatted_anns(image_id, category, area, bbox):
    """Build the COCO-style annotation dicts for one image.

    Args:
        image_id: Id of the image the objects belong to.
        category: Sequence of category ids, one per object.
        area: Sequence of object areas, indexed like ``category``.
        bbox: Sequence of boxes, indexed like ``category``.

    Returns:
        A list with one dict per entry of ``category`` holding the keys
        ``image_id``, ``category_id``, ``iscrowd``, ``area`` and ``bbox``
        (the box is copied into a plain list).
    """
    return [
        {
            "image_id": image_id,
            "category_id": category_id,
            # COCO spells this key in lowercase; "isCrowd" is not a COCO key.
            "iscrowd": 0,
            "area": area[i],
            "bbox": list(bbox[i]),
        }
        for i, category_id in enumerate(category)
    ]

Finally, we combine the image and annotation transformations to do transformations over the whole batch of dataset.
Here is the final code to do so:

In [10]:
# transforming a batch

def transform_aug_ann(examples):
    """Augment a batch and convert it to the image processor's input format.

    Args:
        examples: Batch dict with the columns ``image_id``, ``image`` (PIL
            images) and ``objects`` (dicts with ``bbox`` and ``id`` lists).

    Returns:
        The output of ``image_processor`` for the augmented images and their
        COCO-style targets, as PyTorch tensors.
    """
    image_ids = examples["image_id"]
    images, bboxes, areas, categories = [], [], [], []
    for image, objects in zip(examples["image"], examples["objects"]):
        # Keep the RGB channel order: the inference cells feed RGB images to the
        # model, so training must not see channel-reversed (BGR) pixels.
        image = np.array(image.convert("RGB"))
        out = transform(image=image, bboxes=objects["bbox"], category=objects["id"])

        images.append(out["image"])
        bboxes.append(out["bboxes"])
        categories.append(out["category"])
        # Areas are width * height of the *transformed* boxes, so they follow the
        # resize and stay aligned with the boxes the augmentation returned.
        areas.append([box[2] * box[3] for box in out["bboxes"]])

    targets = [
        {"image_id": id_, "annotations": formatted_anns(id_, cat_, ar_, box_)}
        for id_, cat_, ar_, box_ in zip(image_ids, categories, areas, bboxes)
    ]

    return image_processor(images=images, annotations=targets, return_tensors="pt")

Finally, all you have to do is apply this preprocessing function to the entire dataset

In [11]:
# Apply transformations for both train and test dataset

# NOTE(review): the test split is given the same augmentation function as the
# train split — confirm this is intended for evaluation.
train_dataset_transformed = train_dataset.with_transform(transform_aug_ann)
test_dataset_transformed = test_dataset.with_transform(transform_aug_ann)

In [None]:
# Inspect one preprocessed training example.
train_dataset_transformed[0]

A collate_fn is responsible for taking a list of samples from a dataset and converting them into a batch suitable for model’s input format.

In general a DataCollator typically performs tasks such as padding, truncating etc. In a custom collate function, we often define what and how we want to group the data into batches or simply, how to represent each batch.

Data collator mainly puts the data together and then preprocesses them. Let’s make our collate function.

In [13]:
def collate_fn(batch):
    """Assemble a list of preprocessed samples into one model-ready batch.

    The per-sample ``pixel_values`` are padded together by the image
    processor; the ``labels`` are kept as a plain list, one entry per sample.

    Returns:
        Dict with ``pixel_values``, ``pixel_mask`` and ``labels``.
    """
    padded = image_processor.pad(
        [sample["pixel_values"] for sample in batch],
        return_tensors="pt",
    )
    return {
        "pixel_values": padded["pixel_values"],
        "pixel_mask": padded["pixel_mask"],
        "labels": [sample["labels"] for sample in batch],
    }

In [None]:
from transformers import AutoModelForObjectDetection

# Label maps for the detection head: a single class, "defect", with id 0.
# NOTE(review): hard-coded — presumably has to agree with the category ids used
# in the COCO annotations; verify against the dataset.
id2label = {0: "defect"}
label2id = {v: k for k, v in id2label.items()}


model = AutoModelForObjectDetection.from_pretrained(
    checkpoint,
    id2label=id2label,
    label2id=label2id,
    # allows the checkpoint's existing classification head to be replaced by a new one
    ignore_mismatched_sizes=True,
)

#### Training a DETR Model.

So, all the heavy lifting is done so far. Now, all that is left is to assemble each part of the puzzle one by one. Let’s go!

The training procedure involves the following steps:

1. Loading the base (pre-trained) model with AutoModelForObjectDetection using the same checkpoint as in the preprocessing.

2. Defining all the hyperparameters and additional arguments inside TrainingArguments.

3. Pass the training arguments to the Hugging Face Trainer, along with the model, the datasets, the data collator and the image processor.

4. Call the train() method and fine-tune your model.

When loading the model from the same checkpoint that you used for the preprocessing, remember to pass the label2id and id2label maps that you created earlier from the dataset’s metadata. Additionally, we specify ignore_mismatched_sizes=True to replace the existing classification head with a new one.

In [16]:
#from huggingface_hub import notebook_login

#notebook_login()

In [None]:
from transformers import TrainingArguments
from transformers import Trainer
import wandb
# Start a Weights & Biases run; the Trainer below is configured with report_to="wandb".
wandb.init(project="my_personal_project", entity=None)


# Define the training arguments
training_args = TrainingArguments(
    output_dir="detr-resnet-50-T0_LSR_seg-finetuned",
    per_device_train_batch_size=2,
    num_train_epochs=5,
    fp16=True,
    save_steps=100,
    logging_steps=20,
    learning_rate=5e-6,
    weight_decay=1e-4,
    save_total_limit=2,
    # keep the raw columns: transform_aug_ann reads "image", "image_id" and "objects"
    remove_unused_columns=False,
    # NOTE(review): pushing presumably needs Hub credentials, yet the
    # notebook_login() cell is commented out — confirm a login exists elsewhere.
    push_to_hub=True,
    report_to="wandb",
    run_name="detr_finetune_experiment"
)



# Define the trainer
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    train_dataset=train_dataset_transformed,
    eval_dataset=test_dataset_transformed,
    # NOTE(review): the image processor is handed over through the `tokenizer`
    # argument — confirm this is still accepted by the installed transformers version.
    tokenizer=image_processor,
)

trainer.train()

Once training is finished, you can delete the model, because the checkpoints have already been uploaded to the Hugging Face Hub.

In [18]:
# Drop the notebook's reference to the trained model.
# NOTE(review): `trainer` presumably still references the same model, so memory is
# only reclaimed once that reference is gone as well — confirm.
del model
# Only touch the CUDA runtime when a GPU is present; calling synchronize() on a
# CPU-only runtime raises.
if torch.cuda.is_available():
    # Wait for pending kernels, then hand cached blocks back to the driver.
    torch.cuda.synchronize()
    torch.cuda.empty_cache()

#### Testing and Inference

Now we will try to do inference of our new fine-tuned model. For this tutorial, we will be testing for this image:

In [None]:
import requests
from transformers import pipeline


# Image used for the qualitative check. It is converted to RGB so that the
# coloured boxes and labels drawn on it later render as intended.
image = Image.open("./0115_T0_dataset_coco/images/0a0526f9-FindDefect1_AECA1AOC2SY014C21_1669_709.bmp").convert("RGB")

# make the object detection pipeline
# NOTE(review): the checkpoint directory name is hard-coded — confirm it matches
# a checkpoint that exists under the training output directory.
obj_detector = pipeline(
    "object-detection", model="./detr-resnet-50-T0_LSR_seg-finetuned/checkpoint-4555"
)
# Run the detector on the very image that is plotted afterwards, so the boxes in
# `results` belong to the picture they are drawn on.
results = obj_detector(image)

print(results)

Now let’s make a very simple function to plot the results on our image. We get the score, the label and the corresponding bounding-box coordinates from the results, which we will use to draw on the image.

In [33]:
def plot_results(image, results, threshold=0.7):
    """Draw the detections scoring above ``threshold`` on a copy of ``image``.

    Args:
        image: Image (anything ``np.uint8`` can turn into an array).
        results: Iterable of dicts with ``score``, ``label`` and ``box``; the
            four values of ``box`` are read in their stored order as the two
            opposite corners of the rectangle.
        threshold: Detections whose score is not above this value are skipped.

    Returns:
        A new PIL image carrying the boxes, labels and scores.
    """
    canvas = Image.fromarray(np.uint8(image))
    pen = ImageDraw.Draw(canvas)
    for detection in results:
        confidence = detection["score"]
        name = detection["label"]
        corners = list(detection["box"].values())
        if not (confidence > threshold):
            continue

        left, top, right, bottom = corners
        pen.rectangle((left, top, right, bottom), outline="red", width=1)
        pen.text((left, top), name, fill="white")

        # Score colour: green above 0.7, red otherwise.
        if confidence > 0.7:
            score_colour = "green"
        else:
            score_colour = "red"
        pen.text((left + 0.5, top - 0.5), text=str(confidence), fill=score_colour)
    return canvas

In [None]:
%matplotlib inline
# Overlay the detections in `results` on `image` (default threshold 0.7).
plot_results(image, results)

Now, let’s club everything together into a simple function.

In [None]:
def predict(image, pipeline, threshold=0.7):
    """Run ``pipeline`` on ``image`` and return the image with its detections drawn.

    Args:
        image: Image handed both to the detector and to ``plot_results``.
        pipeline: Callable detector; it is called with the image only.
        threshold: Forwarded to ``plot_results``.
    """
    detections = pipeline(image)
    annotated = plot_results(image, detections, threshold)
    return annotated


# Let's test for another test image
# (the first record of the test split)

img = test_dataset[0]["image"]
predict(img, obj_detector)

In [None]:
from tqdm.auto import tqdm


def plot_images(dataset, indices):
    """
    Plot images with the detector's predictions in a 3-column grid.

    Replaces the earlier ``plot_images`` of this notebook: instead of the
    ground-truth boxes it shows what ``obj_detector`` predicts for each image.

    Args:
        dataset: Indexable dataset whose items provide an ``image`` entry.
        indices: Iterable of sample indices to show. Any count is accepted;
            the last row is padded with blank cells when needed.
    """
    indices = list(indices)
    if not indices:
        # Nothing to draw; a zero-row grid cannot be created.
        return

    num_cols = 3
    # Ceiling division so a partially filled last row still gets its axes.
    num_rows = -(-len(indices) // num_cols)
    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 10), squeeze=False)

    # Hide every frame up front so unused cells stay blank.
    for ax in axes.flat:
        ax.axis("off")

    # axes.flat walks the grid row by row, matching the order of `indices`.
    for ax, idx in tqdm(zip(axes.flat, indices), total=len(indices)):
        ax.imshow(predict(dataset[idx]["image"], obj_detector))

    plt.tight_layout()
    plt.show()


# Show the predictions for the first six test images.
plot_images(test_dataset, range(6))