# **تمكّن - Tamakkan** Graduation project
## Intelligent platform for driver behavior analysis, training support, and automated license testing.
## **Group Members:**
 *  ### **Samar Rafat Kintab          443003122**
*  ### **Lina Mohammad Bader   444000417**
* ### **Lamar Bandar Felemban 444003576**
* ### **Bashair Fahad Al-jabri     444004184**

## **Supervised By: Dr. Eiman Talal Al-Harby**


# **BDD100K Dataset**

## Imports

In [1]:
import os
import json
import glob
import random
import shutil
from pathlib import Path
from collections import Counter, defaultdict

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

## Mount Google Drive

In [2]:
# Mount Google Drive inside the Colab runtime; every path in CONFIG below is
# located under the /content/drive mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Configurations

In [3]:
# Single configuration dictionary for the whole notebook: later cells read their
# paths and tuning values from CONFIG.
CONFIG = {
    # Dataset locations (one root folder per split).
    "train_root": "/content/drive/MyDrive/tamakkan/100k/train_split",  # scanned recursively (the train images sit in 14 sub-folders)
    "val_root":   "/content/drive/MyDrive/tamakkan/100k/val",
    "test_root":  "/content/drive/MyDrive/tamakkan/100k/test",

    # Object category to drop + size of the final dataset.
    # NOTE: "train" here is an object *category* (it is compared against each
    # object's "category" field), not the training split.
    "class_to_remove": "train",
    "target_total_size": 30000,  # total across train+val+test

    # When True, per-split targets are proportional to the available split sizes.
    "keep_split_ratio": True,

    # Optional safety floors for val/test (so we do not end up with e.g. 200 val images).
    # Set both to 0 for purely proportional sampling.
    "min_val": 2000,
    "min_test": 2000,

    # Root folder of the exported YOLO dataset.
    "output_dir": "/content/drive/MyDrive/tamakkan/bdd100k_yolo_30k",

    # File extensions treated as images when scanning the split folders.
    "image_extensions": [".jpg", ".jpeg", ".png"],

    # Seed applied to `random` and `numpy.random` right after this dictionary.
    "seed": 42,

    # Sampling strategy
    # "information_maximizing": rank by value score (rare-ish + diverse + hard conditions)
    # NOTE(review): no code visible in this part of the notebook reads "strategy";
    # it appears to be descriptive only - confirm.
    "strategy": "information_maximizing",

    # Weights of the individual terms summed by compute_score().
    "score_weights": {
        "rarity": 6.0,        # multiplies the inverse-frequency bonus of the classes in the image
        "diversity": 3.0,     # multiplies the normalized Shannon diversity of the image
        "density": 0.08,      # multiplies the number of box2d objects in the image
        "night": 1.0,         # added when timeofday == "night"
        "dawn_dusk": 2.0,     # added when timeofday == "dawn/dusk"
        "bad_weather": 1.5,   # added when weather is in "weather_bonus_list"
        "rare_scene": 1.0,    # added when scene is in "scene_bonus_list"
    },

    # Scene / weather attribute values that receive the bonuses above.
    "scene_bonus_list": ["residential", "parking lot", "tunnel", "gas stations"],
    "weather_bonus_list": ["rainy", "snowy", "foggy"],
}

# Seed both RNGs from the configured seed.
random.seed(CONFIG["seed"])
np.random.seed(CONFIG["seed"])


# Echo the key settings so the run is self-describing in the notebook output.
print("Filter train-class images + undersample ALL to 30k")
print(f"Remove class: {CONFIG['class_to_remove']}")
print(f"Target total: {CONFIG['target_total_size']:,}")
print(f"Output dir:  {CONFIG['output_dir']}")


Filter train-class images + undersample ALL to 30k
Remove class: train
Target total: 30,000
Output dir:  /content/drive/MyDrive/tamakkan/bdd100k_yolo_30k


## Utilities

In [4]:
# Note: these helpers are used throughout the notebook, so they are defined once
# here to avoid duplicated code.

_JSON_CACHE = {}  # json_path -> parsed JSON content (None when the file could not be loaded)


def load_json_cached(json_path: str):
    """Load a JSON file once and cache the result (Drive reads are slow).

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content, or ``None`` when the file cannot be read or
        does not contain valid JSON. Failures are cached as well, so a broken
        file is only attempted (and reported) once.
    """
    if json_path in _JSON_CACHE:
        return _JSON_CACHE[json_path]
    try:
        # JSON is UTF-8 by specification; do not depend on the platform default.
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # OSError: missing/unreadable file. ValueError: invalid JSON or bad
        # encoding (json.JSONDecodeError and UnicodeDecodeError subclass it).
        # Report the problem instead of silently dropping the sample later on.
        print(f"Warning: could not load label file {json_path}: {exc}")
        data = None
    _JSON_CACHE[json_path] = data
    return data


def extract_box2d_objects(label_data):
    """Collect the annotated objects that carry a 2D bounding box.

    Only objects whose "box2d" entry is present and not None are returned,
    because those are the ones that can be converted to YOLO boxes.

    Args:
        label_data: Parsed label JSON; expected to hold a "frames" list whose
            items may hold an "objects" list.

    Returns:
        A flat list of object dicts in frame order; empty when ``label_data``
        is falsy or has no "frames" key.
    """
    if not label_data or "frames" not in label_data:
        return []
    return [
        annotation
        for frame in label_data.get("frames", [])
        for annotation in frame.get("objects", [])
        if annotation.get("box2d") is not None
    ]


def find_images(root_dir: str, recursive: bool, extensions=None):
    """Collect image paths under ``root_dir`` in a deterministic order.

    Args:
        root_dir: Folder to scan.
        recursive: When True, sub-folders are scanned as well; otherwise only
            files directly inside ``root_dir`` are considered.
        extensions: Optional iterable of file suffixes (including the dot) to
            match. Defaults to ``CONFIG["image_extensions"]``.

    Returns:
        A sorted list of matching paths. Sorting matters because ``glob``
        yields files in an arbitrary, filesystem-dependent order, and the
        later top-k selection breaks score ties by input order.
    """
    if extensions is None:
        extensions = CONFIG["image_extensions"]

    images = []
    for ext in extensions:
        if recursive:
            # "**" also matches zero directories, so files directly in root_dir are included.
            pattern = os.path.join(root_dir, "**", f"*{ext}")
        else:
            pattern = os.path.join(root_dir, f"*{ext}")
        images.extend(glob.glob(pattern, recursive=recursive))
    return sorted(images)


def build_pairs(root_dir: str, split: str, recursive: bool):
    """Pair every image under ``root_dir`` with its label JSON.

    The label file is expected next to the image, with the same stem and a
    ".json" suffix. Images without such a file are left out.

    Args:
        root_dir: Folder holding the images and JSON files of one split.
        split: Split name stored in each returned sample.
        recursive: Forwarded to ``find_images``.

    Returns:
        A list of dicts with the keys "split", "img_path" and "json_path";
        empty (after printing a message) when ``root_dir`` does not exist.
    """
    if not os.path.exists(root_dir):
        print(f"Missing folder: {root_dir}")
        return []

    matched = []
    candidates = find_images(root_dir, recursive=recursive)
    for image_file in tqdm(candidates, desc=f"Matching {split}"):
        label_file = str(Path(image_file).with_suffix(".json"))
        if not os.path.exists(label_file):
            continue  # image without annotation: not usable
        matched.append({"split": split, "img_path": image_file, "json_path": label_file})

    return matched

## Step 1: Scan & match

In [5]:
# Build one master list of matched image/JSON samples across all splits.
# Only the train root is scanned recursively; val and test are read as flat folders.
print("Step 1: Scanning & matching dataset")

train_samples = build_pairs(CONFIG["train_root"], "train", recursive=True)
val_samples   = build_pairs(CONFIG["val_root"],   "val",   recursive=False)
test_samples  = build_pairs(CONFIG["test_root"],  "test",  recursive=False)

# Each sample keeps its "split" key, so the concatenated list can be split again later.
all_samples = train_samples + val_samples + test_samples

print("\nDataset matched pairs:")
print(f"  Train: {len(train_samples):,}")
print(f"  Val:   {len(val_samples):,}")
print(f"  Test:  {len(test_samples):,}")
print(f"  TOTAL: {len(all_samples):,}\n")

Step 1: Scanning & matching dataset


Matching train:   0%|          | 0/70000 [00:00<?, ?it/s]

Matching val:   0%|          | 0/10000 [00:00<?, ?it/s]

Matching test:   0%|          | 0/20000 [00:00<?, ?it/s]


Dataset matched pairs:
  Train: 70,000
  Val:   10,000
  Test:  20,000
  TOTAL: 100,000



## Step 2: remove images containing the 'train' class
(drop the whole image)

In [6]:
# If an image has a box of the unwanted class anywhere, the entire sample is dropped.
def sample_has_class(sample, class_name: str) -> bool:
    """Tell whether any box2d object of ``sample`` has category ``class_name``.

    Args:
        sample: Sample dict holding a "json_path" key.
        class_name: Category name to look for.

    Returns:
        True when at least one box2d object in the sample's label file has
        that category, False otherwise (including when the label is empty).
    """
    boxed_objects = extract_box2d_objects(load_json_cached(sample["json_path"]))
    return any(entry.get("category") == class_name for entry in boxed_objects)


print(f"Step 2: Removing any sample that contains class '{CONFIG['class_to_remove']}'")

# Partition every matched sample into "kept" or "removed" in a single pass.
kept, removed = [], []

for s in tqdm(all_samples, desc="Filtering train-class samples"):
    (removed if sample_has_class(s, CONFIG["class_to_remove"]) else kept).append(s)

print("\nFiltering result:")
print(f"  Original: {len(all_samples):,}")
# max(1, ...) guards the percentage against an empty dataset.
print(f"  Removed:  {len(removed):,} ({len(removed)/max(1,len(all_samples))*100:.2f}%)")
print(f"  Kept:     {len(kept):,}\n")

# Group the surviving samples by the split they came from.
kept_by_split = {
    sp: [s for s in kept if s["split"] == sp]
    for sp in ("train", "val", "test")
}

print("Kept breakdown (after removing train-class images):")
for sp in ("train", "val", "test"):
    print(f"  {sp.upper():5s}: {len(kept_by_split[sp]):,}")
print()

Step 2: Removing any sample that contains class 'train'


Filtering train-class samples:   0%|          | 0/100000 [00:00<?, ?it/s]


Filtering result:
  Original: 100,000
  Removed:  145 (0.14%)
  Kept:     99,855

Kept breakdown (after removing train-class images):
  TRAIN: 69,895
  VAL  : 9,986
  TEST : 19,974



## Step 3: analyze data

In [7]:
# We store:
# categories + counts
# metadata: timeofday/scene/weather
# num_objects, diversity score
def shannon_diversity(categories):
    """Return the normalized Shannon entropy of ``categories`` as a diversity score.

    Args:
        categories: Iterable of category names, one entry per object.

    Returns:
        A float in [0.0, 1.0]: 0.0 for an empty input or a single distinct
        category, 1.0 when all distinct categories are equally frequent.
    """
    if not categories:
        return 0.0
    counts = Counter(categories)
    if len(counts) < 2:
        # A single class carries no diversity; returning early also avoids
        # dividing by log2(1) == 0.
        return 0.0
    total = sum(counts.values())
    # Every count is >= 1, so all probabilities are strictly positive and
    # log2 needs no epsilon (the epsilon made the score slightly negative).
    probs = np.array([c / total for c in counts.values()], dtype=np.float64)
    entropy = -np.sum(probs * np.log2(probs))
    normalized = entropy / np.log2(len(counts))
    # Clamp away floating-point rounding just outside the documented range.
    return float(min(1.0, max(0.0, normalized)))


print("Step 3: Analyzing samples (cached)")

all_class_counts = Counter()  # object count per category over all kept samples
analyzed = []                 # one enriched record per successfully loaded sample

for s in tqdm(kept, desc="Analyzing kept samples"):
    label = load_json_cached(s["json_path"])
    if label:
        objs = extract_box2d_objects(label)
        cats = [o.get("category", "unknown") for o in objs]

        # Image-level metadata; anything missing falls back to "unknown".
        if isinstance(label, dict):
            attrs = label.get("attributes", {})
        else:
            attrs = {}
        timeofday, scene, weather = (
            attrs.get(field, "unknown") for field in ("timeofday", "scene", "weather")
        )

        all_class_counts.update(cats)

        # Original sample keys first, followed by the derived statistics.
        analyzed.append(dict(
            s,
            categories=cats,
            num_objects=len(cats),
            diversity=shannon_diversity(cats),
            timeofday=timeofday,
            scene=scene,
            weather=weather,
        ))

print(f"\n Analyzed samples: {len(analyzed):,}")
print(f" Unique classes (after filter): {len(all_class_counts):,}\n")

Step 3: Analyzing samples (cached)


Analyzing kept samples:   0%|          | 0/99855 [00:00<?, ?it/s]


 Analyzed samples: 99,855
 Unique classes (after filter): 9



## Step 4: score samples (Intelligent undersampling)

In [8]:
# compute a score per image so we keep the “best learning signal” images
# rarity: favor classes that appear less often (inverse frequency)
# diversity: favor images with multiple classes
# density: favor images with more objects
# hard conditions: keep some night / dawn-dusk / rainy-foggy / rare scenes

# Rarity comes from the class frequencies of the filtered dataset, not from hard-coded values.
def compute_rarity_bonus(categories, class_freq: Counter):
    """Score an image higher when it contains infrequent classes.

    Each distinct category in the image contributes ``1 / sqrt(frequency)``;
    the square root keeps the inverse-frequency weighting gentle. A category
    missing from ``class_freq`` is treated as having frequency 1.

    Args:
        categories: Category names of the objects in one image.
        class_freq: Dataset-wide object count per category.

    Returns:
        The summed bonus as a float; 0.0 when the image has no categories.
    """
    if not categories:
        return 0.0
    distinct = set(categories)  # every class counts once per image
    return float(sum((1.0 / np.sqrt(class_freq.get(name, 1)) for name in distinct), 0.0))


W = CONFIG["score_weights"]


def compute_score(sample, class_freq: Counter):
    """Compute the sampling score of one analyzed sample (higher is kept first).

    The score is a weighted sum of content terms (class rarity, class
    diversity, object count) plus flat bonuses for selected capture
    conditions. Weights come from ``CONFIG["score_weights"]``.

    Args:
        sample: Analyzed sample with the keys "categories", "diversity",
            "num_objects", "timeofday", "weather" and "scene".
        class_freq: Dataset-wide object count per category.

    Returns:
        The score as a float.
    """
    # Content terms: rarity, then diversity, then density.
    total = (
        0.0
        + W["rarity"] * compute_rarity_bonus(sample["categories"], class_freq)
        + W["diversity"] * sample["diversity"]
        + W["density"] * sample["num_objects"]
    )

    # Time-of-day bonus: night and dawn/dusk are mutually exclusive.
    time_of_day = sample["timeofday"]
    if time_of_day == "night":
        total += W["night"]
    elif time_of_day == "dawn/dusk":
        total += W["dawn_dusk"]

    # Weather and scene bonuses are independent of each other.
    if sample["weather"] in CONFIG["weather_bonus_list"]:
        total += W["bad_weather"]
    if sample["scene"] in CONFIG["scene_bonus_list"]:
        total += W["rare_scene"]

    return float(total)


print("Step 4: Scoring images for intelligent undersampling")

# Attach a "score" key to every analyzed sample in place; the class frequencies
# used for the rarity term are the ones counted in Step 3.
for s in tqdm(analyzed, desc="Scoring"):
    s["score"] = compute_score(s, all_class_counts)

Step 4: Scoring images for intelligent undersampling


Scoring:   0%|          | 0/99855 [00:00<?, ?it/s]

## Step 5: undersample dataset to 30k
(split-aware)

In [9]:
# Split ratios are kept stable by default.
def choose_split_targets(total_target: int, split_counts: dict,
                         keep_split_ratio=None, min_val=None, min_test=None):
    """Compute how many samples to keep per split.

    Args:
        total_target: Desired total number of samples over all splits.
        split_counts: Available samples per split; must hold the keys
            "train", "val" and "test".
        keep_split_ratio: When True, val/test get a share proportional to
            their availability (but at least their floor); when False they
            get exactly their floor. Defaults to ``CONFIG["keep_split_ratio"]``.
        min_val: Floor for the val split. Defaults to ``CONFIG["min_val"]``.
        min_test: Floor for the test split. Defaults to ``CONFIG["min_test"]``.

    Returns:
        A dict with the keys "train", "val" and "test". No target exceeds
        the availability of its split, and the sum never exceeds
        ``total_target`` (the whole dataset is returned when ``total_target``
        is at least the number of available samples).
    """
    if keep_split_ratio is None:
        keep_split_ratio = CONFIG["keep_split_ratio"]
    if min_val is None:
        min_val = CONFIG["min_val"]
    if min_test is None:
        min_test = CONFIG["min_test"]

    total_available = sum(split_counts.values())
    if total_target >= total_available:
        # Nothing to undersample: keep everything.
        return dict(split_counts)

    if keep_split_ratio:
        def proportional(sp):
            return int(round(total_target * (split_counts[sp] / total_available)))

        val_target = max(proportional("val"), min_val)
        test_target = max(proportional("test"), min_test)
    else:
        val_target = min_val
        test_target = min_test

    # A split can never contribute more than it has.
    val_target = min(val_target, split_counts["val"])
    test_target = min(test_target, split_counts["test"])

    reserved = val_target + test_target
    if reserved > total_target:
        # The floors alone exceed the budget: shrink val/test proportionally
        # so the total is respected instead of silently overshooting it.
        val_target = total_target * val_target // reserved
        test_target = total_target - val_target

    # Train receives whatever budget is left, limited by its availability.
    # Computing this after capping val/test hands their unused budget to train.
    train_target = min(split_counts["train"],
                       max(0, total_target - val_target - test_target))

    return {"train": train_target, "val": val_target, "test": test_target}


# Number of analyzed samples available in each split.
split_counts = {
    sp: sum(1 for s in analyzed if s["split"] == sp)
    for sp in ("train", "val", "test")
}

targets = choose_split_targets(CONFIG["target_total_size"], split_counts)

print("\n Target sizes (after filtering train-class images):")
for sp in ("train", "val", "test"):
    print(f"  {sp.upper():5s}: target {targets[sp]:,} / available {split_counts[sp]:,}")
print(f"  TOTAL: {sum(targets.values()):,}\n")


def select_top_by_score(samples, k):
    """Return the ``k`` highest-scoring samples, best first.

    Args:
        samples: Sequence of dicts that each hold a "score" key.
        k: Number of samples to keep.

    Returns:
        ``samples`` itself (unsorted, same object) when it has at most ``k``
        items; otherwise a new list with the top ``k`` items in descending
        score order. Ties keep their original relative order.
    """
    if len(samples) <= k:
        return samples
    ranked = list(samples)  # work on a copy so the caller's order is untouched
    ranked.sort(key=lambda item: item["score"], reverse=True)
    return ranked[:k]


print("Step 5: Selecting top scored samples per split")

selected = []            # all chosen samples, in train -> val -> test order
selected_by_split = {}   # split name -> chosen samples of that split

# Selection is done independently per split, so a sample never changes split.
for sp in ["train","val","test"]:
    pool = [s for s in analyzed if s["split"] == sp]
    chosen = select_top_by_score(pool, targets[sp])
    selected_by_split[sp] = chosen
    selected.extend(chosen)

print("\n Selection done:")
for sp in ["train","val","test"]:
    print(f"  {sp.upper():5s}: {len(selected_by_split[sp]):,}")
print(f"  TOTAL: {len(selected):,}\n")


 Target sizes (after filtering train-class images):
  TRAIN: target 20,999 / available 69,895
  VAL  : target 3,000 / available 9,986
  TEST : target 6,001 / available 19,974
  TOTAL: 30,000

Step 5: Selecting top scored samples per split

 Selection done:
  TRAIN: 20,999
  VAL  : 3,000
  TEST : 6,001
  TOTAL: 30,000



## Step 6: build class list and YOLO label conversion

In [10]:
# YOLO needs:
# labels as .txt in format: <class_id> <x_center> <y_center> <w> <h>
# all values normalized by image width/height

# We create the class list from the dataset automatically.

def build_class_list(samples):
    """Derive the class list from the categories present in ``samples``.

    Args:
        samples: Iterable of dicts that each hold a "categories" list.

    Returns:
        A tuple ``(classes, counts)``: the alphabetically sorted list of
        distinct category names, and a Counter with the number of objects
        per category.
    """
    tally = Counter(name for entry in samples for name in entry["categories"])
    return sorted(tally), tally


# The class list is derived from the *selected* samples; YOLO class ids are the
# positions of the names in the alphabetically sorted list.
classes, class_freq_selected = build_class_list(selected)
class_to_id = {c:i for i,c in enumerate(classes)}

print("Step 6: Building YOLO class mapping")
print(f"Classes used (count={len(classes)}):")
# Show at most the first 30 names; "..." marks a truncated list.
print(classes[:30], "..." if len(classes) > 30 else "")
print()


def clamp(v, lo, hi):
    """Limit ``v`` to the range [``lo``, ``hi``] (upper bound applied first)."""
    capped = v if v < hi else hi
    return capped if capped > lo else lo


def json_to_yolo_lines(json_path, img_w, img_h, class_to_id):
    """Convert the box2d objects of one label JSON into YOLO label lines.

    Args:
        json_path: Path of the label JSON file.
        img_w: Image width in pixels, used for normalization.
        img_h: Image height in pixels, used for normalization.
        class_to_id: Mapping from category name to YOLO class id; objects
            whose category is not in the mapping are skipped.

    Returns:
        A list of strings "id xc yc w h" with center/size normalized by the
        image dimensions and formatted with six decimals. Objects with a
        missing coordinate, or with no area left after clamping to the
        image, are skipped. Empty when the label cannot be loaded.
    """
    label = load_json_cached(json_path)
    if not label:
        return []

    yolo_rows = []
    for entry in extract_box2d_objects(label):
        category = entry.get("category", "unknown")
        if category not in class_to_id:
            continue

        box = entry.get("box2d", {})
        corners = [box.get(key) for key in ("x1", "y1", "x2", "y2")]
        if None in corners:
            continue
        raw_x1, raw_y1, raw_x2, raw_y2 = corners

        # Keep every corner inside the image (just in case).
        left = clamp(float(raw_x1), 0.0, float(img_w))
        right = clamp(float(raw_x2), 0.0, float(img_w))
        top = clamp(float(raw_y1), 0.0, float(img_h))
        bottom = clamp(float(raw_y2), 0.0, float(img_h))

        # Degenerate or inverted boxes carry no usable area.
        if right <= left or bottom <= top:
            continue

        # YOLO format: box center and size, normalized by the image size.
        center_x = ((left + right) / 2.0) / img_w
        center_y = ((top + bottom) / 2.0) / img_h
        box_w = (right - left) / img_w
        box_h = (bottom - top) / img_h

        yolo_rows.append(
            f"{class_to_id[category]} {center_x:.6f} {center_y:.6f} {box_w:.6f} {box_h:.6f}"
        )

    return yolo_rows

Step 6: Building YOLO class mapping
Classes used (count=9):
['bike', 'bus', 'car', 'motor', 'person', 'rider', 'traffic light', 'traffic sign', 'truck'] 



## Step 7: create YOLO folder structure + export images/labels + data.yaml

In [None]:
# This makes a fully trainable YOLO dataset:
# output_dir/
#   images/train, images/val, images/test
#   labels/train, labels/val, labels/test
#   data.yaml

def ensure_dir(p):
    """Create directory ``p`` with any missing parents; do nothing if it already exists."""
    os.makedirs(name=p, exist_ok=True)

# Output layout: <output_dir>/images/<split> and <output_dir>/labels/<split>.
out = CONFIG["output_dir"]
img_out = {sp: os.path.join(out, "images", sp) for sp in ["train","val","test"]}
lab_out = {sp: os.path.join(out, "labels", sp) for sp in ["train","val","test"]}

# Create all six target folders up front (existing folders are left as they are).
for sp in ["train","val","test"]:
    ensure_dir(img_out[sp])
    ensure_dir(lab_out[sp])

print("Step 7: Exporting YOLO dataset (copy images + write labels)")

# PIL is used to read each image's size, once per sample, during export.
from PIL import Image

def export_split(samples, split):
    """Copy the images of one split and write their YOLO label files.

    For every sample the image size is read first; only when that succeeds
    is the image copied to ``img_out[split]`` (unless a file of that name is
    already there) and a ``<stem>.txt`` label file written to
    ``lab_out[split]``. An image without convertible boxes gets an empty
    label file.

    Args:
        samples: Samples to export; each holds "img_path" and "json_path".
        split: Split name used to pick the output folders and the progress label.
    """
    for s in tqdm(samples, desc=f"Export {split}"):
        img_src = s["img_path"]
        json_src = s["json_path"]

        # Read the size BEFORE copying: an unreadable image must not end up in
        # the dataset without a label file.
        try:
            with Image.open(img_src) as im:
                w, h = im.size
        except (OSError, ValueError) as exc:
            # Rare but possible (missing or corrupt file); report and skip.
            print(f"Skipping unreadable image {img_src}: {exc}")
            continue

        stem = Path(img_src).stem
        img_dst = os.path.join(img_out[split], Path(img_src).name)
        lab_dst = os.path.join(lab_out[split], stem + ".txt")

        # Copy the image; an existing copy is kept so interrupted runs can resume.
        if not os.path.exists(img_dst):
            shutil.copy2(img_src, img_dst)

        # Write the label (always rewritten, so labels follow the current class mapping).
        yolo_lines = json_to_yolo_lines(json_src, w, h, class_to_id)

        with open(lab_dst, "w") as f:
            f.write("\n".join(yolo_lines))

# Export the three splits in train -> val -> test order.
for sp in ["train", "val", "test"]:
    export_split(selected_by_split[sp], sp)


# data.yaml for Ultralytics YOLO: dataset root, relative image folders,
# number of classes and the id -> name table.
yaml_path = os.path.join(out, "data.yaml")
with open(yaml_path, "w") as f:
    f.write(
        f"path: {out}\n"
        "train: images/train\n"
        "val: images/val\n"
        "test: images/test\n\n"
        f"nc: {len(classes)}\n"
        "names:\n"
    )
    for i, name in enumerate(classes):
        f.write(f"  {i}: {name}\n")

print("\n YOLO dataset is ready!")
print(f" Output folder: {out}")
print(f" data.yaml: {yaml_path}")

Step 7: Exporting YOLO dataset (copy images + write labels)


Export train:   0%|          | 0/20999 [00:00<?, ?it/s]

## Final summary

In [None]:

# Recap of the run: samples kept after filtering, selection per split, class count.
summary_lines = [
    "FINAL SUMMARY",
    f"After removing '{CONFIG['class_to_remove']}' images:",
    f"  Kept samples total: {len(analyzed):,}",
    "Selected for YOLO:",
    f"  Train: {len(selected_by_split['train']):,}",
    f"  Val:   {len(selected_by_split['val']):,}",
    f"  Test:  {len(selected_by_split['test']):,}",
    f"  TOTAL: {len(selected):,}",
    f"Classes: {len(classes)}",
]
print("\n".join(summary_lines))


## Visualizations