In [None]:
# ================================
# Install dependencies
# ================================
!pip install -q roboflow diffusers transformers accelerate safetensors pillow matplotlib numpy

# ================================
# Imports
# ================================
from roboflow import Roboflow
import random
import os, torch, random, shutil
import numpy as np
from PIL import Image, ImageEnhance, ImageFilter
from diffusers import StableDiffusionImg2ImgPipeline
from diffusers.utils import logging
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from google.colab import drive

# Mount Google Drive at /content/drive; later cells write their outputs under
# /content/drive/MyDrive.
drive.mount('/content/drive')
# `logging` here is diffusers.utils.logging: only report errors from diffusers.
logging.set_verbosity_error()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# The Roboflow API key is a credential, so it is read from the environment
# (ROBOFLOW_API_KEY) or asked for interactively instead of being stored in the
# notebook. NOTE(review): the key that used to be embedded here should be
# treated as leaked and rotated.
from getpass import getpass

ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")

rf = Roboflow(api_key=ROBOFLOW_API_KEY)
project = rf.workspace("en-zbzva").project("all-pepper-datasets-q5opc")
version = project.version(1)
dataset = version.download("voc")

dataset_dir = dataset.location

# In this export every split folder holds the .jpg images together with their
# .xml annotation files (see the listings below).
train_dir = os.path.join(dataset_dir, "train")
val_dir   = os.path.join(dataset_dir, "valid")
test_dir  = os.path.join(dataset_dir, "test")


def _list_paths(folder, ext):
    """Return the sorted full paths of the files in ``folder`` whose name ends
    with ``ext`` (compared case-insensitively, like the later cells do)."""
    return sorted(
        os.path.join(folder, name)
        for name in os.listdir(folder)
        if name.lower().endswith(ext)
    )


train_imgs = _list_paths(train_dir, ".jpg")
val_imgs   = _list_paths(val_dir,   ".jpg")
test_imgs  = _list_paths(test_dir,  ".jpg")

# Despite the "_dir_" in their names these are LISTS of .xml file paths;
# copy_labels() consumes them as lists.
train_dir_lbl = _list_paths(train_dir, ".xml")
val_dir_lbl   = _list_paths(val_dir,   ".xml")
test_dir_lbl  = _list_paths(test_dir,  ".xml")


print("Train images:", len(train_imgs))
print("Valid images:", len(val_imgs))
print("Test images:", len(test_imgs))
print("Train labels:", len(train_dir_lbl))
print("Valid labels:", len(val_dir_lbl))
print("Test labels:", len(test_dir_lbl))




loading Roboflow workspace...
loading Roboflow project...
Train images: 1489
Valid images: 427
Test images: 211
Train labels: 1489
Valid labels: 427
Test labels: 211


In [None]:
# # ================================
# # Load Roboflow dataset
# # ================================
# rf = Roboflow(api_key=os.environ["ROBOFLOW_API_KEY"])  # key redacted: never commit credentials
# project = rf.workspace("en-zbzva").project("all-pepper-datasets-q5opc")
# version = project.version(1)
# dataset = version.download("yolov9")

# dataset_dir = dataset.location
# train_dir_img = os.path.join(dataset_dir, "train", "images")
# val_dir_img   = os.path.join(dataset_dir, "valid", "images")
# test_dir_img  = os.path.join(dataset_dir, "test",  "images")

# train_dir_lbl = os.path.join(dataset_dir, "train", "labels")
# val_dir_lbl   = os.path.join(dataset_dir, "valid", "labels")
# test_dir_lbl  = os.path.join(dataset_dir, "test",  "labels")

# print("Roboflow source dataset:")
# print(" Train images:", len(os.listdir(train_dir_img)))
# print(" Valid images:", len(os.listdir(val_dir_img)))
# print(" Test  images:", len(os.listdir(test_dir_img)))

loading Roboflow workspace...
loading Roboflow project...


Downloading Dataset Version Zip in All-Pepper-Datasets-1 to yolov9:: 100%|██████████| 110813/110813 [00:07<00:00, 15121.31it/s]





Extracting Dataset Version Zip to All-Pepper-Datasets-1 in yolov9:: 100%|██████████| 4266/4266 [00:00<00:00, 6704.23it/s]


Roboflow source dataset:
 Train images: 1489
 Valid images: 427
 Test  images: 211


In [None]:
# # ============================================
# # 3. Classical augmentation functions
# #    (NO geometry change → labels stay valid)
# # ============================================

# def apply_weak_light(img):
#     """Simulate weak, cloudy light. No resize, no geometry change."""
#     img = ImageEnhance.Brightness(img).enhance(0.55)
#     img = ImageEnhance.Contrast(img).enhance(0.75)
#     img = ImageEnhance.Color(img).enhance(0.80)
#     return img

# def apply_foggy_base(img):
#     """Slightly soften contrast/colors before fog overlay."""
#     img = ImageEnhance.Brightness(img).enhance(1.05)
#     img = ImageEnhance.Contrast(img).enhance(0.65)
#     img = ImageEnhance.Color(img).enhance(0.75)
#     return img

# def add_fog_overlay(img, intensity=0.55):
#     """
#     Add a fog overlay using alpha-composited white veil.
#     No resizing, no cropping.
#     """
#     w, h = img.size
#     fog = Image.new("RGBA", img.size, (255, 255, 255, 0))
#     fog_px = fog.load()

#     for i in range(w):
#         for j in range(h):
#             # Denser fog towards top of image (can tweak)
#             alpha = int(255 * intensity * np.exp(-((j / h) * 1.5)))
#             fog_px[i, j] = (255, 255, 255, alpha)

#     fog = fog.filter(ImageFilter.GaussianBlur(10))
#     return Image.alpha_composite(img.convert("RGBA"), fog).convert("RGB")

# def augment_classical(img_path, out_path, mode):
#     """
#     mode: 'weak' or 'foggy'
#     img_path: source image path
#     out_path: destination image path
#     """
#     img = Image.open(img_path).convert("RGB")  # keep original resolution

#     if mode == "weak":
#         img = apply_weak_light(img)
#     elif mode == "foggy":
#         img = apply_foggy_base(img)
#         img = add_fog_overlay(img, intensity=0.55)
#     else:
#         raise ValueError("mode must be 'weak' or 'foggy'")

#     img.save(out_path, quality=95)


In [None]:


# ============================================
# RANDOMIZED Weak-Light
# ============================================
def apply_weak_light(img):
    """
    Darken and flatten an image to imitate poor lighting.

    Four factors are sampled with ``random.uniform`` on every call and then
    applied, in order, through the ImageEnhance brightness, contrast, colour
    and sharpness enhancers. Only ImageEnhance calls are made here; the
    function performs no resize or crop.

    Returns the enhanced image.
    """
    # All factors are drawn first (in this order), then applied one by one.
    stages = (
        (ImageEnhance.Brightness, random.uniform(0.45, 0.75)),
        (ImageEnhance.Contrast,   random.uniform(0.60, 0.90)),
        (ImageEnhance.Color,      random.uniform(0.70, 0.90)),
        (ImageEnhance.Sharpness,  random.uniform(0.70, 1.00)),
    )

    for make_enhancer, factor in stages:
        img = make_enhancer(img).enhance(factor)

    return img



# ============================================
# RANDOMIZED Foggy base (before fog overlay)
# ============================================
def apply_foggy_base(img):
    """
    Prepare an image for the fog overlay.

    Samples a brightness factor in [1.00, 1.15], a contrast factor in
    [0.55, 0.75] and a colour factor in [0.65, 0.80] with ``random.uniform``
    and applies them in that order through ImageEnhance.

    Returns the adjusted image.
    """
    # Factors are drawn up front, in application order.
    stages = (
        (ImageEnhance.Brightness, random.uniform(1.00, 1.15)),
        (ImageEnhance.Contrast,   random.uniform(0.55, 0.75)),
        (ImageEnhance.Color,      random.uniform(0.65, 0.80)),
    )

    for make_enhancer, factor in stages:
        img = make_enhancer(img).enhance(factor)

    return img


# ============================================
# RANDOMIZED Fog Overlay
# ============================================
def add_fog_overlay(img, intensity=None):
    """
    Composite a white, vertically graded fog veil over ``img``.

    The veil's opacity depends only on the row: it is
    ``255 * intensity * exp(-(row / height) * grad_exp)``, i.e. strongest on
    the top row and decaying towards the bottom. The veil is Gaussian-blurred
    and alpha-composited onto the image.

    Args:
        img: PIL image; it is converted to RGBA for compositing.
        intensity: opacity scale of the veil. If None, a value is drawn from
            ``random.uniform(0.35, 0.70)``.

    Returns:
        A new RGB image of the same size as ``img``.

    The blur radius (5..12) and the gradient exponent (1.0..1.8) are drawn
    with ``random.uniform`` on every call.
    """
    if intensity is None:
        intensity = random.uniform(0.35, 0.70)   # fog thickness

    blur_amt = random.uniform(5, 12)            # background fog blur
    grad_exp = random.uniform(1.0, 1.8)         # fog depth profile

    w, h = img.size

    # Alpha is the same for every pixel of a row, so it is evaluated once per
    # row (h values) instead of once per pixel (w * h values). The result is
    # clipped so that it always fits the 8-bit alpha channel.
    row_alpha = [
        min(255, max(0, int(255 * intensity * np.exp(-((j / h) * grad_exp)))))
        for j in range(h)
    ]

    # Build the RGBA veil in one shot: white everywhere, per-row alpha
    # broadcast across the width.
    fog_px = np.empty((h, w, 4), dtype=np.uint8)
    fog_px[..., :3] = 255
    fog_px[..., 3] = np.asarray(row_alpha, dtype=np.uint8)[:, np.newaxis]
    fog = Image.fromarray(fog_px)               # (h, w, 4) uint8 -> "RGBA"

    fog = fog.filter(ImageFilter.GaussianBlur(blur_amt))
    return Image.alpha_composite(img.convert("RGBA"), fog).convert("RGB")



# ============================================
# FINAL augmentation function
# ============================================
def augment_classical(img_path, out_path, mode):
    """
    Read one image, apply a randomized classical augmentation and save it.

    Args:
        img_path: path of the source image.
        out_path: path the augmented image is written to (saved with
            ``quality=95``).
        mode: 'weak' (apply_weak_light) or 'foggy' (apply_foggy_base followed
            by add_fog_overlay with a random intensity).

    Raises:
        ValueError: if ``mode`` is neither 'weak' nor 'foggy'. The check runs
            before the source image is opened, so a bad mode costs no I/O.
    """
    if mode not in ("weak", "foggy"):
        raise ValueError("mode must be 'weak' or 'foggy'")

    # Keep the source file open only while it is being read and converted.
    with Image.open(img_path) as src:
        img = src.convert("RGB")

    if mode == "weak":
        img = apply_weak_light(img)
    else:
        img = apply_foggy_base(img)
        img = add_fog_overlay(img)   # fog is randomized each time

    img.save(out_path, quality=95)


In [None]:
# ============================================
# 4. Output dataset roots in Google Drive
# ============================================
root_weak  = "/content/drive/MyDrive/pepper_weaklight"
root_foggy = "/content/drive/MyDrive/pepper_foggy"

# Each root receives <split>/images and <split>/labels for all three splits.
for root in (root_weak, root_foggy):
    for split in ("train", "valid", "test_roboflow"):
        for _leaf in ("images", "labels"):
            os.makedirs(os.path.join(root, split, _leaf), exist_ok=True)

print("Output roots:")
for _caption, _location in ((" Weak-light:", root_weak), (" Foggy     :", root_foggy)):
    print(_caption, _location)



Output roots:
 Weak-light: /content/drive/MyDrive/pepper_weaklight
 Foggy     : /content/drive/MyDrive/pepper_foggy


In [None]:
# ============================================
# 5. Split processor with SKIP logic
# ============================================
def process_split_classical(src_img_dir, dst_root, mode, split_name):
    """
    Augment every image of one split into ``<dst_root>/<split_name>/images``.

    The function is resumable: an image is generated only when the
    destination folder does not already contain a file with the same name.
    Whether a split is complete is decided by file NAME, not by comparing file
    counts, so unrelated or stale files in the destination can no longer make
    a split look finished.

    Args:
        src_img_dir: folder with the original images (train/valid/test).
        dst_root: root_weak or root_foggy.
        mode: 'weak' or 'foggy' (forwarded to augment_classical).
        split_name: 'train' / 'valid' / 'test_roboflow'.
    """
    dst_img_dir = os.path.join(dst_root, split_name, "images")
    os.makedirs(dst_img_dir, exist_ok=True)

    image_exts = (".jpg", ".png", ".jpeg")
    src_files = sorted(
        f for f in os.listdir(src_img_dir)
        if f.lower().endswith(image_exts)
    )

    # One directory listing instead of one existence check per file.
    already_there = set(os.listdir(dst_img_dir))
    pending = [f for f in src_files if f not in already_there]

    if src_files and not pending:
        print(f"[{mode}] {split_name}: {len(src_files)}/{len(src_files)} images already exist → SKIP")
        return

    print(f"[{mode}] {split_name}: generating {len(pending)} images...")

    for fname in pending:
        in_path  = os.path.join(src_img_dir, fname)
        out_path = os.path.join(dst_img_dir, fname)
        augment_classical(in_path, out_path, mode)

In [None]:
# # ============================================
# # 5. Split processor with SKIP logic
# # ============================================
# def process_split_classical(src_img_dir, dst_root, mode, split_name):
#     """
#     src_img_dir: original Roboflow images (train/valid/test)
#     dst_root: root_weak or root_foggy
#     mode: 'weak' or 'foggy'
#     split_name: 'train' / 'valid' / 'test_roboflow'
#     """
#     dst_img_dir = os.path.join(dst_root, split_name, "images")
#     os.makedirs(dst_img_dir, exist_ok=True)

#     src_files = sorted([
#         f for f in os.listdir(src_img_dir)
#         if f.lower().endswith((".jpg", ".png", ".jpeg"))
#     ])
#     dst_files = sorted([
#         f for f in os.listdir(dst_img_dir)
#         if f.lower().endswith((".jpg", ".png", ".jpeg"))
#     ])

#     if len(dst_files) >= len(src_files) and len(src_files) > 0:
#         print(f"[{mode}] {split_name}: {len(dst_files)}/{len(src_files)} images already exist → SKIP")
#         return

#     print(f"[{mode}] {split_name}: generating {len(src_files)} images...")

#     for fname in src_files:
#         in_path  = os.path.join(src_img_dir, fname)
#         out_path = os.path.join(dst_img_dir, fname)

#         if os.path.exists(out_path):
#             continue

#         augment_classical(in_path, out_path, mode)

In [None]:
# ============================================
# 6. Generate Weak-Light dataset (classical)
# ============================================
# (source folder, destination split name) for the three splits
_split_sources = (
    (train_dir, "train"),
    (val_dir,   "valid"),
    (test_dir,  "test_roboflow"),
)

for _src_dir, _split in _split_sources:
    process_split_classical(_src_dir, root_weak, "weak", _split)

# ============================================
# 7. Generate Foggy dataset (classical)
# ============================================
for _src_dir, _split in _split_sources:
    process_split_classical(_src_dir, root_foggy, "foggy", _split)

print("Finished generating weak-light & foggy datasets!")

[weak] train: generating 1489 images...
[weak] valid: generating 427 images...
[weak] test_roboflow: generating 211 images...
[foggy] train: generating 1489 images...
[foggy] valid: generating 427 images...
[foggy] test_roboflow: generating 211 images...
Finished generating weak-light & foggy datasets!


In [None]:
# # ============================================
# # 6. Generate Weak-Light dataset (classical)
# # ============================================
# process_split_classical(train_imgs, root_weak,  "weak",  "train")
# process_split_classical(val_imgs,   root_weak,  "weak",  "valid")
# process_split_classical(test_imgs,  root_weak,  "weak",  "test_roboflow")

# # ============================================
# # 7. Generate Foggy dataset (classical)
# # ============================================
# process_split_classical(train_imgs, root_foggy, "foggy", "train")
# process_split_classical(val_imgs,   root_foggy, "foggy", "valid")
# process_split_classical(test_imgs,  root_foggy, "foggy", "test_roboflow")

# print("Finished generating weak-light & foggy datasets!")

TypeError: listdir: path should be string, bytes, os.PathLike, integer or None, not list

In [None]:
# # ============================================
# # 8. Copy labels (labels are reusable!)
# # ============================================
# def copy_labels(src_lbl_dir, dst_root, split_name):
#     dst_lbl_dir = os.path.join(dst_root, split_name, "labels")
#     os.makedirs(dst_lbl_dir, exist_ok=True)

#     src_files = [f for f in os.listdir(src_lbl_dir) if f.endswith(".txt")]
#     dst_files = [f for f in os.listdir(dst_lbl_dir) if f.endswith(".txt")]

#     if len(dst_files) >= len(src_files) and len(src_files) > 0:
#         print(f"[LABELS] {os.path.basename(dst_root)} {split_name}: already copied → SKIP")
#         return

#     print(f"[LABELS] {os.path.basename(dst_root)} {split_name}: copying {len(src_files)} labels...")
#     for f in src_files:
#         shutil.copy(
#             os.path.join(src_lbl_dir, f),
#             os.path.join(dst_lbl_dir, f)
#         )

# # Weak-light labels
# copy_labels(train_dir_lbl, root_weak,  "train")
# copy_labels(val_dir_lbl,   root_weak,  "valid")
# copy_labels(test_dir_lbl,  root_weak,  "test_roboflow")

# # Foggy labels
# copy_labels(train_dir_lbl, root_foggy, "train")
# copy_labels(val_dir_lbl,   root_foggy, "valid")
# copy_labels(test_dir_lbl,  root_foggy, "test_roboflow")

# print(" All labels copied.")

TypeError: listdir: path should be string, bytes, os.PathLike, integer or None, not list

In [None]:
# ============================================
# 8. Copy labels (labels are reusable!)
# ============================================
def copy_labels(src_lbl_list, dst_root, split_name):
    """
    Copy label files into ``<dst_root>/<split_name>/labels``.

    A label is copied only when the destination folder has no file with the
    same name. Completeness is decided by file NAME rather than by comparing
    file counts, so unrelated files in the destination cannot cause a split to
    be skipped by mistake.

    Args:
        src_lbl_list: a list of full .xml label paths.
        dst_root:     root_weak or root_foggy.
        split_name:   train / valid / test_roboflow.
    """
    dst_lbl_dir = os.path.join(dst_root, split_name, "labels")
    os.makedirs(dst_lbl_dir, exist_ok=True)

    # Names already present in the destination (single directory listing).
    already_there = set(os.listdir(dst_lbl_dir))
    pending = [
        lbl_path for lbl_path in src_lbl_list
        if os.path.basename(lbl_path) not in already_there
    ]

    # Skip if already copied
    if len(src_lbl_list) > 0 and not pending:
        print(f"[LABELS] {os.path.basename(dst_root)} {split_name}: already copied → SKIP")
        return

    print(f"[LABELS] {os.path.basename(dst_root)} {split_name}: copying {len(pending)} labels...")

    # Copy each missing xml
    for lbl_path in pending:
        fname = os.path.basename(lbl_path)
        shutil.copy(lbl_path, os.path.join(dst_lbl_dir, fname))


# (list of label paths, destination split name) for the three splits
_label_sets = (
    (train_dir_lbl, "train"),
    (val_dir_lbl,   "valid"),
    (test_dir_lbl,  "test_roboflow"),
)

# Weak-light labels
for _labels, _split in _label_sets:
    copy_labels(_labels, root_weak, _split)

# Foggy labels
for _labels, _split in _label_sets:
    copy_labels(_labels, root_foggy, _split)

print(" All labels copied.")

[LABELS] pepper_weaklight train: copying 1489 labels...
[LABELS] pepper_weaklight valid: copying 427 labels...
[LABELS] pepper_weaklight test_roboflow: copying 211 labels...
[LABELS] pepper_foggy train: copying 1489 labels...
[LABELS] pepper_foggy valid: copying 427 labels...
[LABELS] pepper_foggy test_roboflow: copying 211 labels...
 All labels copied.


In [None]:
# # ============================================
# # 8. Copy YOLO labels (labels are reusable!)
# # ============================================
# def copy_labels(src_lbl_dir, dst_root, split_name):
#     dst_lbl_dir = os.path.join(dst_root, split_name, "labels")
#     os.makedirs(dst_lbl_dir, exist_ok=True)

#     src_files = [f for f in os.listdir(src_lbl_dir) if f.endswith(".txt")]
#     dst_files = [f for f in os.listdir(dst_lbl_dir) if f.endswith(".txt")]

#     if len(dst_files) >= len(src_files) and len(src_files) > 0:
#         print(f"[LABELS] {os.path.basename(dst_root)} {split_name}: already copied → SKIP")
#         return

#     print(f"[LABELS] {os.path.basename(dst_root)} {split_name}: copying {len(src_files)} labels...")
#     for f in src_files:
#         shutil.copy(
#             os.path.join(src_lbl_dir, f),
#             os.path.join(dst_lbl_dir, f)
#         )

# # Weak-light labels
# copy_labels(train_dir_lbl, root_weak,  "train")
# copy_labels(val_dir_lbl,   root_weak,  "valid")
# copy_labels(test_dir_lbl,  root_weak,  "test_roboflow")

# # Foggy labels
# copy_labels(train_dir_lbl, root_foggy, "train")
# copy_labels(val_dir_lbl,   root_foggy, "valid")
# copy_labels(test_dir_lbl,  root_foggy, "test_roboflow")

# print(" All labels copied.")

In [None]:
def make_split_txt(img_dir, save_path, exts=(".jpg",)):
    """
    Write the extension-less names of the images in a folder to a text file.

    Args:
        img_dir: folder containing the images.
        save_path: full path to the output txt file (overwritten if present).
        exts: extension, or tuple of extensions, a file must end with to be
            listed; matched case-insensitively. Defaults to ``(".jpg",)``.

    The names are sorted (with their extension), stripped of the extension and
    written one per line. The file is always written as UTF-8, so the result
    does not depend on the platform's default encoding.
    """
    if isinstance(exts, str):
        exts = (exts,)
    wanted = tuple(ext.lower() for ext in exts)

    # Collect matching filenames
    names = sorted(f for f in os.listdir(img_dir) if f.lower().endswith(wanted))

    # Remove the extension
    names_no_ext = [os.path.splitext(f)[0] for f in names]

    # Save to txt file
    with open(save_path, "w", encoding="utf-8") as f:
        f.writelines(n + "\n" for n in names_no_ext)

    print(f"✓ Saved {len(names_no_ext)} entries → {save_path}")

In [None]:
# Create a folder inside MyDrive for output
txt_out_dir = "/content/drive/MyDrive/pepper_lists"
os.makedirs(txt_out_dir, exist_ok=True)

# Full save paths
train_txt, val_txt, test_txt = [
    os.path.join(txt_out_dir, _list_name)
    for _list_name in ("train.txt", "val.txt", "test.txt")
]

# Generate the txt files, one per split
for _img_dir, _txt_path in (
    (train_dir, train_txt),
    (val_dir,   val_txt),
    (test_dir,  test_txt),
):
    make_split_txt(_img_dir, _txt_path)

✓ Saved 1489 entries → /content/drive/MyDrive/pepper_lists/train.txt
✓ Saved 427 entries → /content/drive/MyDrive/pepper_lists/val.txt
✓ Saved 211 entries → /content/drive/MyDrive/pepper_lists/test.txt


In [None]:
# ================================
# DataLoaders
# ================================
# Shared preprocessing pipeline; ToTensor() is its only step.
transform = transforms.Compose([transforms.ToTensor()])

class PepperDataset(Dataset):
    """
    Image-only dataset over the .jpg/.png/.jpeg files of one folder.

    Items are ``(transformed_image, filename)`` pairs; files are indexed in
    sorted filename order.

    Args:
        img_dir: folder to read images from.
        transform: optional callable applied to every RGB PIL image. When it
            is None (the default), the notebook-level ``transform`` is looked
            up at item-access time, which is what the class always did.
    """

    IMAGE_EXTS = (".jpg", ".png", ".jpeg")

    def __init__(self, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.files = sorted(
            f for f in os.listdir(img_dir)
            if f.lower().endswith(self.IMAGE_EXTS)
        )

    def __len__(self):
        """Number of image files found in ``img_dir``."""
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(transformed RGB image, filename)`` for position ``idx``."""
        fname = self.files[idx]
        # Keep the file open only while it is being read and converted.
        with Image.open(os.path.join(self.img_dir, fname)) as src:
            img = src.convert("RGB")
        # `transform` below is the module-level pipeline (fallback).
        apply = self.transform if self.transform is not None else transform
        return apply(img), fname

def make_loaders(root, batch=16):
    """
    Build the train / valid / test DataLoaders for one dataset root.

    Each loader wraps a PepperDataset over ``<root>/<split>/images`` for the
    splits "train", "valid" and "test_roboflow" and uses ``batch`` as its
    batch size. Only the train loader is created with ``shuffle=True``.

    Returns a 3-tuple ``(train_loader, valid_loader, test_loader)``.
    """
    # (split folder, shuffle?) in the order the loaders are returned
    split_plan = (("train", True), ("valid", False), ("test_roboflow", False))

    loaders = []
    for split, reshuffle in split_plan:
        images = PepperDataset(os.path.join(root, split, "images"))
        loaders.append(DataLoader(images, batch_size=batch, shuffle=reshuffle))

    return tuple(loaders)

# One (train, valid, test) loader triple per augmented dataset root.
(
    foggy_train_loader,
    foggy_val_loader,
    foggy_test_loader,
) = make_loaders(root_foggy)
(
    weak_train_loader,
    weak_val_loader,
    weak_test_loader,
) = make_loaders(root_weak)

print("All dataloaders ready!")


All dataloaders ready!
