## Dataprep

In [None]:
from roboflow import Roboflow

rf = Roboflow(api_key="3xQnZJNNlVmhFwAJLzhi")
project = rf.workspace("rosenheim").project("caqao-ainoc")
version = project.version(6)
dataset = version.download("yolov8")

In [None]:
import os
import shutil

base_dir = "cocoa-2"
splits = ["train", "test", "valid"]
dir_labels = [os.path.join(base_dir, f"{split}/labels") for split in splits]
dir_images = [os.path.join(base_dir, f"{split}/images") for split in splits]


def move_files(src_dir, dest_dir):
    if not os.path.exists(src_dir):
        print(f"Source directory {src_dir} does not exist. Skipping.")
        return
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir, exist_ok=True)
    for file in os.listdir(src_dir):
        src_file = os.path.join(src_dir, file)
        dest_file = os.path.join(dest_dir, file)
        shutil.move(src_file, dest_file)


# Move files from train and test to valid
for i in range(0, 2):
    move_files(dir_labels[i], dir_labels[2])
    move_files(dir_images[i], dir_images[2])

print("All files have been moved successfully!")

In [8]:
for split in splits[:2]:  # Only train and test
    split_dir = os.path.join(base_dir, split)
    if os.path.exists(split_dir):
        shutil.rmtree(split_dir)

In [1]:
import fiftyone as fo

In [None]:
dataset = fo.Dataset.from_dir(
    dataset_dir="CAQAO-6",
    dataset_type=fo.types.YOLOv5Dataset,
)

In [None]:
unique_resolutions = set()

# Loop through all the samples in the dataset
for sample in dataset:
    image_path = sample.filepath
    # Read the image using OpenCV
    img = cv2.imread(image_path)

    # Get the resolution of the current image
    height, width = img.shape[:2]

    # Add the resolution to the set
    unique_resolutions.add((width, height))

# Output all unique resolutions
if unique_resolutions:
    print("Unique resolutions found in the dataset:")
    for resolution in unique_resolutions:
        print(f"{resolution[0]}x{resolution[1]}")
else:
    print("No images found in the dataset.")

In [None]:
import fiftyone as fo
import cv2
import random

# Load the YOLOv5 dataset
# target_width, target_height = 4032, 3024

# # Loop through all the samples in the dataset
# for sample in dataset:
#     image_path = sample.filepath
#     img = cv2.imread(image_path)

#     # Get the original resolution of the image
#     original_height, original_width = img.shape[:2]

#     # If the image is not already in the target resolution, resize it
#     if (original_width, original_height) != (target_width, target_height):
#         # Calculate the scaling factors
#         scaling_x = target_width / original_width
#         scaling_y = target_height / original_height

#         # Resize the image to the target resolution
#         img_resized = cv2.resize(img, (target_width, target_height))

#         # Save the resized image back
#         cv2.imwrite(image_path, img_resized)

#         # Adjust bounding boxes
#         detections = sample.ground_truth.detections  # Assuming `ground_truth` stores the detections
#         for detection in detections:
#             # Get the original bounding box [x, y, width, height]
#             bbox = detection.bounding_box

#             # Scale the bounding box coordinates
#             x_min = bbox[0] * scaling_x
#             y_min = bbox[1] * scaling_y
#             width = bbox[2] * scaling_x
#             height = bbox[3] * scaling_y

#             # Update the bounding box with the scaled values
#             detection.bounding_box = [x_min, y_min, width, height]

#         # Save the updated sample
#         sample.save()

#     else:
#         print(f"Image {image_path} is already at the target resolution.")

# Shuffle the dataset
samples = dataset.view()
sample_ids = [sample.id for sample in samples]
random.shuffle(sample_ids)

# Split into train, val, test (70%, 20%, 10%)
total_samples = len(sample_ids)
train_split = int(0.7 * total_samples)
val_split = int(0.9 * total_samples)

train_ids = sample_ids[:train_split]
val_ids = sample_ids[train_split:val_split]
test_ids = sample_ids[val_split:]

# Tag the samples
for sample_id in train_ids:
    sample = dataset[sample_id]
    sample.tags.append("train")
    sample.save()

for sample_id in val_ids:
    sample = dataset[sample_id]
    sample.tags.append("val")
    sample.save()

for sample_id in test_ids:
    sample = dataset[sample_id]
    sample.tags.append("test")
    sample.save()

# Export the dataset with splits
dataset.export(
    export_dir="cocoa-2-processed",
    dataset_type=fo.types.FiftyOneDataset,
)

In [None]:
ds = fo.Dataset.from_dir(
    dataset_dir="cocoa-2-processed",
    dataset_type=fo.types.FiftyOneDataset,
)

In [None]:
import fiftyone as fo

# Define the class mapping
defect_classes = ["Slaty", "Mouldy", "Insect-damaged", "Germinated"]
non_defect_classes = [
    "Brown - G1",
    "Partly Purple - G2",
    "Partly Purple - G3",
    "Total Purple - G2",
    "Total Purple - G3",
    "Total Purple - G4",
    "Very Dark Brown - G3",
    "Very Dark Brown - G4",
    "Brown - G4",
    "Partly Purple - G4",
    "Total Purple - G1",
    "Partly Purple - G1",
    "Very Dark Brown - G2",
    "Brown - G2",
    "Very Dark Brown - G1",
    "Brown - G3",
]

class_mapping = {cls: 1 for cls in defect_classes}
class_mapping.update({cls: 0 for cls in non_defect_classes})

# Load your FiftyOne dataset

# Update the dataset with the new class mapping
for sample in ds:
    detections = (
        sample.ground_truth.detections
    )  # Assuming `ground_truth` stores the detections
    for detection in detections:
        original_label = detection.label
        if original_label in class_mapping:
            detection.label = (
                "Defect"
                if class_mapping[original_label] == 1
                else "Non-Defect"
            )

    # Save the updated sample
    sample.save()

# Verify the changes
print("Dataset updated with new class mappings!")

In [None]:
ds.export(
    export_dir="cocoa-2-processed",
    dataset_type=fo.types.FiftyOneDataset,
)

In [None]:
fo.Session(ds, port=27771)

In [22]:
for sample in ds:
    image_path = sample.filepath
    img = Image.open(image_path)

    # Resize to 1920x1080 if it doesn't match
    if img.size != (1920, 1080):
        print("no")
        img = img.resize((1920, 1080), Image.Resampling.LANCZOS)
        img.save(image_path)

In [None]:
import fiftyone as fo
from pathlib import Path

# Load the FiftyOne dataset

# Define export directory and splits
export_dir = "roboflow_yolov8"
splits = ["train", "val", "test"]

# Ensure the export directory exists
Path(export_dir).mkdir(exist_ok=True)

# Export each split
for split in splits:
    # Select samples tagged with the current split
    split_view = ds.match_tags(split)

    # Define split-specific export directory
    split_export_dir = Path(export_dir) / split

    # Convert the export directory to a string
    split_export_dir_str = str(split_export_dir)

    # Export the split
    split_view.export(
        export_dir=split_export_dir_str,
        dataset_type=fo.types.YOLOv5Dataset,  # YOLOv8 uses the same format as YOLOv5
        label_field="ground_truth",  # Replace with your label field
        include_media=True,
        overwrite=True,
    )

In [None]:
import os
import shutil
from pathlib import Path

# Define the root directory
root_dir = export_dir
splits = ["train", "val", "test"]

for split in splits:
    # Define paths
    split_dir = Path(root_dir) / split
    images_src = split_dir / "images" / "val"
    labels_src = split_dir / "labels" / "val"
    req_dir = Path(root_dir)
    images_dest = req_dir / "images" / split
    labels_dest = req_dir / "labels" / split

    # Create destination directories if not exist
    images_dest.mkdir(parents=True, exist_ok=True)
    labels_dest.mkdir(parents=True, exist_ok=True)

    # Move image files
    if images_src.exists():
        for file in images_src.iterdir():
            if file.is_file():
                shutil.move(str(file), images_dest)
        # Remove the now-empty directory
        images_src.rmdir()

    # Move label files
    if labels_src.exists():
        for file in labels_src.iterdir():
            if file.is_file():
                shutil.move(str(file), labels_dest)
        # Remove the now-empty directory
        labels_src.rmdir()

# Verify and print the final structure
print("Dataset rearranged successfully!")

## Pred

In [1]:
import fiftyone as fo

In [2]:
from typing import Union

import numpy as np


def xyxy2normalized_topleftxywh(
    xmin: Union[float, np.ndarray],
    ymin: Union[float, np.ndarray],
    xmax: Union[float, np.ndarray],
    ymax: Union[float, np.ndarray],
    bg_w: Union[int, np.ndarray],
    bg_h: Union[int, np.ndarray],
):
    """
    :return:
    """
    w = xmax - xmin
    h = ymax - ymin
    x_topleft_norm = xmin / bg_w
    y_topleft_norm = ymin / bg_h
    w_norm = w / bg_w
    h_norm = h / bg_h

    return x_topleft_norm, y_topleft_norm, w_norm, h_norm

In [None]:
ds = fo.Dataset.from_dir(
    dataset_dir="cocoa-2-processed",
    dataset_type=fo.types.FiftyOneDataset,
)

In [4]:
ds.default_classes = ["Defect", "Non-Defect"]

### Normal yolo pred (change jupyter kernel to paper_env)

In [3]:
from ultralytics import YOLO

model = YOLO("runs/detect/train2/weights/best.pt")

In [None]:
for sample in ds:
    detections = []
    v8_result = model.predict(sample.filepath, imgsz=640)
    img = v8_result[0].orig_img
    preds = v8_result[0].boxes.data
    for i, box in enumerate(preds):
        x1, y1, x2, y2 = box[:4].int().tolist()
        cnf = float(box[4])  # Confidence score from YOLO output
        class_id = int(box[5])  # Class ID from YOLO output

        # Convert to normalized top-left xywh format
        x_topleft_norm, y_topleft_norm, w_norm, h_norm = (
            xyxy2normalized_topleftxywh(x1, y1, x2, y2, 1920, 1080)
        )

        # Append to FiftyOne detections
        detections.append(
            fo.Detection(
                label=ds.default_classes[class_id],
                bounding_box=[x_topleft_norm, y_topleft_norm, w_norm, h_norm],
                confidence=cnf,
            )
        )

    # Assign detections to the sample and save
    sample["predictions"] = fo.Detections(detections=detections)
    sample.save()

### Combined yolo pred (change jupyter kernel to paper_env2)

In [5]:
from ultralytics import YOLO
import torch
from ultralytics.engine.trainer import get_vgg16_model
from torchvision import transforms
from EMA import (
    EMA_region_begin,
    EMA_region_define,
    EMA_region_end,
    EMA_init,
    EMA_finalize,
)

In [None]:
v8_model = YOLO(
    "/home/paineni/MasterThesis/runs/detect/train2/weights/best.pt"
)
vgg_model, _ = get_vgg16_model()
vgg_model_path = "vgg_16_16.pth"
vgg_model.load_state_dict(torch.load(vgg_model_path), strict=False)

In [None]:
vgg_model.to("cuda")
vgg_model.eval()

In [8]:
# Define the preprocessing transformations


preprocess = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
        ),
    ]
)

In [9]:
import torch.nn.functional as F


def get_class_prediction(cropped_image, model):
    image = preprocess(cropped_image)
    image = image.unsqueeze(0).to(
        "cuda"
    )  # Add batch dimension and move to device
    with torch.no_grad():
        output = model(image)
        probabilities = F.softmax(
            output, dim=1
        )  # Apply softmax to get probabilities
        confidence, predicted = torch.max(probabilities, 1)
    return predicted.item(), confidence.item()

In [10]:
from PIL import Image, ImageOps


def resize_with_padding_pil(
    image, output_size=(224, 224), padding_color=(0, 0, 0)
):
    """
    Resizes a PIL image to the desired size with padding to maintain the aspect ratio.
    If the image is smaller than the target size, only padding is applied.

    Args:
    - image (PIL.Image.Image): The input image.
    - output_size (tuple): The desired output size as (height, width). Default is (224, 224).
    - padding_color (tuple): The RGB color for the padding. Default is black (0, 0, 0).

    Returns:
    - PIL.Image.Image: The resized and padded image.
    """
    # Get the original dimensions
    original_width, original_height = image.size
    target_width, target_height = output_size

    # Check if resizing is needed
    if original_width > target_width or original_height > target_height:
        # Calculate the scaling factor
        scale = min(
            target_width / original_width, target_height / original_height
        )

        # Compute new dimensions
        new_width = int(original_width * scale)
        new_height = int(original_height * scale)

        # Resize the image
        image = image.resize((new_width, new_height), Image.LANCZOS)

    # Add padding to make the final size match output_size
    padded_image = ImageOps.pad(
        image,
        output_size,
        color=padding_color,
        centering=(0.5, 0.5),  # Center the image
    )

    return padded_image

In [None]:
EMA_init()
region = EMA_region_define("abc")
EMA_region_begin(region)
for sample in ds:
    detections = []
    v8_result = v8_model.predict(
        sample.filepath,
        imgsz=640,
    )
    img = v8_result[0].orig_img
    preds = v8_result[0].boxes.data
    for i, box in enumerate(preds):
        x1, y1, x2, y2 = box[:4].int().tolist()
        cropped_np = img[y1:y2, x1:x2, :]
        cropped_image = Image.fromarray(
            cropped_np
        )  # Convert to PIL Image for preprocessing
        cropped_image = resize_with_padding_pil(
            cropped_image, output_size=(224, 224)
        )

        class_id, cnf = get_class_prediction(cropped_image, vgg_model)
        x_topleft_norm, y_topleft_norm, w_norm, h_norm = (
            xyxy2normalized_topleftxywh(x1, y1, x2, y2, 1920, 1080)
        )
        detections.append(
            fo.Detection(
                label=ds.default_classes[int(class_id)],
                bounding_box=[x_topleft_norm, y_topleft_norm, w_norm, h_norm],
                confidence=cnf,
            )
        )
        sample["predictions"] = fo.Detections(detections=detections)
        sample.save()
EMA_region_end(region)
EMA_finalize()

### view results

In [None]:
fo.Session(ds, port=23672)

## Evaluation

In [None]:
import os
import tempfile
from matplotlib.figure import Figure
import pandas as pd

artifacts_path = "atrifacts"
splits = ["train", "val", "test"]
for split_tag in splits:
    view = ds.match_tags([split_tag])

    # Evaluate the objects in the `predictions`
    # field with respect to the
    # objects in the `ground_truth` field
    eval_key = f"eval_predictions_{split_tag}"
    results = view.evaluate_detections(
        "predictions",
        gt_field="ground_truth",
        compute_mAP=True,
        classes=ds.default_classes,
        missing="background",
        classwise=False,
    )
    # whether to consider objects with different label
    # values as always non-overlapping (True) or to compute IoUs
    # for all objects regardless of label (False)

    # the COCO mAP evaluator averages the mAP
    # over 10 IoU thresholds from 0.5 to 0.95
    # with a step size of 0.05 (AP@[0.5:0.05:0.95])
    # To be found in the source of fiftyone.
    # "https://github.com/voxel51/fiftyone/blob/"
    # "acf3a8f886505d852903e320d057057813261993/fiftyone/"
    # "utils/eval/coco.py#L91"
    mAP = results.mAP()
    print(f"mAP@[0.5:0.05:0.95] {split_tag} : " + str(mAP))
    classwise_ap_df = pd.DataFrame(columns=["Label", "AP@[0.5:0.05:0.95]"])
    label_values = ds.count_values("ground_truth.detections.label")
    for label in ds.default_classes:
        class_AP = results.mAP([label])
        print(f"AP@[0.5:0.05:0.95] of {split_tag} ({label}): " + str(class_AP))
        classwise_ap_df = classwise_ap_df._append(
            {
                "Label": label,
                "AP@[0.5:0.05:0.95]": class_AP,
                "support": label_values[label],
            },
            ignore_index=True,
        )

    with tempfile.TemporaryDirectory() as tmpworkdir:
        temp_csv_path = os.path.join(
            tmpworkdir, f"{split_tag}_classwise_ap_values.csv"
        )
        classwise_ap_df.to_csv(temp_csv_path, index=False)

    results.print_report()
    report = results.report()
    weighted_avg_precision = report["weighted avg"]["precision"]
    weighted_avg_recall = report["weighted avg"]["recall"]

    # Print some statistics about the total TP/FP/FN counts
    # mean_tp = view.sum(f"{eval_key}_tp")
    # mean_fp = view.sum(f"{eval_key}_fp")
    # mean_fn = view.sum(f"{eval_key}_fn")
    # print(f"TP ({split_tag}): {mean_tp}")
    # print(f"FP ({split_tag}): {mean_fp}")
    # print(f"FN ({split_tag}): {mean_fn}")

    class_counts = view.count_values("predictions.detections.label")

    # pr_curve_path = os.path.join(artifacts_path, f"PR_curve_{split_tag}.png")
    # pr_curve_plot: Figure = results.plot_pr_curves(
    #     classes=list(class_counts.keys()),
    #     backend="matplotlib",
    #     style="dark_background",
    # )
    # pr_curve_plot.savefig(pr_curve_path, dpi=250)

    # conf_mat_path = os.path.join(
    #     artifacts_path, f"confusion_matrix_{split_tag}.png"
    # )
    # conf_mat_plot: Figure = results.plot_confusion_matrix(backend="matplotlib")
    # conf_mat_plot.savefig(conf_mat_path, dpi=250)

ds.save()