# Labeling raw data

#### We will use Google's `owlv2` zero-shot object detector to label our dataset

#### Assuming the root directory contains a folder of individual frames extracted from the original .mkv videos (for example with ffmpeg), set the `PIGS_FOLDER_PATH` variable to that folder's name

In [1]:
import os
import random
import shutil
from glob import glob

from torchvision.ops import nms
from transformers import AutoProcessor, Owlv2ForObjectDetection
import torch
from PIL import Image, ImageDraw
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [11]:
# Folder holding the raw frames to label (input of the labeling step).
# NOTE(review): "extacted" looks like a typo of "extracted"; the value is kept
# as-is because it must match the folder name on disk -- rename both together.
PIGS_FOLDER_PATH = "extacted_frames"
# Output folder for copies of the frames with the predicted boxes drawn on them.
MODEL_LABELED_IMGS_PATH = "model_bboxed_frames"
# Output folder for the YOLO-format .txt annotation files (one per frame).
MODEL_LABELS = "model_frames_labels"

# Text queries passed to the detector; a predicted label is an index into this list.
GROUNDING_PROMPT = ["individual pig", "man view from above"]
# Prefer the GPU, but fall back to the CPU so the notebook still runs
# (slowly) on a machine without CUDA instead of failing at model load.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Number of images sent through the model per forward pass.
BATCH_SIZE = 4

In [3]:
# Load the OWLv2 checkpoint once: `processor` builds the model inputs and
# post-processes the raw outputs, `model` is the detector moved to DEVICE.
model_id = "google/owlv2-base-patch16-ensemble"
processor = AutoProcessor.from_pretrained(model_id)
model = Owlv2ForObjectDetection.from_pretrained(model_id)
model = model.to(DEVICE)

In [14]:
# batch inference + saving predictions in YOLO format
def _clip_box_to_image(box, img_width, img_height):
    """Clip an (x_min, y_min, x_max, y_max) pixel box to the image and round it.

    Returns a list of four ints, or None when the clipped box has no area
    (it lay entirely outside the image or collapsed to a line/point).
    """
    x_min, y_min, x_max, y_max = box
    x_min = round(min(max(x_min, 0), img_width))
    y_min = round(min(max(y_min, 0), img_height))
    x_max = round(min(max(x_max, 0), img_width))
    y_max = round(min(max(y_max, 0), img_height))
    if x_max <= x_min or y_max <= y_min:
        return None
    return [x_min, y_min, x_max, y_max]


def batch_predict(imgs_path, score_threshold=0.3, iou_threshold=0.3):
    """Run the detector on a batch of images and save drawings + YOLO labels.

    For every image the function:
      * keeps detections with a score above ``score_threshold`` and applies
        class-agnostic NMS with ``iou_threshold``;
      * draws the surviving boxes on the image and saves it to
        ``MODEL_LABELED_IMGS_PATH`` as ``output_<original file name>``;
      * writes one ``<image stem>.txt`` file to ``MODEL_LABELS`` with one
        ``class x_center y_center width height`` line per box, normalized by
        the image width/height.

    Args:
        imgs_path: sequence of image file paths forming one batch.
        score_threshold: minimum detection score to keep (default 0.3).
        iou_threshold: IoU threshold passed to NMS (default 0.3).

    Returns:
        None. Results are written to disk. An empty batch is a no-op.
    """
    if not imgs_path:
        return

    # Load eagerly and close the underlying file handles right away;
    # `convert` returns a new in-memory image that outlives the `with` block.
    images = []
    for path in imgs_path:
        with Image.open(path) as raw_img:
            images.append(raw_img.convert("RGB"))

    inputs = processor(
        images=images, text=[GROUNDING_PROMPT] * len(images), return_tensors="pt"
    ).to(DEVICE)

    with torch.no_grad():
        outputs = model(**inputs)

    # The post-processor expects (height, width), whereas PIL's `size` is
    # (width, height). In addition, per the Hugging Face OWLv2 docs the
    # processor pads each image to a square before resizing, so predicted boxes
    # are relative to that padded square. Passing (max_side, max_side) maps the
    # boxes to pixel coordinates of the original image both for post-processing
    # code that already accounts for the padding and for code that does not.
    # NOTE(review): confirm against the installed transformers version.
    target_sizes = torch.tensor(
        [[max(img.size)] * 2 for img in images], dtype=torch.float32
    )
    results = processor.post_process_object_detection(
        outputs=outputs, threshold=score_threshold, target_sizes=target_sizes
    )

    os.makedirs(MODEL_LABELED_IMGS_PATH, exist_ok=True)
    os.makedirs(MODEL_LABELS, exist_ok=True)

    for image, result, img_path in zip(images, results, imgs_path):
        draw = ImageDraw.Draw(image)
        boxes = result["boxes"]
        scores = result["scores"]
        labels = result["labels"]

        if len(boxes) > 0:
            # Class-agnostic NMS: overlapping boxes are merged across prompts.
            keep = nms(boxes, scores, iou_threshold=iou_threshold)
            boxes = boxes[keep]
            scores = scores[keep]
            labels = labels[keep]

        img_width, img_height = image.size
        anno_lines = []
        for raw_box, score, label in zip(
            boxes.tolist(), scores.tolist(), labels.tolist()
        ):
            if score <= score_threshold:
                continue

            # Boxes may spill into the padded area or slightly outside the
            # frame; clip them so the normalized YOLO values stay in [0, 1].
            box = _clip_box_to_image(raw_box, img_width, img_height)
            if box is None:
                continue

            draw.rectangle(box, outline="blue", width=3)
            label_text = f"{GROUNDING_PROMPT[label]} {score:.2f}"
            draw.text((box[0], box[1]), label_text, fill="blue")

            # YOLO: [class_id, x_center, y_center, width, height]
            x_min, y_min, x_max, y_max = box
            width = x_max - x_min
            height = y_max - y_min
            x_center_norm = (x_min + width / 2) / img_width
            y_center_norm = (y_min + height / 2) / img_height
            width_norm = width / img_width
            height_norm = height / img_height
            # NOTE(review): every detection is written as class 0, including
            # "man view from above" hits -- confirm whether those should be
            # skipped or written with their own class id.
            anno_lines.append(
                f"0 {x_center_norm:.6f} {y_center_norm:.6f} {width_norm:.6f} {height_norm:.6f}"
            )

        output_path = os.path.join(
            MODEL_LABELED_IMGS_PATH, f"output_{os.path.basename(img_path)}"
        )
        image.save(output_path)

        anno_path = os.path.join(
            MODEL_LABELS, f"{os.path.splitext(os.path.basename(img_path))[0]}.txt"
        )
        # An image without detections still gets an (empty) label file.
        with open(anno_path, "w") as f:
            f.write("\n".join(anno_lines))

In [15]:
# Label every extracted frame, BATCH_SIZE images at a time.
all_imgs = glob(f"{PIGS_FOLDER_PATH}/*.jpg")
batch_starts = range(0, len(all_imgs), BATCH_SIZE)
for start in tqdm(batch_starts):
    batch_predict(all_imgs[start : start + BATCH_SIZE])

100%|██████████| 753/753 [16:20<00:00,  1.30s/it]


### Splitting into training and test sets

In [16]:
def split_train_test(
    output_dir="owlv2_preds", anno_dir="owlv2_annotations", train_ratio=0.8, seed=42
):
    """Randomly split images and their annotations into train/test subfolders.

    Every ``*.jpg`` directly inside ``output_dir`` must have a ``*.txt`` file
    with the same base name directly inside ``anno_dir`` (and vice versa).
    The files are moved into ``train``/``test`` subfolders of their
    respective directories.

    Args:
        output_dir: folder containing the ``.jpg`` images.
        anno_dir: folder containing the matching ``.txt`` annotations.
        train_ratio: fraction of images (0..1) that goes to the train split;
            the train size is ``int(n_images * train_ratio)``.
        seed: seed of the shuffle, so the same file set always yields the
            same split.

    Raises:
        ValueError: if ``train_ratio`` is outside ``[0, 1]``.
        AssertionError: if image and annotation base names do not match.
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio}")

    def stem(path):
        return os.path.splitext(os.path.basename(path))[0]

    # glob() returns files in a filesystem-dependent order; sort first so that
    # the seeded shuffle below is reproducible across machines and runs.
    img_files = sorted(glob(os.path.join(output_dir, "*.jpg")))
    anno_files = glob(os.path.join(anno_dir, "*.txt"))

    img_basenames = {stem(f) for f in img_files}
    anno_basenames = {stem(f) for f in anno_files}
    assert img_basenames == anno_basenames, "Mismatch between images and annotations"

    # A dedicated generator gives the same shuffle as seeding the global one,
    # without resetting the random state of the rest of the notebook.
    rng = random.Random(seed)
    rng.shuffle(img_files)

    train_size = int(len(img_files) * train_ratio)
    train_imgs = img_files[:train_size]
    test_imgs = img_files[train_size:]

    train_img_dir = os.path.join(output_dir, "train")
    test_img_dir = os.path.join(output_dir, "test")
    train_anno_dir = os.path.join(anno_dir, "train")
    test_anno_dir = os.path.join(anno_dir, "test")

    for folder in (train_img_dir, test_img_dir, train_anno_dir, test_anno_dir):
        os.makedirs(folder, exist_ok=True)

    def move_files(img_list, img_dest_dir, anno_dest_dir):
        # Move each image together with its same-named annotation file.
        for img_path in img_list:
            img_basename = stem(img_path)
            anno_path = os.path.join(anno_dir, f"{img_basename}.txt")

            shutil.move(
                img_path, os.path.join(img_dest_dir, os.path.basename(img_path))
            )
            if os.path.exists(anno_path):
                shutil.move(
                    anno_path, os.path.join(anno_dest_dir, f"{img_basename}.txt")
                )

    move_files(train_imgs, train_img_dir, train_anno_dir)
    print(f"Moved {len(train_imgs)} images and annotations to train folders")

    move_files(test_imgs, test_img_dir, test_anno_dir)
    print(f"Moved {len(test_imgs)} images and annotations to test folders")

In [17]:
# 90/10 train/test split of the raw frames and their model-generated labels.
split_train_test(
    output_dir=PIGS_FOLDER_PATH,
    anno_dir=MODEL_LABELS,
    train_ratio=0.9,
    seed=137,
)

Moved 2709 images and annotations to train folders
Moved 301 images and annotations to test folders
