In [None]:
import os
# Project root used to build output paths (e.g. the exported model directory).
# The BD_LPR_HOME environment variable overrides the original hard-coded
# location, so the notebook is not tied to one user's home directory.
HOME = os.environ.get("BD_LPR_HOME", "/home/iot/bd-lpr-rtdetr-trocr-swin2sr")
print(HOME)

In [None]:
import torch
# torch.__version__ may carry a build tag after "+" (e.g. "2.1.0+cu118").
# Split it off once so the release part and the tag are parsed independently.
_release, _, _build_tag = str(torch.__version__).partition("+")
TORCH_VERSION = ".".join(_release.split(".")[:2])
if _build_tag:
    CUDA_VERSION = _build_tag
elif torch.version.cuda:
    # No "+tag" in the version string: the old split("+")[-1] would have
    # returned the whole version here, so derive the tag from the build's CUDA version.
    CUDA_VERSION = "cu" + torch.version.cuda.replace(".", "")
else:
    CUDA_VERSION = "cpu"
print("torch: ", TORCH_VERSION, "; cuda: ", CUDA_VERSION)

In [None]:
import roboflow
import supervision
import transformers
import pytorch_lightning
# Report the versions of the third-party libraries this notebook relies on.
_version_fields = (
    "roboflow:", roboflow.__version__,
    "; supervision:", supervision.__version__,
    "; transformers:", transformers.__version__,
    "; pytorch_lightning:", pytorch_lightning.__version__,
)
print(*_version_fields)

In [None]:
from transformers import RTDetrImageProcessor, RTDetrV2ForObjectDetection

# Pretrained checkpoint and thresholds.
# CONFIDENCE_TRESHOLD is the score cut-off passed to post-processing in the
# test-image cell; IOU_TRESHOLD is not referenced by the visible cells.
CHECKPOINT = 'PekingU/rtdetr_v2_r101vd'
CONFIDENCE_TRESHOLD = 0.5
IOU_TRESHOLD = 0.8

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda:0')
else:
    DEVICE = torch.device('cpu')

image_processor = RTDetrImageProcessor.from_pretrained(CHECKPOINT)
model = RTDetrV2ForObjectDetection.from_pretrained(CHECKPOINT)
model.to(DEVICE)

In [None]:
from getpass import getpass

from roboflow import Roboflow

# SECURITY: the API key must not live in the notebook. Read it from the
# ROBOFLOW_API_KEY environment variable, or prompt for it interactively.
# The key that was previously committed here should be revoked and rotated.
_roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")

# Download version 11 of the licence-plate dataset in COCO format.
rf = Roboflow(api_key=_roboflow_api_key)
project = rf.workspace("roboflow-universe-projects").project("license-plate-recognition-rxg4e")
version = project.version(11)
dataset = version.download("coco")

In [None]:
import torchvision

# Name of the COCO annotation file expected inside each split directory.
ANNOTATION_FILE_NAME = "_annotations.coco.json"

# One directory per split, all rooted at the downloaded dataset location.
TRAIN_DIRECTORY, VAL_DIRECTORY, TEST_DIRECTORY = (
    os.path.join(dataset.location, _split_name)
    for _split_name in ("train", "valid", "test")
)

In [None]:
class CocoDetection(torchvision.datasets.CocoDetection):
    """COCO detection dataset whose samples are preprocessed by an image processor.

    Each split directory is expected to contain its images together with an
    annotation file named ``ANNOTATION_FILE_NAME``.
    """

    def __init__(self, image_directory_path: str, image_processor, train: bool = True):
        """Build the dataset for one split directory.

        Args:
            image_directory_path: Directory holding the images and the annotation file.
            image_processor: Callable applied to every (image, annotations) pair
                in ``__getitem__``.
            train: Accepted for call-site compatibility; not used by this class.
        """
        annotation_file_path = os.path.join(image_directory_path, ANNOTATION_FILE_NAME)
        super().__init__(image_directory_path, annotation_file_path)
        self.image_processor = image_processor

    def __getitem__(self, idx):
        """Return ``(pixel_values, target)`` for the sample at position ``idx``."""
        image, coco_annotations = super().__getitem__(idx)
        # The processor takes the annotations wrapped together with the image id.
        target = {'image_id': self.ids[idx], 'annotations': coco_annotations}
        encoding = self.image_processor(images=image, annotations=target, return_tensors="pt")
        # Remove only the leading (batch) axis. A bare squeeze() would also drop
        # any other axis that happens to have size 1 and corrupt the tensor layout.
        pixel_values = encoding["pixel_values"].squeeze(0)
        labels = encoding["labels"][0]
        return pixel_values, labels

In [None]:
# Build one dataset per split; all three share the same image processor.
TRAIN_DATASET = CocoDetection(TRAIN_DIRECTORY, image_processor, train=True)
VAL_DATASET = CocoDetection(VAL_DIRECTORY, image_processor, train=False)
TEST_DATASET = CocoDetection(TEST_DIRECTORY, image_processor, train=False)

# Report the size of each split.
for _caption, _split in (
    ("Number of training examples:", TRAIN_DATASET),
    ("Number of validation examples:", VAL_DATASET),
    ("Number of test examples:", TEST_DATASET),
):
    print(_caption, len(_split))

In [None]:
import random
import cv2
import numpy as np
import supervision as sv

image_ids = TRAIN_DATASET.coco.getImgIds()
image_id = random.choice(image_ids)
print('Image #{}'.format(image_id))
image = TRAIN_DATASET.coco.loadImgs(image_id)[0]
annotations = TRAIN_DATASET.coco.imgToAnns[image_id]
image_path = os.path.join(TRAIN_DATASET.root, image['file_name'])
image = cv2.imread(image_path)
detections = sv.Detections.from_coco_annotations(coco_annotation=annotations)
categories = TRAIN_DATASET.coco.cats
id2label = {k: v['name'] for k, v in categories.items()}
labels = [f"{id2label[class_id]}" for _, _, class_id, _ in detections]
box_annotator = sv.BoxAnnotator()
frame = box_annotator.annotate(scene=image, detections=detections, labels=labels)

%matplotlib inline
sv.show_frame_in_notebook(image, (16, 16))

In [None]:
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Merge ``(pixel_values, target)`` samples into a single batch dictionary.

    The image tensors are padded together by ``image_processor.pad``; the
    per-image targets are returned as a plain list under ``'labels'``.
    """
    images = []
    for sample in batch:
        images.append(sample[0])
    padded = image_processor.pad(images, return_tensors="pt")
    targets = [sample[1] for sample in batch]

    collated = {}
    collated['pixel_values'] = padded['pixel_values']
    collated['pixel_mask'] = padded['pixel_mask']
    collated['labels'] = targets
    return collated

BATCH_SIZE = 8

# Options common to every loader; only the training loader shuffles.
_loader_options = dict(collate_fn=collate_fn, batch_size=BATCH_SIZE)

TRAIN_DATALOADER = DataLoader(dataset=TRAIN_DATASET, shuffle=True, **_loader_options)
VAL_DATALOADER = DataLoader(dataset=VAL_DATASET, **_loader_options)
TEST_DATALOADER = DataLoader(dataset=TEST_DATASET, **_loader_options)

In [None]:
import pytorch_lightning as pl

class RTDETR(pl.LightningModule):
    """Lightning module that fine-tunes ``RTDetrV2ForObjectDetection``.

    The wrapped model is loaded from ``CHECKPOINT`` with its label count set to
    ``len(id2label)``. Training and validation batches come from the
    module-level ``TRAIN_DATALOADER`` and ``VAL_DATALOADER``.
    """

    def __init__(self, lr, lr_backbone, weight_decay):
        """Store the optimiser hyper-parameters and load the pretrained model.

        Args:
            lr: Learning rate for every parameter outside the backbone.
            lr_backbone: Learning rate for parameters whose name contains "backbone".
            weight_decay: Weight decay passed to AdamW.
        """
        super().__init__()
        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay
        # ignore_mismatched_sizes lets the checkpoint load although num_labels
        # is overridden with the dataset's label count.
        self.model = RTDetrV2ForObjectDetection.from_pretrained(
            pretrained_model_name_or_path=CHECKPOINT,
            num_labels=len(id2label),
            ignore_mismatched_sizes=True,
        )

    def forward(self, pixel_values, pixel_mask):
        """Run the wrapped model without labels and return its outputs."""
        outputs = self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)
        return outputs

    def common_step(self, batch, batch_idx):
        """Compute ``(loss, loss_dict)`` for one batch; shared by train and validation."""
        pixel_values = batch["pixel_values"]
        pixel_mask = batch["pixel_mask"]
        # Targets arrive as a list of dicts of tensors; move each tensor to this module's device.
        targets = []
        for target in batch["labels"]:
            targets.append({name: tensor.to(self.device) for name, tensor in target.items()})
        outputs = self.model(pixel_values=pixel_values, pixel_mask=pixel_mask, labels=targets)
        return outputs.loss, outputs.loss_dict

    def _log_losses(self, total_key, component_prefix, loss, loss_dict):
        """Log the total loss under ``total_key`` and each component under ``component_prefix + name``."""
        self.log(total_key, loss)
        for name, value in loss_dict.items():
            self.log(component_prefix + name, value.item())

    def training_step(self, batch, batch_idx):
        """Compute, log and return the training loss for one batch."""
        loss, loss_dict = self.common_step(batch, batch_idx)
        self._log_losses("training_loss", "train_", loss, loss_dict)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute, log and return the validation loss for one batch."""
        loss, loss_dict = self.common_step(batch, batch_idx)
        self._log_losses("validation/loss", "validation_", loss, loss_dict)
        return loss

    def configure_optimizers(self):
        """Create AdamW with a separate learning rate for backbone parameters."""
        backbone_params = []
        other_params = []
        for name, param in self.named_parameters():
            if not param.requires_grad:
                continue
            if "backbone" in name:
                backbone_params.append(param)
            else:
                other_params.append(param)
        param_groups = [
            {"params": other_params},
            {"params": backbone_params, "lr": self.lr_backbone},
        ]
        return torch.optim.AdamW(param_groups, lr=self.lr, weight_decay=self.weight_decay)

    def train_dataloader(self):
        """Return the module-level training dataloader."""
        return TRAIN_DATALOADER

    def val_dataloader(self):
        """Return the module-level validation dataloader."""
        return VAL_DATALOADER

In [None]:
# Restore the fine-tuned module. The hyper-parameters are passed again as
# keyword arguments to load_from_checkpoint().
checkpoint_path = 'lightning_logs/version_6/checkpoints/epoch=41-step=4662.ckpt'
model = RTDETR.load_from_checkpoint(
    checkpoint_path,
    lr=1e-4,
    lr_backbone=1e-5,
    weight_decay=1e-4,
)

# Take one training batch and move its tensors to DEVICE; non-tensor entries
# (the list of label dicts) are left as they are.
batch = next(iter(TRAIN_DATALOADER))
batch = {
    name: (entry.to(DEVICE) if isinstance(entry, torch.Tensor) else entry)
    for name, entry in batch.items()
}

# Sanity check: one forward pass in eval mode without gradient tracking.
model.to(DEVICE)
model.eval()
with torch.no_grad():
    outputs = model(
        pixel_values=batch['pixel_values'],
        pixel_mask=batch['pixel_mask'],
    )

In [None]:
# Training cell, currently disabled: every statement below is commented out,
# so running this cell does nothing.
# from pytorch_lightning import Trainer

# %cd {HOME}
# MAX_EPOCHS = 10
# model.train()
# trainer = Trainer(devices=1, accelerator="gpu", max_epochs=MAX_EPOCHS, gradient_clip_val=0.1, accumulate_grad_batches=8, log_every_n_steps=5)
# trainer.fit(model)

In [None]:
model.to(DEVICE)
image_ids = TEST_DATASET.coco.getImgIds()
image_id = random.choice(image_ids)
print('Image #{}'.format(image_id))
image = TEST_DATASET.coco.loadImgs(image_id)[0]
annotations = TEST_DATASET.coco.imgToAnns[image_id]
image_path = os.path.join(TEST_DATASET.root, image['file_name'])
image = cv2.imread(image_path)
detections = sv.Detections.from_coco_annotations(coco_annotation=annotations)
labels = [f"{id2label[class_id]}" for _, _, class_id, _ in detections]
frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
print('ground truth')
%matplotlib inline
sv.show_frame_in_notebook(frame, (16, 16))

model.eval()
with torch.no_grad():
    inputs = image_processor(images=image, return_tensors='pt')
    inputs = image_processor.pad(inputs['pixel_values'], return_tensors='pt')
    inputs.to(DEVICE)
    outputs = model(**inputs)
    target_sizes = torch.tensor([image.shape[:2]]).to(DEVICE)
    results = image_processor.post_process_object_detection(outputs=outputs, threshold=CONFIDENCE_TRESHOLD, target_sizes=target_sizes)[0]
detections = sv.Detections.from_transformers(transformers_results=results).with_nms(threshold=0.5)
labels = [f"{id2label[class_id]} {confidence:.2f}" for _, confidence, class_id, _ in detections]
frame = box_annotator.annotate(scene=image.copy(), detections=detections, labels=labels)
print('detections')
%matplotlib inline
sv.show_frame_in_notebook(frame, (16, 16))

In [None]:
def convert_to_xywh(boxes):
    """Convert an ``(N, 4)`` tensor of ``(xmin, ymin, xmax, ymax)`` boxes to ``(x, y, width, height)``."""
    left, top, right, bottom = boxes.unbind(1)
    widths = right - left
    heights = bottom - top
    return torch.stack((left, top, widths, heights), dim=1)

def prepare_for_coco_detection(predictions):
    """Flatten per-image predictions into a list of COCO result dictionaries.

    Args:
        predictions: Mapping from image id to a prediction with ``"boxes"``
            (xmin, ymin, xmax, ymax), ``"scores"`` and ``"labels"`` entries.

    Returns:
        One dict per predicted box with the keys ``image_id``, ``category_id``,
        ``bbox`` (x, y, width, height) and ``score``.
    """
    records = []
    for image_id, pred in predictions.items():
        # Entries with length zero contribute nothing.
        if not len(pred):
            continue

        xywh_boxes = convert_to_xywh(pred["boxes"]).tolist()
        confidences = pred["scores"].tolist()
        category_ids = pred["labels"].tolist()

        for position, bbox in enumerate(xywh_boxes):
            records.append(
                {
                    "image_id": image_id,
                    "category_id": category_ids[position],
                    "bbox": bbox,
                    "score": confidences[position],
                }
            )
    return records

In [None]:
from coco_eval import CocoEvaluator
from tqdm.notebook import tqdm
import numpy as np

# COCO bounding-box evaluation of the model on the test split.
evaluator = CocoEvaluator(coco_gt=TEST_DATASET.coco, iou_types=["bbox"])

print("Running evaluation...")

# Eval mode does not change between batches, so set it once before the loop.
model.eval()

for batch in tqdm(TEST_DATALOADER):
    pixel_values = batch["pixel_values"].to(DEVICE)
    pixel_mask = batch["pixel_mask"].to(DEVICE)
    labels = [{k: v.to(DEVICE) for k, v in t.items()} for t in batch["labels"]]

    with torch.no_grad():
        outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    # Rescale predicted boxes to each image's original size.
    orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
    # NOTE(review): no `threshold` is passed, so the processor's default score
    # cut-off applies — confirm that is what is wanted for the reported metrics.
    results = image_processor.post_process_object_detection(outputs, target_sizes=orig_target_sizes)

    # Key predictions by image id, then flatten them into COCO result records.
    predictions = {target['image_id'].item(): output for target, output in zip(labels, results)}
    predictions = prepare_for_coco_detection(predictions)
    evaluator.update(predictions)

evaluator.synchronize_between_processes()
evaluator.accumulate()
evaluator.summarize()

In [None]:
# Export the model weights and the image-processor configuration into the
# same directory under HOME.
MODEL_PATH = os.path.join(HOME, 'rtdetr')
# Assumes `model` is still the RTDETR Lightning wrapper loaded from the
# checkpoint, so `.model` is the wrapped RTDetrV2ForObjectDetection.
model.model.save_pretrained(MODEL_PATH)
image_processor.save_pretrained(MODEL_PATH)

In [None]:
from transformers import RTDetrImageProcessor, RTDetrV2ForObjectDetection

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

# Reload the exported model and processor.
# NOTE(review): the relative path "rtdetr" presumably relies on the working
# directory being HOME, where the export cell writes — confirm.
model = RTDetrV2ForObjectDetection.from_pretrained("rtdetr")
processor = RTDetrImageProcessor.from_pretrained("rtdetr")

# Human-readable class names used when labelling detections.
model.config.id2label = {0: "Unknown", 1: "License Plate"}
model.to(device)
model.eval()

In [None]:
import cv2
from tqdm.notebook import tqdm
import supervision as sv

# Run the detector on every frame of a video and write an annotated copy.

# Open video
video_path = "License Plate Detection Test.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    cap.release()
    # exit() inside a notebook shuts the kernel down; raise so only this cell stops.
    raise IOError(f"Error: Cannot open video: {video_path}")

total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Video: {total_frames} frames, {fps} FPS, {width}x{height}")

# Set up output video
output_path = "rtdetr_output_video_3.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Initialize annotator
box_annotator = sv.BoxAnnotator()

# Process frames
pbar = tqdm(total=total_frames, desc="Processing")
try:
    for frame_index in range(total_frames):
        ret, frame = cap.read()
        if not ret:
            print(f"Skipped frame {frame_index}")
            pbar.update(1)
            continue

        with torch.no_grad():
            # OpenCV decodes frames as BGR, while the detector was fed RGB
            # images by the training dataset; convert a copy for inference.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            inputs = processor(images=frame_rgb, return_tensors='pt')
            inputs = processor.pad(inputs['pixel_values'], return_tensors='pt')
            inputs = inputs.to(device)
            outputs = model(**inputs)
            # (height, width) of the frame, so boxes come back in frame pixels.
            target_sizes = torch.tensor([frame.shape[:2]]).to(device)
            results = processor.post_process_object_detection(
                outputs=outputs, threshold=0.5, target_sizes=target_sizes
            )[0]

        # Annotate the original BGR frame, which is what VideoWriter expects.
        detections = sv.Detections.from_transformers(transformers_results=results)
        frame_labels = [
            f"{model.config.id2label[class_id]} {confidence:.2f}"
            for _, confidence, class_id, _ in detections
        ]
        annotated_frame = box_annotator.annotate(
            scene=frame, detections=detections, labels=frame_labels
        )

        # Write frame
        out.write(annotated_frame)
        pbar.update(1)
finally:
    # Release resources even if inference fails part-way, so the output file
    # is finalised and the capture handle is not leaked.
    pbar.close()
    cap.release()
    out.release()

print(f"Output saved to {output_path}")