In [1]:
import os
import albumentations as A
import cv2

In [2]:
# Data constants

# SECURITY: credentials do not belong in a notebook. The key is taken from the
# ROBOFLOW_API_KEY environment variable when it is set. The literal below is kept
# only as a backward-compatible fallback so existing runs keep working; it has
# already been exposed in this file and should be rotated and then removed.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY", "6Zww8x98VFrvYjTYjNnN")

# Root folder (relative to the notebook) that receives every dataset.
DATA_FOLDER = "../data/"

# Parallel lists: index i of every list describes dataset i.
# "dataset_amount" must match the length of each list.
DATASET_DETAILS = {
    "dataset_amount": 2,
    "workspace_name": ["computer-vision-project-y5j59", "computer-vision-project-y5j59"],
    "project_name": ["pomelo-yotwr-e5cxd", "pomelo-ripeness-detection-using-yolov7-network-fr6ma"],
    "model_format": ["coco", "multiclass"],
    "version": [1, 1],
    "dataset_folder_name": ["pomelo-1", "Pomelo-Ripeness-Detection-using-YOLOv7-Network-1"],
}

# Label-column groupings for the classification CSV.
# NOTE(review): these are not used in the visible cells — confirm how later cells consume them.
RIPE_COLUMN_GROUPS = ["ripe", "old", "overripe"]
NOT_RIPE_COLUMN_GROUPS = ["young"]
IGNORE_COLUMNS = ["testset", "filename", "not-ripe", "ripe"]

TOTAL_PATH = os.path.join(DATA_FOLDER, "total")

# Fractions passed to train_test_split for the detection dataset.
DETECTION_VAL_SIZE = 0.1
DETECTION_TEST_SIZE = 0.1

In [3]:
# Augmentation constants

# Keyword arguments shared by both pipelines, spelled out once.
_GEOMETRIC_JITTER = dict(
    shift_limit=0.1,
    scale_limit=0.1,
    rotate_limit=60,
    interpolation=cv2.INTER_CUBIC,
    border_mode=cv2.BORDER_REPLICATE,
    p=0.5,
)
_COLOR_JITTER = dict(
    hue_shift_limit=20,
    sat_shift_limit=20,
    val_shift_limit=20,
    p=0.5,
)

# Image-only pipeline used for the classification dataset.
CLASSIFICATION_AUGS = A.Compose(
    [
        # Vertical flip only; the author chose not to flip horizontally here.
        A.VerticalFlip(p=0.5),
        # Random shift / scale / rotation, borders filled by edge replication.
        A.ShiftScaleRotate(**_GEOMETRIC_JITTER),
        # Hue / saturation / value jitter, to mimic smartphone camera filters.
        A.HueSaturationValue(**_COLOR_JITTER),
        # Brightness / contrast jitter, to cover different lighting conditions.
        A.RandomBrightnessContrast(p=0.2),
    ]
)

# Pipeline for the detection dataset; bounding boxes are transformed with the image.
DETECTION_AUGS = A.Compose(
    [
        # Random crop that keeps the bounding boxes inside the frame.
        A.BBoxSafeRandomCrop(p=0.5),
        # Horizontal flip.
        A.HorizontalFlip(p=0.5),
        # Random shift / scale / rotation, borders filled by edge replication.
        A.ShiftScaleRotate(**_GEOMETRIC_JITTER),
        # Gaussian blur, to mimic a badly focused camera.
        A.GaussianBlur(blur_limit=(3, 7), p=0.5),
        # Hue / saturation / value jitter, to mimic smartphone camera filters.
        A.HueSaturationValue(**_COLOR_JITTER),
        # Brightness / contrast jitter, to cover different lighting conditions.
        A.RandomBrightnessContrast(p=0.2),
    ],
    # Boxes are COCO-style [x, y, w, h]; labels travel in the "category_ids" field.
    # min_visibility is Albumentations' threshold for dropping mostly-hidden boxes.
    bbox_params=A.BboxParams(
        format='coco',
        label_fields=['category_ids'],
        min_visibility=0.8,
    ),
)

# Number of augmented copies generated per source image.
AUGMENTATION_AMOUNT = 5

  original_init(self, **validated_kwargs)


In [4]:
from roboflow import Roboflow

# Roboflow client authenticated with the configured API key; the dataset
# download cell uses it to resolve workspaces and projects.
rf = Roboflow(api_key=ROBOFLOW_API_KEY)

# Download data

In [5]:
import shutil
import os

In [6]:
# Download the datasets

# Make sure the data root exists before anything is moved into it.
os.makedirs(DATA_FOLDER, exist_ok=True)

for dataset_index in range(DATASET_DETAILS["dataset_amount"]):
    workspace_name = DATASET_DETAILS["workspace_name"][dataset_index]
    project_name = DATASET_DETAILS["project_name"][dataset_index]
    model_format = DATASET_DETAILS["model_format"][dataset_index]
    version_number = DATASET_DETAILS["version"][dataset_index]
    dataset_folder_name = DATASET_DETAILS["dataset_folder_name"][dataset_index]

    # Download the dataset
    project = rf.workspace(workspace_name).project(project_name)
    version = project.version(version_number)
    dataset = version.download(model_format)

    # Define the source and destination paths.
    # The code assumes the download lands in ./<dataset_folder_name>.
    source_path = dataset_folder_name
    destination_path = os.path.join(DATA_FOLDER, dataset_folder_name)

    # Remove a stale copy from a previous run. Failures are NOT ignored here:
    # if the old folder survived, shutil.move would silently nest the new
    # dataset inside it instead of replacing it.
    if os.path.isdir(destination_path):
        shutil.rmtree(destination_path)

    # Move the dataset folder to the destination
    shutil.move(source_path, destination_path)

loading Roboflow workspace...
loading Roboflow project...


Downloading Dataset Version Zip in pomelo-1 to coco:: 100%|██████████| 58108/58108 [00:04<00:00, 11731.83it/s]





Extracting Dataset Version Zip to pomelo-1 in coco:: 100%|██████████| 219/219 [00:00<00:00, 1662.64it/s]

loading Roboflow workspace...





loading Roboflow project...


Downloading Dataset Version Zip in Pomelo-Ripeness-Detection-using-YOLOv7-Network-1 to multiclass:: 100%|██████████| 249939/249939 [00:16<00:00, 15319.57it/s]





Extracting Dataset Version Zip to Pomelo-Ripeness-Detection-using-YOLOv7-Network-1 in multiclass:: 100%|██████████| 242/242 [00:00<00:00, 376.13it/s]


In [7]:
# Give every downloaded folder a task-based name ("pomelo-detection" / "pomelo-classification")
for dataset_index in range(DATASET_DETAILS["dataset_amount"]):
    downloaded_name = DATASET_DETAILS["dataset_folder_name"][dataset_index]

    # COCO exports are the detection data; any other format is treated as classification.
    if DATASET_DETAILS["model_format"][dataset_index] == "coco":
        task_suffix = "detection"
    else:
        task_suffix = "classification"

    renamed_path = os.path.join(DATA_FOLDER, "pomelo-" + task_suffix)
    downloaded_path = os.path.join(DATA_FOLDER, downloaded_name)

    # Clear whatever a previous run left under the new name, then rename.
    shutil.rmtree(renamed_path, ignore_errors=True)
    shutil.move(downloaded_path, renamed_path)

# Preprocess data

In [8]:
import json
import os
import shutil
import copy
from sklearn.model_selection import train_test_split

In [9]:
# Folder layout of the detection dataset: a scratch "temp" folder plus one folder per split.
detection_folder = os.path.join(DATA_FOLDER, "pomelo-detection")
temp_folder, train_folder, valid_folder, test_folder = (
    os.path.join(detection_folder, split_name)
    for split_name in ("temp", "train", "valid", "test")
)

# Create whichever of the folders do not exist yet.
for split_folder in (temp_folder, train_folder, valid_folder, test_folder):
    os.makedirs(split_folder, exist_ok=True)

In [10]:
# Reset the temp folder, then move everything out of the train folder into it.
# rmtree with ignore_errors=True is a no-op when the folder is missing.
shutil.rmtree(temp_folder, ignore_errors=True)
os.makedirs(temp_folder, exist_ok=True)

for filename in os.listdir(train_folder):
    shutil.move(os.path.join(train_folder, filename), temp_folder)

In [11]:
# Load the COCO annotations that were moved into the temp folder.
# The context manager closes the file handle instead of leaking it.
with open(os.path.join(temp_folder, "_annotations.coco.json"), "r") as f:
    annotation_json = json.load(f)

In [12]:
# Split the image ids in two stages: validation first, then test from what is left.
total_images_id = [image_entry["id"] for image_entry in annotation_json["images"]]

train_images_id, valid_images_id = train_test_split(
    total_images_id,
    test_size=DETECTION_VAL_SIZE,
    random_state=42,
)
# NOTE: DETECTION_TEST_SIZE is applied to the ids remaining after the validation
# split, not to the full id list.
train_images_id, test_images_id = train_test_split(
    train_images_id,
    test_size=DETECTION_TEST_SIZE,
    random_state=42,
)

# Report the size of every split.
for split_label, split_ids in (
    ("Total", total_images_id),
    ("Train", train_images_id),
    ("Valid", valid_images_id),
    ("Test", test_images_id),
):
    print(f"{split_label} images: {len(split_ids)}")

Total images: 215
Train images: 173
Valid images: 22
Test images: 20


In [13]:
# Skeleton shared by the three per-split annotation files: metadata is copied
# from the source file, images/annotations start empty.
default_annotations = {
    "info": annotation_json["info"],
    "licenses": annotation_json["licenses"],
    "categories": annotation_json["categories"],
    "images": [],
    "annotations": [],
}
train_annotations = copy.deepcopy(default_annotations)
valid_annotations = copy.deepcopy(default_annotations)
test_annotations = copy.deepcopy(default_annotations)

target_folder_by_set = {"train": train_folder, "valid": valid_folder, "test": test_folder}
annotations_by_set = {"train": train_annotations, "valid": valid_annotations, "test": test_annotations}

# Sets make each membership check O(1); the id lists would be scanned linearly
# for every image and every annotation otherwise.
image_ids_by_set = {
    "train": set(train_images_id),
    "valid": set(valid_images_id),
    "test": set(test_images_id),
}

for image in annotation_json["images"]:
    image_id = image["id"]

    # First matching split wins, checked in train -> valid -> test order.
    file_is_in = next(
        (set_name for set_name, image_ids in image_ids_by_set.items() if image_id in image_ids),
        None,
    )
    if file_is_in is None:
        # Fail with a clear message instead of an opaque KeyError on None.
        raise ValueError(f"Image id {image_id!r} was not assigned to any split")
    annotations_by_set[file_is_in]["images"].append(image)

    # Copy the image into the folder of its split
    source_image_path = os.path.join(temp_folder, image["file_name"])
    target_image_path = os.path.join(target_folder_by_set[file_is_in], image["file_name"])
    shutil.copy(source_image_path, target_image_path)

# Route every annotation to the split(s) that contain its image.
for annotation in annotation_json["annotations"]:
    image_id = annotation["image_id"]
    for set_name, image_ids in image_ids_by_set.items():
        if image_id in image_ids:
            annotations_by_set[set_name]["annotations"].append(annotation)

# Save the new annotations
for set_name, annotations in annotations_by_set.items():
    with open(os.path.join(target_folder_by_set[set_name], "_annotations.coco.json"), "w") as f:
        json.dump(annotations, f)

# Remove temp folder
shutil.rmtree(temp_folder, ignore_errors=True)

# Data augmentation

In [None]:
import itertools
import cv2
import os
import shutil
import copy
import json
import pandas as pd
from tqdm import tqdm


In [15]:
# Augment detection tasks
# NOTE: the "classification_*" variable names are historical; this cell works on
# the detection dataset. The names are kept so other cells are unaffected.
classification_folder = os.path.join(DATA_FOLDER, "pomelo-detection")
classification_temp_folder = os.path.join(classification_folder, "temp")
target_folder = [os.path.join(classification_folder, target) for target in ["train", "valid", "test"]]

for folder in target_folder:
    print(f"Augmenting {folder}...")
    # Start every split from an empty temp folder.
    if os.path.exists(classification_temp_folder):
        shutil.rmtree(classification_temp_folder, ignore_errors=True)
    os.makedirs(classification_temp_folder, exist_ok=True)

    # Load annotations; the copy keeps every original entry and receives the new ones.
    with open(os.path.join(folder, "_annotations.coco.json"), "r") as f:
        annotations = json.load(f)
    new_annotations = copy.deepcopy(annotations)

    # New ids continue after the largest existing id. Images and annotations are
    # numbered independently, so every annotation id stays unique.
    max_image_id = max((image_entry["id"] for image_entry in annotations["images"]), default=-1)
    image_id_generator = itertools.count(max_image_id + 1)
    max_annotation_id = max(
        (annotation.get("id", -1) for annotation in annotations["annotations"]),
        default=-1,
    )
    annotation_id_generator = itertools.count(max_annotation_id + 1)

    # Group annotations by image once instead of rescanning the list per image.
    annotations_by_image_id = {}
    for annotation in annotations["annotations"]:
        annotations_by_image_id.setdefault(annotation["image_id"], []).append(annotation)

    looper = tqdm(annotations["images"], desc="Augmenting images", unit="image")
    for image_data in looper:
        # Get file path
        file_path = os.path.join(folder, image_data["file_name"])

        # Collect the COCO [x, y, w, h] boxes and their labels for this image
        image_annotations = annotations_by_image_id.get(image_data["id"], [])
        bboxes = [list(annotation["bbox"]) for annotation in image_annotations]
        category_ids = [annotation["category_id"] for annotation in image_annotations]
        looper.set_postfix({"bboxes": len(bboxes)})

        # Read image; cv2.imread returns None instead of raising on failure
        image = cv2.imread(file_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {file_path}")

        for augmentation_index in range(AUGMENTATION_AMOUNT):
            looper.set_postfix({"augmented_bboxes": augmentation_index + 1})
            # Apply augmentations
            augmented = DETECTION_AUGS(image=image, bboxes=bboxes, category_ids=category_ids)
            augmented_image = augmented["image"]
            augmented_bboxes = augmented["bboxes"]
            augmented_category_ids = augmented["category_ids"]

            # Save the augmented image under its new id
            new_image_id = next(image_id_generator)
            new_image_name = f"{new_image_id}.jpg"
            new_image_path = os.path.join(classification_temp_folder, new_image_name)
            cv2.imwrite(new_image_path, augmented_image)

            # Add augmented annotations
            for bbox, category_id in zip(augmented_bboxes, augmented_category_ids):
                # Plain floats keep the entry JSON-serializable whatever numeric
                # type the augmentation library returns.
                bbox = [float(value) for value in bbox]
                new_annotation = {
                    "id": next(annotation_id_generator),
                    "image_id": new_image_id,
                    "category_id": int(category_id),
                    "bbox": bbox,
                    "area": bbox[2] * bbox[3],
                    "segmentation": [],
                    "iscrowd": 0,
                }
                new_annotations["annotations"].append(new_annotation)

            # Register the image. The size comes from the augmented image because
            # the random crop can change it.
            augmented_height, augmented_width = augmented_image.shape[:2]
            new_image = {
                'id': new_image_id,
                'license': image_data["license"],
                'file_name': new_image_name,
                'height': int(augmented_height),
                'width': int(augmented_width),
                'date_captured': image_data["date_captured"],
                'extra': {
                    'name': new_image_name
                }
            }
            new_annotations["images"].append(new_image)

        # Move original image to temp folder
        shutil.move(file_path, os.path.join(classification_temp_folder, image_data["file_name"]))

    # Save new annotations
    with open(os.path.join(classification_temp_folder, "_annotations.coco.json"), "w") as f:
        json.dump(new_annotations, f)

    # Remove original folder
    shutil.rmtree(folder, ignore_errors=True)

    # Move temp folder to original folder
    shutil.move(classification_temp_folder, folder)

    print(f"Augmented {folder} successfully.")
    print(f"\tOriginal images: {len(annotations['images'])}")
    print(f"\tAugmented images: {len(new_annotations['images'])}")

Augmenting ../data/pomelo-detection/train...


Augmenting images: 100%|██████████| 173/173 [00:25<00:00,  6.71image/s, augmented_bboxes=5]


Augmented ../data/pomelo-detection/train successfully.
	Original images: 173
	Augmented images: 1038
Augmenting ../data/pomelo-detection/valid...


Augmenting images: 100%|██████████| 22/22 [00:03<00:00,  6.46image/s, augmented_bboxes=5]


Augmented ../data/pomelo-detection/valid successfully.
	Original images: 22
	Augmented images: 132
Augmenting ../data/pomelo-detection/test...


Augmenting images: 100%|██████████| 20/20 [00:03<00:00,  6.50image/s, augmented_bboxes=5]

Augmented ../data/pomelo-detection/test successfully.
	Original images: 20
	Augmented images: 120





In [16]:
# Augment classification tasks
classification_folder = os.path.join(DATA_FOLDER, "pomelo-classification")
target_folder = [os.path.join(classification_folder, target) for target in ["train", "valid", "test"]]

for folder in target_folder:
    print(f"Augmenting {folder}...")

    # Load annotations
    class_annotations = pd.read_csv(os.path.join(folder, "_classes.csv"))
    original_size = len(class_annotations)
    # When a TestSet column is present, keep only the rows flagged 0 and drop the column.
    if 'TestSet' in class_annotations.columns:
        class_annotations = class_annotations[class_annotations['TestSet'] == 0]
        class_annotations = class_annotations.drop(columns='TestSet')

    # Snapshot the source rows as plain dicts. Augmented rows are collected in a
    # list and appended once per split, instead of re-filtering and
    # re-concatenating a growing DataFrame for every image.
    source_rows = class_annotations.to_dict("records")
    new_rows = []

    looper = tqdm(source_rows, desc="Augmenting images", unit="image")
    for source_row in looper:
        filename = source_row["filename"]
        looper.set_postfix({"filename": filename})
        # Get file path
        file_path = os.path.join(folder, filename)

        # Read image; cv2.imread returns None instead of raising on failure
        image = cv2.imread(file_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {file_path}")

        for augmentation_index in range(AUGMENTATION_AMOUNT):
            # Apply augmentations
            augmented = CLASSIFICATION_AUGS(image=image)
            augmented_image = augmented["image"]

            # Save augmented image next to its source
            new_image_name = os.path.basename(file_path) + f"_{augmentation_index}.jpg"
            new_image_path = os.path.join(folder, new_image_name)

            cv2.imwrite(new_image_path, augmented_image)

            # The augmented copy inherits the labels of its source row; building a
            # fresh dict avoids mutating a row taken from the DataFrame.
            new_rows.append({**source_row, "filename": new_image_name})

    if new_rows:
        class_annotations = pd.concat([class_annotations, pd.DataFrame(new_rows)], ignore_index=True)

    # Save new annotations
    class_annotations.to_csv(os.path.join(folder, "_classes.csv"), index=False)
    print(f"Augmented {folder} successfully.")
    print(f"\tOriginal images: {original_size}")
    print(f"\tTotal images: {len(class_annotations)}")

Augmenting ../data/pomelo-classification/train...


Augmenting images: 100%|██████████| 201/201 [02:04<00:00,  1.61image/s, filename=IMG_4911_jpg.rf.7a8a1758cd35672b0ee72e5f29ccf52c.jpg]


Augmented ../data/pomelo-classification/train successfully.
	Original images: 216
	Total images: 1206
Augmenting ../data/pomelo-classification/valid...


Augmenting images: 100%|██████████| 5/5 [00:04<00:00,  1.10image/s, filename=IMG_4910_jpg.rf.0558eda1565b52e73879550aba2bc38a.jpg]


Augmented ../data/pomelo-classification/valid successfully.
	Original images: 9
	Total images: 30
Augmenting ../data/pomelo-classification/test...


Augmenting images: 100%|██████████| 8/8 [00:07<00:00,  1.01image/s, filename=IMG_4862_jpg.rf.f18b1ee1137dcf2eb72a74f74268b769.jpg]

Augmented ../data/pomelo-classification/test successfully.
	Original images: 9
	Total images: 48



