In [1]:
# If you are in a fresh environment (e.g., Colab / new venv), uncomment:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install diffusers transformers accelerate safetensors
!pip install pillow kagglehub opencv-python


Looking in indexes: https://download.pytorch.org/whl/cu121


In [2]:
import os
import random
import shutil
from pathlib import Path
from glob import glob

import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageFilter

import kagglehub
import torch
from diffusers import StableDiffusionPipeline


  from .autonotebook import tqdm as notebook_tqdm


In [11]:
# --------------------------
# WHERE TO SAVE OUTPUT
# --------------------------
# Root of the exported subset. build_dirty_kitti_subset writes
# training/<DIRTY_DIR_NAME>/ and training/label_2/ beneath it.
# NOTE(review): this is a Colab-style absolute path; the recorded run printed it as
# \content\kitti_dirty_subset_run4, so on other machines you probably want to change it.
OUT_ROOT = Path("/content/kitti_dirty_subset_run4")   # change this to any folder you want
OVERWRITE_OUTPUT = True                          # if True, OUT_ROOT is removed (shutil.rmtree) before writing

# --------------------------
# DATASET SOURCE
# --------------------------
KAGGLE_DATASET_ID = "klemenko/kitti-dataset"     # dataset id handed to kagglehub.dataset_download
SUBSET_SIZE = 100                                # how many KITTI images to sample; sampled images without a label file are skipped
SAVE_CLEAN = False                               # also save clean (unsoiled) RGB copies under training/<CLEAN_DIR_NAME>
CLEAN_DIR_NAME = "image_2_clean"                 # folder name for the clean copies (only used when SAVE_CLEAN is True)
DIRTY_DIR_NAME = "image_2"                       # keep KITTI-like folder name

# --------------------------
# TEXTURE GENERATION (Stable Diffusion)
# --------------------------
USE_SD_TEXTURES = True                           # False: skip generation and rely on PNGs already in TEXTURES_DIR
SD_MODEL_ID = "runwayml/stable-diffusion-v1-5"
# Use the GPU with half precision when CUDA is available, otherwise CPU with float32.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

TEXTURES_DIR = Path("/content/mud_textures")     # where textures are stored/created
N_TEXTURES_TO_GENERATE = 12                      # target number of PNGs in TEXTURES_DIR; only the shortfall is generated
TEXTURE_SIZE = 512                               # height and width (pixels) requested from the pipeline

# Default positive / negative prompts used when no prompt files are supplied.
MUD_PROMPT = "photorealistic mud splatter on transparent background, wet dirt texture, high detail, isolated"
MUD_NEGATIVE = "scene, background, people, objects, text, watermark, cartoon, illustration, blurry"

# If set and the file exists, its full text replaces the corresponding default prompt;
# a missing file silently falls back to the default (see load_prompt_files).
PROMPT_FILE = None  # optional path to a prompt file
NEG_FILE = None     # optional path to a negative prompt file

# --------------------------
# SOILING / LENS EFFECTS
# --------------------------
# Global strength controls
# NOTE(review): in the cells visible in this notebook, build_dirty_kitti_subset takes
# its strengths from PRESETS[severity] rather than from these constants, so they look
# unused here — confirm before relying on them.
ALPHA_STRENGTH = 1.35
EDGE_SOFTNESS = 4
DARKEN_STRENGTH = 0.20
BLUR_STRENGTH = 5
HAZE_STRENGTH = 0.07

# Per-image random contamination amount
# NOTE(review): these bounds are likewise not referenced by any visible cell — verify.
CONTAMINATION_LEVEL_MIN = 1
CONTAMINATION_LEVEL_MAX = 5

# Multi-scale mask recipe (tweak this for more diversity)
#
# One entry per severity level. For each preset:
#   "alpha", "edge_soft", "darken", "blur", "haze" are passed by build_dirty_kitti_subset
#       to apply_refined_soiling_v2 as alpha_strength, edge_softness, darken_strength,
#       blur_strength and haze_strength respectively.
#   "mask" is forwarded as keyword arguments to generate_contamination_mask_multiscale.
#
# Each "spot_specs" row is a positional tuple, unpacked by the mask generator as
#   (count, min_r, max_r, sub_n, sub_rf, inten_min, inten_max):
#     count                 number of spots drawn for this scale
#     min_r, max_r          inclusive range of the spot's main radius, in pixels
#     sub_n                 number of ellipses drawn per spot
#     sub_rf                ellipse radius as a fraction of the main radius
#                           (additionally scaled by a random factor in [0.5, 1.6))
#     inten_min, inten_max  inclusive range of the mask intensity (0-255 "L" image)
# "base_blur" / "edge_blur" are Gaussian blur radii applied to the mask in that order;
# "add_streaks" toggles the extra random line streaks.
PRESETS = {
    "light": {
        "alpha": 1.30,
        "edge_soft": 3,
        "darken": 0.18,
        "blur": 4,
        "haze": 0.07,
        "mask": {
            "spot_specs": (
                (90,  1, 4,  5, 0.20, 100, 180),
                (22,  5, 11, 8, 0.25, 130, 220),
                (3,  12, 24, 10, 0.30, 160, 255),
            ),
            "base_blur": 1,
            "edge_blur": 4,
            "add_streaks": False,
        },
    },
    "medium": {
        "alpha": 1.45,
        "edge_soft": 4,
        "darken": 0.28,
        "blur": 5,
        "haze": 0.1,
        "mask": {
            "spot_specs": (
                (160, 1, 5,  5, 0.20, 110, 190),
                (55,  6, 14, 8, 0.25, 140, 230),
                (8,  14, 32, 10, 0.30, 170, 255),
            ),
            "base_blur": 1,
            "edge_blur": 5,
            "add_streaks": True,
        },
    },
    "heavy": {
        "alpha": 1.80,
        "edge_soft": 5,
        "darken": 0.35,
        "blur": 6,
        "haze": 0.13,
        "mask": {
            "spot_specs": (
                (260, 1, 6,  6, 0.22, 120, 210),
                (85,  7, 18, 10, 0.25, 150, 240),
                (14, 18, 45, 12, 0.30, 180, 255),
            ),
            "base_blur": 2,
            "edge_blur": 6,
            "add_streaks": True,
        },
    },
}

# distribution of severities per image
# Used as the `weights` of random.choices over ["light", "medium", "heavy"], so the
# order here must match that list.
SEVERITY_WEIGHTS = [0.55, 0.35, 0.10]  # light, medium, heavy


In [4]:
def download_kitti_kagglehub(dataset_id: str) -> Path:
    """Download a Kaggle dataset via kagglehub and return where it was stored.

    Args:
        dataset_id: Dataset identifier passed straight to
            ``kagglehub.dataset_download``.

    Returns:
        The location reported by kagglehub, wrapped in a ``Path``.
    """
    return Path(kagglehub.dataset_download(dataset_id))

def find_kitti_dirs(root: Path):
    """Locate the KITTI training image and label directories under ``root``.

    The standard layout (``data_object_image_2/training/image_2`` and
    ``data_object_label_2/training/label_2``) is tried first. If it is not
    present, ``root`` is searched recursively for ``training/image_2`` and
    ``training/label_2``.

    The fallback search is deterministic: only directories are considered,
    candidates are sorted, and a pair living under the same ``training``
    folder is preferred over an arbitrary combination.

    Args:
        root: Directory to search (a ``str`` is accepted as well).

    Returns:
        Tuple ``(image_dir, label_dir)`` of ``Path`` objects.

    Raises:
        FileNotFoundError: If no image or no label directory can be found.
    """
    root = Path(root)
    img_dir = root / "data_object_image_2" / "training" / "image_2"
    lbl_dir = root / "data_object_label_2" / "training" / "label_2"

    if img_dir.exists() and lbl_dir.exists():
        return img_dir, lbl_dir

    # rglob yields entries in filesystem order and may match plain files, so
    # filter to directories and sort to make the result reproducible.
    candidates_img = sorted(p for p in root.rglob("training/image_2") if p.is_dir())
    candidates_lbl = sorted(p for p in root.rglob("training/label_2") if p.is_dir())

    if not candidates_img or not candidates_lbl:
        raise FileNotFoundError(f"Could not find KITTI image_2 / label_2 under: {root}")

    # Prefer images and labels that sit side by side in the same training/ folder.
    known_label_dirs = set(candidates_lbl)
    for candidate in candidates_img:
        sibling = candidate.parent / "label_2"
        if sibling in known_label_dirs:
            return candidate, sibling

    return candidates_img[0], candidates_lbl[0]

# Download the dataset and resolve the training image / label folders.
kitti_root = download_kitti_kagglehub(KAGGLE_DATASET_ID)
img_dir, lbl_dir = find_kitti_dirs(kitti_root)

# Sorted listings, used below only to report how many files were found.
images = sorted(img_dir.glob("*.png"))
labels = sorted(lbl_dir.glob("*.txt"))

print(f"KITTI root: {kitti_root}")
print(f"Image dir: {img_dir}")
print(f"Label dir: {lbl_dir}")
print(f"Total images: {len(images)}")
print(f"Total labels: {len(labels)}")


KITTI root: C:\Users\yuval\.cache\kagglehub\datasets\klemenko\kitti-dataset\versions\1
Image dir: C:\Users\yuval\.cache\kagglehub\datasets\klemenko\kitti-dataset\versions\1\data_object_image_2\training\image_2
Label dir: C:\Users\yuval\.cache\kagglehub\datasets\klemenko\kitti-dataset\versions\1\data_object_label_2\training\label_2
Total images: 7481
Total labels: 7481


In [5]:
def white_to_alpha(img: Image.Image, threshold: int = 240) -> Image.Image:
    """Return an RGBA copy of ``img`` in which near-white pixels are transparent.

    A pixel counts as near-white when its red, green and blue values are all
    strictly greater than ``threshold``; such pixels get alpha 0. All other
    pixels keep the alpha they have after conversion to RGBA.

    Args:
        img: Source image in any mode convertible to RGBA.
        threshold: Per-channel cut-off (0-255), exclusive.

    Returns:
        A new RGBA image.
    """
    rgba = np.array(img.convert("RGBA"))
    # (H, W) boolean map: True where every colour channel exceeds the threshold.
    is_white = (rgba[..., :3] > threshold).all(axis=-1)
    rgba[is_white, 3] = 0
    return Image.fromarray(rgba)

def load_prompt_files(prompt_file, neg_file, default_prompt, default_negative):
    """Resolve the positive and negative prompts, preferring files over defaults.

    For each of the two prompts, the file's full UTF-8 text is used when a
    path is given and that path exists; otherwise the default is returned
    unchanged. File contents are not stripped or otherwise modified.

    Args:
        prompt_file: Optional path to the positive prompt file (falsy to skip).
        neg_file: Optional path to the negative prompt file (falsy to skip).
        default_prompt: Fallback positive prompt.
        default_negative: Fallback negative prompt.

    Returns:
        Tuple ``(prompt, negative)``.
    """

    def _text_or_default(candidate, fallback):
        # A falsy path or a non-existent file both mean "use the default".
        if not candidate:
            return fallback
        source = Path(candidate)
        return source.read_text(encoding="utf-8") if source.exists() else fallback

    return (
        _text_or_default(prompt_file, default_prompt),
        _text_or_default(neg_file, default_negative),
    )

def init_sd_pipeline(model_id: str, device: str, dtype):
    """Load a Stable Diffusion pipeline, move it to ``device`` and return it.

    The safety checker is disabled after loading. Attention slicing is enabled
    on a best-effort basis: if it cannot be enabled, a ``RuntimeWarning`` is
    emitted and the pipeline is returned without it.

    Args:
        model_id: Model identifier passed to ``StableDiffusionPipeline.from_pretrained``.
        device: Target device string given to ``pipe.to`` (e.g. "cuda" or "cpu").
        dtype: Torch dtype used when loading the weights.

    Returns:
        The configured pipeline object.
    """
    import warnings  # stdlib; imported locally so the block does not depend on the import cell

    # NOTE(review): the recorded output of this notebook shows diffusers warning that
    # `torch_dtype` is deprecated in favour of `dtype`. The keyword is left unchanged
    # here; switch once the minimum supported diffusers version is confirmed.
    pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=dtype)
    pipe = pipe.to(device)
    pipe.safety_checker = None
    try:
        pipe.enable_attention_slicing()
    except Exception as exc:
        # Still best-effort, but no longer silent: the caller can see why the
        # memory optimisation is missing.
        warnings.warn(
            f"Could not enable attention slicing ({exc!r}); continuing without it.",
            RuntimeWarning,
        )
    return pipe

def generate_textures(pipe, prompt: str, negative: str, amount: int, out_dir: Path, start_from: int = 0, size: int = 512):
    """Generate ``amount`` textures with ``pipe`` and save them as PNG files.

    Each image is produced at ``size`` x ``size`` with guidance scale 7, passed
    through ``white_to_alpha`` and written to ``out_dir`` as
    ``texture_<index>.png`` with a zero-padded four-digit index. Indices run
    from ``start_from`` to ``start_from + amount - 1``.

    Args:
        pipe: Callable pipeline whose result exposes an ``images`` sequence.
        prompt: Positive prompt.
        negative: Negative prompt.
        amount: Number of textures to generate.
        out_dir: Output directory; created if it does not exist.
        start_from: Index used for the first file name.
        size: Height and width requested from the pipeline.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    for offset in range(amount):
        index = start_from + offset
        result = pipe(
            prompt=prompt,
            negative_prompt=negative,
            height=size,
            width=size,
            guidance_scale=7,
        )
        texture = white_to_alpha(result.images[0])
        texture.save(out_dir / f"texture_{index:04d}.png")


In [6]:
TEXTURES_DIR.mkdir(parents=True, exist_ok=True)

if USE_SD_TEXTURES:
    prompt, negative = load_prompt_files(PROMPT_FILE, NEG_FILE, MUD_PROMPT, MUD_NEGATIVE)

    existing = sorted(TEXTURES_DIR.glob("*.png"))
    to_make = max(0, N_TEXTURES_TO_GENERATE - len(existing))

    # Continue numbering after the highest existing texture_NNNN.png so that newly
    # generated files never overwrite one that is already there (counting files is
    # not enough when the numbering has gaps or other PNGs are present).
    used_indices = [
        int(p.stem[len("texture_"):])
        for p in existing
        if p.stem.startswith("texture_") and p.stem[len("texture_"):].isdecimal()
    ]
    start_from = max(used_indices, default=-1) + 1

    if to_make > 0:
        # Load the Stable Diffusion pipeline only when there is something to
        # generate; re-running this cell with enough textures is then cheap.
        pipe = init_sd_pipeline(SD_MODEL_ID, DEVICE, DTYPE)
        generate_textures(pipe, prompt, negative, to_make, TEXTURES_DIR, start_from=start_from, size=TEXTURE_SIZE)

    print("Textures available:", len(list(TEXTURES_DIR.glob("*.png"))))
else:
    print("Skipping SD texture generation. Using existing textures in:", TEXTURES_DIR)


Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]`torch_dtype` is deprecated! Use `dtype` instead!
Loading pipeline components...: 100%|██████████| 7/7 [00:01<00:00,  3.78it/s]
100%|██████████| 50/50 [00:11<00:00,  4.44it/s]
100%|██████████| 50/50 [00:10<00:00,  4.74it/s]
100%|██████████| 50/50 [00:10<00:00,  4.68it/s]
100%|██████████| 50/50 [00:11<00:00,  4.54it/s]
100%|██████████| 50/50 [00:11<00:00,  4.33it/s]
100%|██████████| 50/50 [00:11<00:00,  4.27it/s]
100%|██████████| 50/50 [00:11<00:00,  4.38it/s]
100%|██████████| 50/50 [00:11<00:00,  4.43it/s]
100%|██████████| 50/50 [00:11<00:00,  4.51it/s]
100%|██████████| 50/50 [00:10<00:00,  4.73it/s]
100%|██████████| 50/50 [00:10<00:00,  4.70it/s]
100%|██████████| 50/50 [00:10<00:00,  4.70it/s]


Textures available: 12


In [12]:
def generate_contamination_mask_multiscale(
    size,
    rng: np.random.Generator,
    spot_specs=(
        (260, 3, 10, 6, 0.22, 140, 220),
        (110, 10, 26, 10, 0.25, 160, 240),
        (20,  26, 70, 14, 0.30, 200, 255),
    ),
    base_blur=2,
    edge_blur=6,
    add_streaks=True,
):
    """Draw a random greyscale contamination mask made of spots and streaks.

    Args:
        size: ``(width, height)`` of the mask in pixels.
        rng: NumPy random generator; every random draw comes from it.
        spot_specs: Iterable of tuples
            ``(count, min_r, max_r, sub_n, sub_rf, inten_min, inten_max)``.
            For each tuple, ``count`` spots are drawn. A spot has a main radius
            drawn from ``[min_r, max_r]`` and an intensity drawn from
            ``[inten_min, inten_max]`` (both inclusive), and consists of
            ``sub_n`` ellipses scattered within the main radius around its
            centre. Each ellipse radius is
            ``main_r * sub_rf * uniform(0.5, 1.6)``, at least 1.
        base_blur: Radius of the first Gaussian blur (skipped when <= 0).
        edge_blur: Radius of the second Gaussian blur (skipped when <= 0).
        add_streaks: When true, 6 to 17 thin lines of intensity 40-119 are added.

    Returns:
        A PIL image in mode "L" of the requested size.
    """
    w, h = size
    mask = Image.new("L", (w, h), 0)
    draw = ImageDraw.Draw(mask)

    for (count, min_r, max_r, sub_n, sub_rf, inten_min, inten_max) in spot_specs:
        for _ in range(int(count)):
            main_r = int(rng.integers(min_r, max_r + 1))
            # max(...) keeps the sampling range non-empty when the spot is
            # larger than the image allows.
            cx = int(rng.integers(main_r, max(main_r + 1, w - main_r)))
            cy = int(rng.integers(main_r, max(main_r + 1, h - main_r)))
            intensity = int(rng.integers(inten_min, inten_max + 1))

            for _ in range(int(sub_n)):
                ox = int(rng.integers(-main_r, main_r + 1))
                oy = int(rng.integers(-main_r, main_r + 1))
                sub_r = max(1, int(main_r * sub_rf * float(rng.uniform(0.5, 1.6))))

                sx = cx + ox
                sy = cy + oy

                # Clamp the ellipse bounding box to the image.
                x1 = max(0, sx - sub_r)
                y1 = max(0, sy - sub_r)
                x2 = min(w, sx + sub_r)
                y2 = min(h, sy + sub_r)

                if x1 < x2 and y1 < y2:
                    draw.ellipse((x1, y1, x2, y2), fill=intensity)

    if add_streaks:
        n_streaks = int(rng.integers(6, 18))
        for _ in range(n_streaks):
            x1 = int(rng.integers(0, w))
            y1 = int(rng.integers(0, h))
            # Streak length is 10-45% of the width. For very narrow images both
            # bounds truncate to the same integer, which would make
            # rng.integers raise; widen the range the same way the spot centre
            # is handled above. Draws are unchanged for w >= 3.
            min_len = int(0.10 * w)
            max_len = max(min_len + 1, int(0.45 * w))
            length = int(rng.integers(min_len, max_len))
            angle = float(rng.uniform(-0.6, 0.6))
            thickness = int(rng.integers(1, 4))
            intensity = int(rng.integers(40, 120))

            x2 = int(np.clip(x1 + length * np.cos(angle), 0, w - 1))
            y2 = int(np.clip(y1 + length * np.sin(angle), 0, h - 1))
            draw.line((x1, y1, x2, y2), fill=intensity, width=thickness)

    if base_blur > 0:
        mask = mask.filter(ImageFilter.GaussianBlur(base_blur))
    if edge_blur > 0:
        mask = mask.filter(ImageFilter.GaussianBlur(edge_blur))

    return mask


def apply_refined_soiling_v2(
    clean_img: Image.Image,
    texture_img: Image.Image,
    rng: np.random.Generator,
    alpha_strength=1.8,
    edge_softness=5,
    darken_strength=0.25,
    blur_strength=7,
    haze_strength=0.10,
    mask_specs=None,
):
    """Composite a dirt texture onto an image through a random contamination mask.

    Steps, in order: generate a mask with
    ``generate_contamination_mask_multiscale``; optionally soften it; scale it
    by ``alpha_strength`` and clip to [0, 1]; multiply by the texture's alpha
    channel (scaled by 0.85); alpha-blend the texture over the image; then
    apply darkening, blur and white haze, each weighted by the same blend map.

    Args:
        clean_img: Source image; converted to RGB.
        texture_img: Texture; resized to the source size and converted to RGBA.
        rng: NumPy random generator forwarded to the mask generator.
        alpha_strength: Multiplier applied to the mask before clipping.
        edge_softness: Extra Gaussian softening of the mask; kernel size is
            ``2 * int(edge_softness) + 1`` (skipped when <= 0).
        darken_strength: Fraction by which fully covered pixels are darkened.
        blur_strength: Blur applied under the mask; kernel size is
            ``2 * int(blur_strength) + 1`` (skipped when falsy or <= 0).
        haze_strength: Fraction of white mixed into fully covered pixels.
        mask_specs: Optional dict of keyword arguments for the mask generator.

    Returns:
        The soiled image as a new RGB PIL image.
    """
    base_rgb = clean_img.convert("RGB")
    w, h = base_rgb.size

    mask_kwargs = {} if mask_specs is None else mask_specs
    raw_mask = generate_contamination_mask_multiscale((w, h), rng=rng, **mask_kwargs)
    coverage = np.array(raw_mask).astype(np.float32) / 255.0

    if edge_softness > 0:
        soft_k = int(edge_softness) * 2 + 1  # odd kernel size
        coverage = cv2.GaussianBlur(coverage, (soft_k, soft_k), 0)

    coverage = np.clip(coverage * float(alpha_strength), 0.0, 1.0)

    texture_rgba = np.array(texture_img.resize((w, h)).convert("RGBA")).astype(np.float32)
    texture_rgb = texture_rgba[..., :3]
    texture_alpha = texture_rgba[..., 3] / 255.0

    # Per-pixel blend weight: mask coverage gated by the texture's own opacity.
    blend = np.clip(coverage * np.clip(texture_alpha * 0.85, 0.0, 1.0), 0.0, 1.0)
    blend3 = blend[..., None]

    scene = np.array(base_rgb).astype(np.float32)
    soiled = (1.0 - blend3) * scene + blend3 * texture_rgb

    if darken_strength > 0:
        soiled = soiled * (1.0 - float(darken_strength) * blend3)

    if blur_strength and blur_strength > 0:
        blur_k = int(blur_strength) * 2 + 1  # odd kernel size
        smeared = cv2.GaussianBlur(soiled, (blur_k, blur_k), 0)
        soiled = (1.0 - blend3) * soiled + blend3 * smeared

    if haze_strength > 0:
        veil = float(haze_strength) * blend3
        soiled = (1.0 - veil) * soiled + veil * 255.0

    return Image.fromarray(np.uint8(np.clip(soiled, 0, 255)))


In [13]:
def build_dirty_kitti_subset(
    img_dir: Path,
    lbl_dir: Path,
    out_root: Path,
    textures_dir: Path,
    n_samples: int,
    save_clean: bool,
    clean_dir_name: str,
    dirty_dir_name: str,
    overwrite_output: bool,
    seed=None,
):
    """Export a random subset of KITTI images with synthetic lens soiling.

    Up to ``n_samples`` PNG images are sampled from ``img_dir``. For each one
    that has a matching ``<stem>.txt`` in ``lbl_dir``, a random texture and a
    random severity (weighted by the module-level ``SEVERITY_WEIGHTS``, using
    the module-level ``PRESETS``) are chosen. The soiled image is written to
    ``out_root/training/<dirty_dir_name>`` and the label is copied to
    ``out_root/training/label_2``. Sampled images without a label are skipped
    and counted. A short summary is printed at the end.

    Args:
        img_dir: Directory with the source ``*.png`` images.
        lbl_dir: Directory with the matching ``*.txt`` labels.
        out_root: Root of the export.
        textures_dir: Directory with ``*.png`` textures.
        n_samples: Number of images to sample (capped at the number available).
        save_clean: Also store an RGB copy of each source image.
        clean_dir_name: Folder name (under ``training``) for the clean copies.
        dirty_dir_name: Folder name (under ``training``) for the soiled images.
        overwrite_output: If true and ``out_root`` exists, it is deleted first.
        seed: Optional seed for reproducible sampling and soiling. ``None``
            (the default) keeps every run fully random.

    Raises:
        FileNotFoundError: If ``img_dir`` has no PNG images or ``textures_dir``
            has no PNG textures.
    """
    # One seed drives both generators: `picker` for the discrete choices and
    # `rng` for the mask drawing. With seed=None both are entropy-seeded.
    rng = np.random.default_rng(seed)
    picker = random.Random(seed)

    if overwrite_output and out_root.exists():
        shutil.rmtree(out_root)

    out_img_dir = out_root / "training" / dirty_dir_name
    out_lbl_dir = out_root / "training" / "label_2"
    out_img_dir.mkdir(parents=True, exist_ok=True)
    out_lbl_dir.mkdir(parents=True, exist_ok=True)

    out_clean_dir = None
    if save_clean:
        out_clean_dir = out_root / "training" / clean_dir_name
        out_clean_dir.mkdir(parents=True, exist_ok=True)

    all_imgs = sorted(Path(img_dir).glob("*.png"))
    if len(all_imgs) == 0:
        raise FileNotFoundError(f"No images found in {img_dir}")

    textures = sorted(Path(textures_dir).glob("*.png"))
    if len(textures) == 0:
        raise FileNotFoundError(f"No textures found in {textures_dir}. Generate or add PNG textures first.")

    chosen = picker.sample(all_imgs, k=min(n_samples, len(all_imgs)))
    missing_labels = 0

    for img_path in chosen:
        stem = img_path.stem
        lbl_path = Path(lbl_dir) / f"{stem}.txt"
        if not lbl_path.exists():
            missing_labels += 1
            continue

        texture_path = picker.choice(textures)

        # Choose severity per image (light/medium/heavy)
        severity = picker.choices(
            ["light", "medium", "heavy"],
            weights=SEVERITY_WEIGHTS,
            k=1
        )[0]
        p = PRESETS[severity]

        # Image.open keeps the underlying file open until the image is closed;
        # the context managers release both handles on every iteration.
        with Image.open(img_path) as clean_img, Image.open(texture_path) as tex_img:
            dirty_img = apply_refined_soiling_v2(
                clean_img=clean_img,
                texture_img=tex_img,
                rng=rng,
                alpha_strength=p["alpha"],
                edge_softness=p["edge_soft"],
                darken_strength=p["darken"],
                blur_strength=p["blur"],
                haze_strength=p["haze"],
                mask_specs=p["mask"],
            )

            dirty_img.save(out_img_dir / f"{stem}.png", format="PNG")

            if save_clean and out_clean_dir is not None:
                clean_img.convert("RGB").save(out_clean_dir / f"{stem}.png", format="PNG")

        shutil.copy2(lbl_path, out_lbl_dir / f"{stem}.txt")

    print(f"Done. Saved subset to: {out_root}")
    print(f"Missing labels skipped: {missing_labels}")
    print(f"Dirty images saved: {len(list(out_img_dir.glob('*.png')))}")
    print(f"Labels saved: {len(list(out_lbl_dir.glob('*.txt')))}")


In [14]:
# Run the export with the settings from the configuration cell.
# img_dir / lbl_dir are already Path objects returned by find_kitti_dirs.
build_dirty_kitti_subset(
    img_dir=img_dir,
    lbl_dir=lbl_dir,
    out_root=OUT_ROOT,
    textures_dir=TEXTURES_DIR,
    n_samples=SUBSET_SIZE,
    save_clean=SAVE_CLEAN,
    clean_dir_name=CLEAN_DIR_NAME,
    dirty_dir_name=DIRTY_DIR_NAME,
    overwrite_output=OVERWRITE_OUTPUT,
)


Done. Saved subset to: \content\kitti_dirty_subset_run4
Missing labels skipped: 0
Dirty images saved: 100
Labels saved: 100


In [16]:
# Sanity check of the export: every soiled image should have a label with the
# same stem, and vice versa.
training_root = OUT_ROOT / "training"
out_img_dir = training_root / DIRTY_DIR_NAME
out_lbl_dir = training_root / "label_2"

out_imgs = sorted(out_img_dir.glob("*.png"))
out_lbls = sorted(out_lbl_dir.glob("*.txt"))

img_stems = {path.stem for path in out_imgs}
lbl_stems = {path.stem for path in out_lbls}

print(f"Images: {len(out_imgs)}")
print(f"Labels: {len(out_lbls)}")
print(f"Mismatched (images without labels): {len(img_stems.difference(lbl_stems))}")
print(f"Mismatched (labels without images): {len(lbl_stems.difference(img_stems))}")

if len(out_imgs) > 0:
    first_img = out_imgs[0]
    expected_label = out_lbl_dir / f"{first_img.stem}.txt"
    print("Example:", first_img.name, "<->", expected_label.name)


Images: 100
Labels: 100
Mismatched (images without labels): 0
Mismatched (labels without images): 0
Example: 000008.png <-> 000008.txt
