In [None]:
# --- Environment-agnostic dataset root resolver ---
from pathlib import Path

# Location of the dataset when the notebook runs inside Kaggle.
kaggle_root = Path("/kaggle/input/kitti-dataset")

# Outside Kaggle, an earlier kagglehub cell may have defined
# `klemenko_kitti_dataset_path`; if that name does not exist we simply have
# no second candidate.
try:
    hub_root = Path(klemenko_kitti_dataset_path)
except NameError:
    hub_root = None

# First candidate that exists on disk wins; the Kaggle mount has priority.
resolved_root = next(
    (candidate for candidate in (kaggle_root, hub_root) if candidate and candidate.exists()),
    None,
)
if resolved_root is None:
    raise FileNotFoundError(
        "KITTI data not found. Either run on Kaggle (where /kaggle/input/kitti-dataset exists) "
        "or ensure kagglehub.download() finished and klemenko_kitti_dataset_path is valid."
    )
DATASET_ROOT = resolved_root

print("Using KITTI dataset root:", DATASET_ROOT)

# Phase 1

Install Required Packages

In [None]:
# Install everything we actually use
!pip install -q ultralytics tqdm opencv-python-headless matplotlib pyyaml scikit-learn

# Make Ultralytics not initialize W&B (saves RAM / avoids login prompts)
from ultralytics.utils import SETTINGS
SETTINGS.update({'wandb': False})

In [None]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import seaborn as sns
import matplotlib.pyplot as plt
from ultralytics import YOLO
from pathlib import Path
import cv2
from collections import defaultdict
import numpy as np
from pathlib import Path
from ultralytics import YOLO
from scipy.ndimage import gaussian_filter
import random

Create Project Folder Structure

In [None]:
from pathlib import Path

PROJECT_ROOT = Path('./kitti_phase1')
EXTRACT_DIR = PROJECT_ROOT / 'kitti_extracted'   # kept for parity with original notebook
YOLO_DIR = PROJECT_ROOT / 'kitti_yolo'

# YOLO layout: images/<split> and labels/<split> side by side.
YOLO_IMAGES_TRAIN = YOLO_DIR / 'images' / 'train'
YOLO_LABELS_TRAIN = YOLO_DIR / 'labels' / 'train'
YOLO_IMAGES_VAL = YOLO_DIR / 'images' / 'val'
YOLO_LABELS_VAL = YOLO_DIR / 'labels' / 'val'

# Create every folder up front; safe to re-run because of exist_ok.
for folder in (
    EXTRACT_DIR,
    YOLO_DIR,
    YOLO_IMAGES_TRAIN,
    YOLO_LABELS_TRAIN,
    YOLO_IMAGES_VAL,
    YOLO_LABELS_VAL,
):
    folder.mkdir(parents=True, exist_ok=True)

PROJECT_ROOT, EXTRACT_DIR, YOLO_DIR

Select KITTI Paths from Kaggle Input

In [None]:
from pathlib import Path

KAGGLE_KITTI = DATASET_ROOT

# Images and labels live in two sibling top-level folders of the dataset.
_image_root = KAGGLE_KITTI / "data_object_image_2"
TRAIN_IMAGES_DIR = _image_root / "training" / "image_2"
TEST_IMAGES_DIR = _image_root / "testing" / "image_2"
LABELS_DIR = KAGGLE_KITTI / "data_object_label_2" / "training" / "label_2"

IMAGES_DIR = TRAIN_IMAGES_DIR  # we convert from the labeled TRAIN split

for caption, folder in (
    ("Train images folder:", TRAIN_IMAGES_DIR),
    ("Test  images folder:", TEST_IMAGES_DIR),
    ("Labels folder      :", LABELS_DIR),
):
    print(caption, folder)

# Early path asserts to fail fast if anything's missing
for p in (TRAIN_IMAGES_DIR, TEST_IMAGES_DIR, LABELS_DIR):
    assert p.exists(), f"Missing folder: {p}"

for caption, folder, pattern in (
    ("Number of TRAIN images:", TRAIN_IMAGES_DIR, '*.png'),
    ("Number of TEST  images:", TEST_IMAGES_DIR, '*.png'),
    ("Number of TRAIN labels:", LABELS_DIR, '*.txt'),
):
    print(caption, len(list(folder.glob(pattern))))

Define Image and Label Directories, Preview a Sample Image, and Preview the Matching Label File

In [None]:
import cv2, matplotlib.pyplot as plt

# Use the first image (by file name) as the preview sample.
sample_images = sorted(IMAGES_DIR.glob('*.png'))
assert sample_images, f"No PNG images found in {IMAGES_DIR}"
sample_img_path = sample_images[0]

# cv2.imread returns None instead of raising when a file cannot be read,
# so check before converting the colour order for matplotlib.
sample_bgr = cv2.imread(str(sample_img_path))
assert sample_bgr is not None, f"Could not read image: {sample_img_path}"
img = cv2.cvtColor(sample_bgr, cv2.COLOR_BGR2RGB)

plt.imshow(img)
plt.axis('off')
plt.title(sample_img_path.name)
plt.show()

# The matching label file shares the image's stem.
sample_lbl_path = LABELS_DIR / (sample_img_path.stem + '.txt')
print("Label file:", sample_lbl_path.name)
# read_text() closes the file for us; show at most the first 10 lines.
print("\n".join(sample_lbl_path.read_text().splitlines()[:10]))

Check for Missing or Empty Labels

In [None]:
from tqdm import tqdm

images = sorted(IMAGES_DIR.glob('*.png'))
labels = sorted(LABELS_DIR.glob('*.txt'))

# Images and labels are matched by file stem.
img_set = {img.stem for img in images}
lbl_set = {lbl.stem for lbl in labels}

imgs_without_lbl = [img for img in images if img.stem not in lbl_set]
lbl_without_imgs = [lbl for lbl in labels if lbl.stem not in img_set]
empty_label_files = [lbl for lbl in labels if lbl.stat().st_size == 0]

for caption, offenders in (
    ("Images without label:", imgs_without_lbl),
    ("Labels without image:", lbl_without_imgs),
    ("Empty label files:", empty_label_files),
):
    print(caption, len(offenders))

Remove Missing/Empty Label Files

In [None]:
# Stems of label files that actually contain something.
valid_labels = {lbl.stem for lbl in labels if lbl.stat().st_size > 0}

# Keep only images whose label file exists and is non-empty.
images = [img for img in images if img.stem in valid_labels]
print("Usable images after removing missing/empty labels:", len(images))

# File names of everything flagged by the consistency check.
CLEAN_REPORT = {
    key: [path.name for path in flagged]
    for key, flagged in (
        ("imgs_without_lbl", imgs_without_lbl),
        ("lbl_without_imgs", lbl_without_imgs),
        ("empty_label_files", empty_label_files),
    )
}

Remove Duplicate Images

In [None]:
# --- Robust duplicate detection on Kaggle FUSE (handles Errno 512) ---
import hashlib, time, os

def file_md5_resilient(path, chunk=1<<20, retries=3, pause=0.3):
    """Return the MD5 hex digest of ``path``, retrying on read errors.

    The file is read in ``chunk``-sized pieces. An ``OSError`` triggers a
    sleep of ``pause`` seconds and a fresh attempt; the error from the last
    of ``retries`` attempts is re-raised. Returns ``None`` only when
    ``retries`` allows no attempt at all.
    """
    for remaining in range(retries - 1, -1, -1):
        digest = hashlib.md5()
        try:
            with open(path, "rb") as stream:
                for block in iter(lambda: stream.read(chunk), b""):
                    digest.update(block)
        except OSError:
            if remaining == 0:
                raise
            time.sleep(pause)
        else:
            return digest.hexdigest()
    return None  # no attempt was made

hash_seen = set()
unique_images, dupe_images, unreadable_images = [], [], []

for img_path in images:
    try:
        digest = file_md5_resilient(img_path)
    except OSError:
        # Skip files that the FUSE mount refuses to read; log for report
        unreadable_images.append(img_path)
        continue

    # The first file with a given content hash is kept, later ones are duplicates.
    if digest in hash_seen:
        dupe_images.append(img_path)
    else:
        hash_seen.add(digest)
        unique_images.append(img_path)

# keep only readable, non-duplicate files
images = unique_images

print("Removed duplicate images:", len(dupe_images))
print("Skipped unreadable images (FUSE error):", len(unreadable_images))

CLEAN_REPORT["duplicate_images"] = [img_path.name for img_path in dupe_images]
CLEAN_REPORT["unreadable_images"] = [img_path.name for img_path in unreadable_images]

Visualize Bounding Boxes on a Sample Image

In [None]:
import matplotlib.patches as patches

def read_kitti_objects(lbl_path):
    """Parse a KITTI label file into a list of objects.

    Each returned item is ``{'cls': <first field>, 'bbox': [x1, y1, x2, y2]}``
    where the box comes from fields 4-7 of the line. Lines with fewer than
    8 fields, or whose box fields are not numbers, are skipped.
    """
    objs = []
    # Context manager so the file handle is released right away.
    with open(lbl_path) as fh:
        for line in fh.read().splitlines():
            parts = line.split()
            if len(parts) < 8:
                continue
            try:
                x1, y1, x2, y2 = map(float, parts[4:8])
            except ValueError:
                # Only a non-numeric coordinate is expected here; anything
                # else should surface instead of being swallowed.
                continue
            objs.append({'cls': parts[0], 'bbox': [x1, y1, x2, y2]})
    return objs

fig, ax = plt.subplots(figsize=(8,5))
ax.imshow(img)
ax.axis('off')

# Overlay at most the first 10 annotated objects of the sample image.
objs = read_kitti_objects(sample_lbl_path)
for obj in objs[:10]:
    left, top, right, bottom = obj['bbox']
    ax.add_patch(
        patches.Rectangle(
            (left, top), right - left, bottom - top,
            linewidth=2, edgecolor='yellow', facecolor='none',
        )
    )
    # Class name just above the top-left corner of the box.
    ax.text(left, top - 3, obj['cls'], fontsize=8, color='yellow', backgroundcolor='black')
plt.show()

Define Classes and Label Conversion Functions

In [None]:
# Classes kept for training; a class's position in the list is its YOLO id.
CLASSES = ['Car', 'Pedestrian', 'Cyclist']
CLASS_TO_ID = dict(zip(CLASSES, range(len(CLASSES))))

def kitti_to_yolo_bbox(x1, y1, x2, y2, W, H):
    """Convert corner coordinates to a normalized center/size box.

    ``(x1, y1)`` and ``(x2, y2)`` are opposite box corners; ``W`` and ``H``
    are the image width and height used for normalization.
    Returns ``(x_center, y_center, width, height)``.
    """
    mid_x = (x1 + x2) / 2.0
    mid_y = (y1 + y2) / 2.0
    return mid_x / W, mid_y / H, (x2 - x1) / W, (y2 - y1) / H

Convert KITTI Labels to YOLO Format with Outlier Filtering

In [None]:
def convert_file_to_yolo(img_path, lbl_path,
                         min_area_frac=1e-5, max_area_frac=0.8, max_aspect=20.0):
    """Convert one KITTI label file into YOLO label lines.

    Parameters
    ----------
    img_path : image whose pixel size is used to clamp and normalize boxes.
    lbl_path : KITTI label file for that image.
    min_area_frac : drop boxes smaller than this fraction of the image area.
    max_area_frac : drop boxes larger than this fraction of the image area.
    max_aspect : drop boxes whose long/short side ratio exceeds this value.

    Returns
    -------
    list of ``"<class_id> <xc> <yc> <w> <h>"`` strings. The list is empty when
    the image cannot be read (a warning is emitted) or no box survives the
    filters. Only classes present in ``CLASS_TO_ID`` are kept.
    """
    import cv2, warnings

    # cv2.imread signals failure by returning None rather than raising.
    image = cv2.imread(str(img_path))
    if image is None:
        warnings.warn(f"Could not read image, skipping: {img_path}")
        return []
    H, W = image.shape[:2]

    with open(lbl_path) as fh:
        label_lines = fh.read().splitlines()

    lines_out = []
    for line in label_lines:
        parts = line.split()
        if len(parts) < 8:
            continue
        cls = parts[0]
        if cls not in CLASS_TO_ID:
            continue

        try:
            x1, y1, x2, y2 = map(float, parts[4:8])
        except ValueError:
            continue  # malformed coordinate: skip the line, keep the file

        # clamp to image bounds
        x1, y1 = max(0.0, x1), max(0.0, y1)
        x2, y2 = min(float(W-1), x2), min(float(H-1), y2)
        if x2 <= x1 or y2 <= y1:
            continue

        # outlier checks
        bw, bh = (x2 - x1), (y2 - y1)
        area_frac = (bw * bh) / float(W * H)
        aspect = max(bw / max(bh, 1e-6), bh / max(bw, 1e-6))
        if area_frac < min_area_frac or area_frac > max_area_frac or aspect > max_aspect:
            continue

        # Width/height are strictly positive here (degenerate boxes were
        # rejected above), so no further sign check is needed.
        xc, yc, w, h = kitti_to_yolo_bbox(x1, y1, x2, y2, W, H)
        lines_out.append(f"{CLASS_TO_ID[cls]} {xc:.6f} {yc:.6f} {w:.6f} {h:.6f}")
    return lines_out

Split Dataset into Train (80%) and Validation (20%)

In [None]:
import random, shutil
random.seed(42)  # fixed seed so the shuffle below is repeatable

# Re-check that each image still has its label file next to it.
usable_images = [img for img in images if (LABELS_DIR / f"{img.stem}.txt").exists()]
random.shuffle(usable_images)

# First 80% of the shuffled list trains, the remainder validates.
split_idx = int(0.8 * len(usable_images))
train_imgs, val_imgs = usable_images[:split_idx], usable_images[split_idx:]

def process_split(img_paths, images_out_dir, labels_out_dir):
    """Convert and copy one split into the YOLO folder layout.

    For every image in ``img_paths`` the matching file in ``LABELS_DIR`` is
    converted with ``convert_file_to_yolo``. Images that yield no label lines
    are skipped; the others are copied to ``images_out_dir`` and their labels
    written to ``labels_out_dir``. Returns ``(kept, skipped)`` counts.
    """
    for out_dir in (images_out_dir, labels_out_dir):
        out_dir.mkdir(parents=True, exist_ok=True)

    kept = skipped = 0
    for img_path in tqdm(img_paths):
        label_name = img_path.stem + '.txt'
        yolo_lines = convert_file_to_yolo(img_path, LABELS_DIR / label_name)
        if not yolo_lines:
            skipped += 1
            continue
        shutil.copy2(img_path, images_out_dir / img_path.name)
        with open(labels_out_dir / label_name, 'w') as out:
            out.write('\n'.join(yolo_lines) + '\n')
        kept += 1
    return kept, skipped

# Materialize both splits and summarize how many images survived conversion.
kept_tr, skip_tr = process_split(train_imgs, YOLO_IMAGES_TRAIN, YOLO_LABELS_TRAIN)
kept_va, skip_va = process_split(val_imgs, YOLO_IMAGES_VAL, YOLO_LABELS_VAL)

split_summary = {
    'train_kept': kept_tr,
    'train_skipped': skip_tr,
    'val_kept': kept_va,
    'val_skipped': skip_va,
}
print(split_summary)

Write YOLO Data Configuration (data.yaml)

In [None]:
import yaml

# Dataset description consumed by the trainer: an absolute root plus split
# folders given relative to it.
data_yaml = dict(
    path=str(YOLO_DIR.resolve()),
    train='images/train',
    val='images/val',
    names=CLASSES,
)

yaml_file = YOLO_DIR / 'data.yaml'
with yaml_file.open('w') as f:
    yaml.safe_dump(data_yaml, f, sort_keys=False)  # keep the key order above
print(yaml_file.read_text())

Count Train/Val Images and Labels and Package Final YOLO Dataset as ZIP

In [None]:
import shutil

# Count what was written for each split.
n_tr_img, n_tr_lbl, n_va_img, n_va_lbl = (
    len(list(folder.glob(pattern)))
    for folder, pattern in (
        (YOLO_IMAGES_TRAIN, '*.png'),
        (YOLO_LABELS_TRAIN, '*.txt'),
        (YOLO_IMAGES_VAL, '*.png'),
        (YOLO_LABELS_VAL, '*.txt'),
    )
)

print('Train images:', n_tr_img, '| Train labels:', n_tr_lbl)
print('Val images  :', n_va_img, '| Val labels  :', n_va_lbl)

# Package the prepared dataset; remove an archive left by a previous run first.
zip_path = PROJECT_ROOT / 'kitti_yolo_prepared.zip'
if zip_path.exists():
    zip_path.unlink()
archive_base = str(zip_path.with_suffix(''))  # make_archive appends ".zip" itself
shutil.make_archive(archive_base, 'zip', root_dir=YOLO_DIR)
print("ZIP written to:", zip_path.resolve())

Save Cleaning Report as JSON

In [None]:
import json
# Add the conversion outcome to the cleaning report before persisting it.
CLEAN_REPORT.update(
    train_kept=kept_tr,
    train_skipped=skip_tr,
    val_kept=kept_va,
    val_skipped=skip_va,
)

report_path = PROJECT_ROOT / "cleaning_report.json"
with report_path.open("w") as f:
    json.dump(CLEAN_REPORT, f, indent=2)
print("Saved cleaning_report.json")

# Phase 2

Load YOLO Dataset Metadata

In [None]:
from pathlib import Path
import pandas as pd
import os

# From here on YOLO_DIR is an absolute path.
YOLO_DIR = (PROJECT_ROOT / "kitti_yolo").resolve()

_images_root, _labels_root = YOLO_DIR / "images", YOLO_DIR / "labels"

train_images = sorted((_images_root / "train").glob("*.png"))
train_labels = sorted((_labels_root / "train").glob("*.txt"))
val_images = sorted((_images_root / "val").glob("*.png"))
val_labels = sorted((_labels_root / "val").glob("*.txt"))

print("Train images:", len(train_images), "Train labels:", len(train_labels))
print("Val images  :", len(val_images), "Val labels  :", len(val_labels))

Parse YOLO Label Files into DataFrame

In [None]:
import numpy as np

# One record per bounding box, tagged with the split and image it belongs to.
records = []
for lbl in train_labels + val_labels:
    # The split is the name of the folder holding the label file
    # (labels/train or labels/val). Searching the whole path for "train"
    # would tag val files as train whenever any parent folder contains
    # that substring.
    split = lbl.parent.name
    for line in lbl.read_text().splitlines():
        fields = line.split()
        if len(fields) != 5:
            continue  # blank or malformed line
        cls, xc, yc, w, h = fields
        records.append({
            "split": split,
            "image": lbl.stem + ".png",
            "class_id": int(cls),
            "x_center": float(xc),
            "y_center": float(yc),
            "width": float(w),
            "height": float(h),
        })

# Explicit columns keep the frame usable even if no record was parsed.
df = pd.DataFrame(
    records,
    columns=["split", "image", "class_id", "x_center", "y_center", "width", "height"],
)

In [None]:
# Peek at the first rows of the parsed label table (one row per box).
df.head()



In [None]:
# Headline numbers: boxes, images and how the boxes spread over classes.
class_ids = df["class_id"]

print("Dataset size:", len(df))
print("Number of unique images:", df["image"].nunique())
print("Classes:", class_ids.unique())
print("Class counts:\n", class_ids.value_counts())


# Bounding Box Visualization

In [None]:
def show_image_with_bboxes(image_name, df, images_dir):
    """Draw the boxes recorded for one image and display the result.

    Parameters
    ----------
    image_name : file name, matched against ``df["image"]`` and joined onto
        ``images_dir``.
    df : table with ``image``, ``class_id``, ``x_center``, ``y_center``,
        ``width`` and ``height`` columns; box values are fractions of the
        image size.
    images_dir : folder that contains the image file.

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read the image.
    """
    sub = df[df["image"] == image_name]
    img_path = os.path.join(images_dir, image_name)
    img = cv2.imread(img_path)
    # cv2.imread returns None on failure; fail with the offending path
    # instead of an opaque error inside cvtColor.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {img_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    h, w = img.shape[:2]

    for _, row in sub.iterrows():
        x, y, bw, bh = row["x_center"], row["y_center"], row["width"], row["height"]
        # Normalized center/size -> pixel corners.
        x1 = int((x - bw/2) * w)
        y1 = int((y - bh/2) * h)
        x2 = int((x + bw/2) * w)
        y2 = int((y + bh/2) * h)

        cls = row["class_id"]
        # The image is RGB at this point, so these tuples are (R, G, B).
        color = (0,255,0) if cls==0 else (255,0,0) if cls==1 else (0,0,255)
        label = "Car" if cls==0 else "Pedestrian" if cls==1 else "Cyclist"

        cv2.rectangle(img, (x1,y1), (x2,y2), color, 2)
        cv2.putText(img, label, (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, color, 2, cv2.LINE_AA)

    plt.figure(figsize=(10,6))
    plt.imshow(img)
    plt.axis("off")
    plt.title(f"Bounding Boxes in {image_name}")
    plt.show()


# Class Diversity per Image

In [None]:
# Images in which all three classes occur at least once.
classes_per_image = df.groupby("image")["class_id"].nunique()
diverse_imgs = classes_per_image[classes_per_image == 3].index

# Show up to three of them.
for img_name in diverse_imgs[:3]:
    show_image_with_bboxes(img_name, df, str(IMAGES_DIR))


# Class Distribution

In [None]:
# --- Fix dtype & palette for plotting ---
import pandas as pd

CLS_ORDER = [0, 1, 2]

# Coerce class ids to numbers (unparseable values become NaN), keep only the
# three known classes, then store the ids as plain ints.
numeric_ids = pd.to_numeric(df["class_id"], errors="coerce")
df["class_id"] = numeric_ids
df = df[numeric_ids.isin(CLS_ORDER)].copy()
df["class_id"] = df["class_id"].astype(int)

PALETTE_INT = dict(zip(CLS_ORDER, ("#9370DB", "#4682B4", "#20B2AA")))

In [None]:
# NOTE(review): palette keys are strings although class_id is int —
# presumably to match how the installed seaborn labels categorical levels;
# confirm before changing.
class_palette = {'0': "#4E79A7", '1': "#9370DB", '2': "#76B7B2"}
class_labels = ['Car', 'Pedestrian', 'Cyclist']

plt.figure(figsize=(6,4))
count_ax = sns.countplot(data=df, x="class_id", palette=class_palette)
count_ax.set_xticks([0, 1, 2])
count_ax.set_xticklabels(class_labels)
count_ax.set_title("Class Distribution")
count_ax.set_xlabel("Class")
count_ax.set_ylabel("Count")
plt.show()

In [None]:
# 1 for Pedestrian/Cyclist boxes, 0 for Car boxes.
df["is_minority"] = df["class_id"].isin([1, 2]).astype(int)

minority_palette = {'0': "#4E79A7", '1': "#F28E2B"}
group_labels = ["Majority (Car)", "Minority (Pedestrian/Cyclist)"]

plt.figure(figsize=(6,4))
minority_ax = sns.countplot(data=df, x="is_minority", palette=minority_palette)
minority_ax.set_xticks([0, 1])
minority_ax.set_xticklabels(group_labels)
minority_ax.set_title("Minority Class Flag Distribution")
minority_ax.set_xlabel("Class Group")
minority_ax.set_ylabel("Count")
plt.show()

# Train vs Validation

In [None]:
# Per-class box counts, one bar per split.
split_colors = ["#9370DB", "#4682B4"]

plt.figure(figsize=(6,4))
split_ax = sns.countplot(data=df, x="class_id", hue="split", palette=split_colors)
split_ax.set_xticks([0, 1, 2])
split_ax.set_xticklabels(['Car', 'Pedestrian', 'Cyclist'])
split_ax.set_title("Class Distribution (Train vs Val)")
plt.show()

# Bounding Box Derived Features

In [None]:
# Box area and width/height ratio, both computed from the normalized sizes.
df["bbox_area"] = df["width"].mul(df["height"])
df["aspect_ratio"] = df["width"].div(df["height"])

In [None]:
# Check that the derived bbox_area / aspect_ratio columns were added.
df.head()

In [None]:
# Side-by-side distributions of the two derived box features.
hist_fig, (area_ax, ratio_ax) = plt.subplots(1, 2, figsize=(12,5))

sns.histplot(df["bbox_area"], bins=50, kde=True, color="#9370DB", edgecolor="black", ax=area_ax)
area_ax.set_title("Bounding Box Area Distribution")

sns.histplot(df["aspect_ratio"], bins=50, kde=True, color="#008080", edgecolor="black", ax=ratio_ax)
ratio_ax.set_title("Bounding Box Aspect Ratio Distribution")

plt.tight_layout()
plt.show()


In [None]:
import random
# Draw four distinct image names and render each with its boxes.
image_pool = df['image'].unique().tolist()
for img_name in random.sample(image_pool, 4):
    show_image_with_bboxes(img_name, df, str(IMAGES_DIR))

In [None]:
# Number of boxes recorded for each image.
objs_per_img = (
    df.groupby("image")["class_id"]
    .count()
    .reset_index(name="num_objects")
)
num_objects = objs_per_img["num_objects"]

plt.figure(figsize=(6,4))
objects_ax = sns.histplot(num_objects, bins=30, color="#9370DB", edgecolor="black")
objects_ax.set_title("Objects per Image Distribution")
objects_ax.set_xlabel("Objects per image")
objects_ax.set_ylabel("Frequency")
plt.show()

print("Avg objects per image:", num_objects.mean())
print("Max objects in an image:", num_objects.max())


# Bounding Box Area per Class

In [None]:
# NOTE(review): string keys, same convention as the other palettes in this
# notebook — confirm against the installed seaborn before changing.
PALETTE_STR = {'0': "#9370DB", '1': "#4682B4", '2': "#20B2AA"}

plt.figure(figsize=(10,6))
box_ax = sns.boxplot(data=df, x="class_id", y="bbox_area", palette=PALETTE_STR)

box_ax.set_xticks([0, 1, 2])
box_ax.set_xticklabels(['Car', 'Pedestrian', 'Cyclist'])
box_ax.set_yscale("log")  # areas span several orders of magnitude
box_ax.set_title("Bounding Box Area Distribution per Class")
plt.show()


# Object Center Heatmaps

In [None]:
# One density panel per class, showing where box centers fall in the frame.
density_fig, density_axes = plt.subplots(1, 3, figsize=(15,5))

for class_id, (panel, class_label) in enumerate(zip(density_axes, ['Car', 'Pedestrian', 'Cyclist'])):
    subset = df[df["class_id"] == class_id]
    sns.kdeplot(
        x=subset["x_center"], y=subset["y_center"],
        cmap="mako", fill=True, thresh=0.05, ax=panel,
    )
    panel.set_title(f"{class_label} Center Density")
    panel.set_xlabel("X Center")
    panel.set_ylabel("Y Center")

plt.tight_layout()
plt.show()

# Combined Object Center Heatmap

In [None]:
# All classes on one axes; common_norm=False normalizes each class's density
# on its own instead of jointly.
combined_kde_options = dict(
    data=df, x="x_center", y="y_center",
    hue="class_id", fill=True, common_norm=False, alpha=0.4,
)

plt.figure(figsize=(6,6))
combined_ax = sns.kdeplot(**combined_kde_options)
combined_ax.set_title("Object Center Heatmap per Class")
plt.show()

# Visualizing Object Centers on Random Images

In [None]:
def show_object_centers(image_name, df, images_dir):
    """Plot one image on the current axes with a dot at every box center.

    ``df`` needs ``image``, ``class_id``, ``x_center`` and ``y_center``
    columns, with the centers given as fractions of the image size. Dots are
    lime for class 0, orange for class 1 and red otherwise. The function
    draws on the current pyplot axes and does not call ``plt.show()``.

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read the image.
    """
    sub = df[df["image"] == image_name]
    img_path = os.path.join(images_dir, image_name)
    img = cv2.imread(img_path)
    # cv2.imread returns None on failure; report the path that failed.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {img_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    h, w = img.shape[:2]

    plt.imshow(img)
    for _, row in sub.iterrows():
        # Normalized center -> pixel position.
        cx = int(row["x_center"] * w)
        cy = int(row["y_center"] * h)
        cls = row["class_id"]

        color = "lime" if cls==0 else "orange" if cls==1 else "red"

        plt.scatter(cx, cy, c=color, s=50, edgecolors="black")

    plt.title(f"Object Centers in {image_name}")
    plt.axis("off")

In [None]:
# Six reproducibly sampled images, stacked vertically.
random_imgs = df["image"].drop_duplicates().sample(6, random_state=42).tolist()
n_rows = len(random_imgs)

plt.figure(figsize=(12, 30))
for row_no, img_name in enumerate(random_imgs, start=1):
    # show_object_centers draws on the current axes, so select the panel first.
    plt.subplot(n_rows, 1, row_no)
    show_object_centers(img_name, df, str(IMAGES_DIR))

plt.tight_layout()
plt.show()

# Crowded Images & Rare Classes

In [None]:
# Number of distinct classes present in each image.
class_diversity = (
    df.groupby("image")["class_id"]
    .nunique()
    .reset_index()
    .rename(columns={"class_id": "num_classes"})
)

# The five images holding the most boxes.
top_imgs = objs_per_img.sort_values("num_objects", ascending=False).head(5)
print("\nTop crowded images:\n", top_imgs)

# Despite the variable name, these are images containing exactly one class.
rare_class_imgs = class_diversity[class_diversity["num_classes"] == 1]
print("\nImages with only one class:", len(rare_class_imgs))

In [None]:
def show_image_with_stats(image_name, df, images_dir):
    """Display one image with its boxes and per-class counts in the title.

    ``df`` needs ``image``, ``class_id``, ``x_center``, ``y_center``,
    ``width`` and ``height`` columns; box values are fractions of the image
    size. Class ids 0/1/2 are counted as cars, pedestrians and cyclists.

    Raises
    ------
    FileNotFoundError
        If OpenCV cannot read the image.
    """
    sub = df[df["image"] == image_name]
    img_path = os.path.join(images_dir, image_name)
    img = cv2.imread(img_path)
    # cv2.imread returns None on failure; report the path that failed.
    if img is None:
        raise FileNotFoundError(f"Could not read image: {img_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    h, w = img.shape[:2]
    for _, row in sub.iterrows():
        x, y, bw, bh = row["x_center"], row["y_center"], row["width"], row["height"]
        # Normalized center/size -> pixel corners.
        x1 = int((x - bw/2) * w)
        y1 = int((y - bh/2) * h)
        x2 = int((x + bw/2) * w)
        y2 = int((y + bh/2) * h)

        cls = row["class_id"]
        # The image is RGB at this point, so these tuples are (R, G, B).
        color = (0,255,0) if cls==0 else (255,0,0) if cls==1 else (0,0,255)
        label = "Car" if cls==0 else "Pedestrian" if cls==1 else "Cyclist"

        cv2.rectangle(img, (x1,y1), (x2,y2), color, 2)
        cv2.putText(img, label, (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, color, 2, cv2.LINE_AA)

    counts = sub["class_id"].value_counts().to_dict()
    num_cars = counts.get(0, 0)
    num_pedestrians = counts.get(1, 0)
    num_cyclists = counts.get(2, 0)
    total_objects = len(sub)

    plt.figure(figsize=(8,6))
    plt.imshow(img)
    plt.axis("off")
    plt.title(f"Image: {image_name} | Total: {total_objects} | Cars: {num_cars} | Pedestrians: {num_pedestrians} | Cyclists: {num_cyclists}")
    plt.show()

In [None]:
# Render the three images that contain the most boxes.
top_imgs = objs_per_img.sort_values("num_objects", ascending=False).head(3)
for img_name in top_imgs["image"].tolist():
    show_image_with_stats(img_name, df, str(IMAGES_DIR))

# Summary

In [None]:
# Recap of the numbers computed in the cells above.
objects_per_image = objs_per_img["num_objects"]
diversity_counts = class_diversity["num_classes"].value_counts().sort_index()

print("Total images:", df["image"].nunique())
print("Total objects:", len(df))

print("\nObjects per class:\n", df["class_id"].value_counts())
print("\nAvg objects per image:", objects_per_image.mean())
print("Max objects in an image:", objects_per_image.max())

print("\nClass diversity per image:\n", diversity_counts)

# Handling Classes to Make It Balanced

In [None]:
from sklearn.utils import resample

# Drop rows without a class and restrict to the three known ids.
df_clean = df.dropna(subset=["class_id"]).copy()
df_filtered = df_clean[df_clean["class_id"].isin([0, 1, 2])]

car_df, ped_df, cyc_df = (
    df_filtered[df_filtered["class_id"] == class_id] for class_id in (0, 1, 2)
)

print("Before balancing:")
print(df_filtered["class_id"].value_counts())

# Upsample both minority classes (with replacement) to the number of Car rows.
# NOTE(review): each row is a single box, so this repeats box rows rather
# than whole images.
target_size = len(car_df)
ped_upsampled, cyc_upsampled = (
    resample(minority_df, replace=True, n_samples=target_size, random_state=42)
    for minority_df in (ped_df, cyc_df)
)

df_balanced = pd.concat([car_df, ped_upsampled, cyc_upsampled]).reset_index(drop=True)

print("\nAfter balancing:")
print(df_balanced["class_id"].value_counts())
print("Balanced dataset size:", len(df_balanced))

In [None]:
from sklearn.model_selection import train_test_split

# Stratified 80/20 split of the balanced *rows* (boxes), keeping the class
# proportions equal in both parts.
train_df, val_df = train_test_split(
    df_balanced,
    test_size=0.2,
    stratify=df_balanced["class_id"],
    random_state=42,
)

print("Train size:", len(train_df))
print("Validation size:", len(val_df))
print("Train class distribution:\n", train_df["class_id"].value_counts())
print("Val class distribution:\n", val_df["class_id"].value_counts())


In [None]:
import shutil
from pathlib import Path
from sklearn.model_selection import train_test_split
from ultralytics import YOLO
import yaml
import random

def prepare_and_train_yolov8(df_balanced,
                             orig_images_dir: Path,
                             balanced_root_dir: Path,
                             train_val_split: float = 0.2,
                             sample_fraction: float = 1.0,
                             epochs: int = 50,
                             batch_size: int = 16,
                             img_size: int = 640,
                             device: int = 0):
    """Build a YOLO-format dataset folder from ``df_balanced``.

    Despite its name, this function does not train a model. It splits the
    unique image names into train/val, copies the images, writes one label
    file per image and writes ``data.yaml`` into ``balanced_root_dir``.

    Parameters
    ----------
    df_balanced : table with ``image``, ``class_id``, ``x_center``,
        ``y_center``, ``width`` and ``height`` columns (one row per box).
    orig_images_dir : folder the images are copied from.
    balanced_root_dir : output root; ``images/`` and ``labels/`` with
        ``train`` and ``val`` subfolders are created below it.
    train_val_split : fraction of the images assigned to validation.
    sample_fraction : fraction of each split that is actually written.
    epochs, batch_size, img_size, device : accepted for backward
        compatibility only; the function body does not use them.

    Side effects
    ------------
    Re-seeds the global ``random`` generator with 42.
    """
    img_train_dir = balanced_root_dir / "images" / "train"
    lbl_train_dir = balanced_root_dir / "labels" / "train"
    img_val_dir = balanced_root_dir / "images" / "val"
    lbl_val_dir = balanced_root_dir / "labels" / "val"
    for folder in [img_train_dir, lbl_train_dir, img_val_dir, lbl_val_dir]:
        folder.mkdir(parents=True, exist_ok=True)

    # Split by image so that all boxes of an image land in the same split.
    unique_images = df_balanced["image"].unique()
    train_imgs, val_imgs = train_test_split(unique_images, test_size=train_val_split, random_state=42, shuffle=True)

    random.seed(42)
    train_imgs = random.sample(train_imgs.tolist(), int(len(train_imgs) * sample_fraction))
    val_imgs = random.sample(val_imgs.tolist(), int(len(val_imgs) * sample_fraction))

    label_cols = ["class_id", "x_center", "y_center", "width", "height"]
    # A resampled table repeats box rows. Writing the same box several times
    # into one label file adds no information, so each distinct box is
    # written once.
    # NOTE(review): Ultralytics is expected to drop duplicate label rows when
    # it scans a dataset — confirm for the installed version.
    unique_boxes = df_balanced.drop_duplicates(subset=["image", *label_cols])
    grouped = unique_boxes.groupby("image")

    def yolo_format(box):
        # int() guarantees an integer class token even if the column is float.
        return (f"{int(box.class_id)} {box.x_center:.6f} {box.y_center:.6f} "
                f"{box.width:.6f} {box.height:.6f}")

    def copy_and_write(images, img_dir, lbl_dir):
        # Copy each image and write its label file next to the split's labels.
        for img_name in images:
            shutil.copy2(orig_images_dir / img_name, img_dir / img_name)
            labels = []
            if img_name in grouped.groups:
                boxes = grouped.get_group(img_name)[label_cols]
                # itertuples keeps each column's own dtype.
                labels = [yolo_format(box) for box in boxes.itertuples(index=False)]
            with open(lbl_dir / (Path(img_name).stem + ".txt"), "w") as f:
                f.writelines(line + "\n" for line in labels)

    copy_and_write(train_imgs, img_train_dir, lbl_train_dir)
    copy_and_write(val_imgs, img_val_dir, lbl_val_dir)

    print(f"Prepared {len(train_imgs)} train and {len(val_imgs)} val images with labels.")
    data_yaml = {
        'path': str(balanced_root_dir.resolve()),
        'train': 'images/train',
        'val': 'images/val',
        'names': ['Car', 'Pedestrian', 'Cyclist']
    }
    with open(balanced_root_dir / "data.yaml", 'w') as f:
        yaml.safe_dump(data_yaml, f, sort_keys=False)
    print(f"Saved data.yaml at {balanced_root_dir / 'data.yaml'}")

In [None]:
# Build the balanced dataset that training will point to
balanced_root_dir = Path("./kitti_balanced_yolo")

build_options = dict(
    df_balanced=df_balanced,
    orig_images_dir=IMAGES_DIR,
    balanced_root_dir=balanced_root_dir,
    train_val_split=0.2,
    sample_fraction=1.0,
)
prepare_and_train_yolov8(**build_options)

# The trainer needs this file, so fail here rather than later.
assert (balanced_root_dir / "data.yaml").exists(), "Expected balanced_root_dir/data.yaml to exist."

# Model training with duplicates removed

In [None]:
# ---- Training (safe version) ----
from ultralytics import YOLO
from pathlib import Path
import torch

# Start from pretrained weights (recommended); the commented line below
# builds the same architecture from scratch instead.
model = YOLO('yolov8n.pt')     # or 'yolov8s.pt' if you have more compute
# model = YOLO('yolov8n.yaml') # from scratch

# First GPU when CUDA is available, otherwise CPU.
device = 'cpu' if not torch.cuda.is_available() else 0

# Dataset to train on:
# A) the balanced dataset built in the previous cell
data_yaml_path = balanced_root_dir / "data.yaml"
# B) or the original dataset built earlier:
# data_yaml_path = YOLO_DIR / "data.yaml"

assert data_yaml_path.exists(), f"data.yaml not found at {data_yaml_path}"

epochs = 50
batch_size = 8
img_size = 640

train_options = dict(
    data=str(data_yaml_path),
    epochs=epochs,
    batch=batch_size,
    imgsz=img_size,
    device=device,
    workers=2,
    # Augmentation settings.
    # NOTE(review): flipud=0.5 turns half of the images upside down —
    # confirm this is wanted for this dataset.
    flipud=0.5,
    fliplr=0.5,
    hsv_h=0.015,
    hsv_s=0.7,
    hsv_v=0.4,
    # Output location; exist_ok lets a re-run reuse the same run folder.
    project="runs/train",
    name="kitti_yolov8",
    exist_ok=True,
)
results = model.train(**train_options)

In [None]:
# --- Locate the freshly trained best.pt robustly ---
from pathlib import Path
import hashlib

# Candidate locations for best.pt, in order of preference.
try:
    # Run folder reported by the training call, if `results` is usable.
    candidates = [Path(results.save_dir) / "weights" / "best.pt"]
except Exception:
    candidates = []

# Your named run (fallback)
candidates.append(Path("runs/train/kitti_yolov8/weights/best.pt"))

# Newest best.pt anywhere
cand_glob = sorted(
    Path("runs").rglob("weights/best.pt"),
    key=lambda found: found.stat().st_mtime if found.exists() else 0,
    reverse=True,
)
candidates += cand_glob

# First candidate that exists wins.
BEST_PT = next(filter(lambda cand: cand and cand.exists(), candidates), None)
assert BEST_PT is not None and BEST_PT.exists(), "best.pt not found."

def sha256(p: Path):
    """Return the SHA-256 hex digest of the file at ``p``, read in 1 MiB pieces."""
    digest = hashlib.sha256()
    with open(p, 'rb') as stream:
        while block := stream.read(1 << 20):
            digest.update(block)
    return digest.hexdigest()

# Identify the chosen weights file: path, size in MB and content hash.
best_size_mb = round(BEST_PT.stat().st_size / 1e6, 2)
best_digest = sha256(BEST_PT)

print("best.pt:", BEST_PT)
print("size(MB):", best_size_mb)
print("sha256:", best_digest)

# Evaluation

In [None]:
# Run validation with the `model` object from the training cell and print
# the returned metrics.
# NOTE(review): no `data=` argument is passed — presumably the dataset used
# for training is reused; confirm.
eval_results = model.val()
print(eval_results)

In [None]:
# View predictions inline (no files saved)
from ultralytics import YOLO
from pathlib import Path
import matplotlib.pyplot as plt
import gc, torch

model = YOLO(str(BEST_PT))

IMG_SIZE = 640
CONF = 0.5
IOU = 0.45
MAX_SHOW = 30      # change to see more; large values can slow the notebook

predict_options = dict(
    source=str(TEST_IMAGES_DIR),
    imgsz=IMG_SIZE,
    conf=CONF,
    iou=IOU,
    device=device,       # defined earlier (0 or 'cpu')
    stream=True,         # generator = low memory
    save=False,          # don't save images
    save_txt=False,
    verbose=False,
)
pred_gen = model.predict(**predict_options)

shown = 0
for shown, r in enumerate(pred_gen, start=1):
    # r.plot() returns an annotated image (BGR)
    im_annot = r.plot()
    plt.figure(figsize=(12, 6))
    plt.imshow(im_annot[..., ::-1])  # BGR -> RGB for matplotlib
    plt.title(Path(r.path).name)
    plt.axis("off")
    plt.show()

    # Release the figure and cached GPU memory before the next image.
    plt.close()
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    if shown >= MAX_SHOW:
        break

# Count objects in the predicted images

In [None]:
# ✅ Pick a real image folder that exists (no hard-coded Kaggle paths)
from pathlib import Path
import itertools

# Image folders to try, in order of preference.
candidates = [TEST_IMAGES_DIR]                                               # preferred: KITTI test images
candidates += [YOLO_DIR / "images" / split for split in ("val", "train")]    # fallback: your YOLO splits

# The balanced dataset exists only if its build cell has been run.
if 'balanced_root_dir' in globals():
    candidates.extend(balanced_root_dir / "images" / split for split in ("val", "train"))

# keep only directories that exist and have at least one PNG
def has_png(p: Path) -> bool:
    """Tell whether ``p`` is an existing path that directly contains a ``*.png`` entry.

    A falsy ``p`` (e.g. ``None``) is returned unchanged.
    """
    if not p:
        return p
    if not p.exists():
        return False
    return next(p.glob("*.png"), None) is not None

candidates = list(filter(has_png, candidates))
assert candidates, "No image directory with PNGs found in TEST/VAL/TRAIN candidates."

# The highest-priority folder that actually holds PNGs.
SOURCE_DIR = candidates[0]
print("Using images from:", SOURCE_DIR)

# Show a few file names for sanity
first_five = list(itertools.islice(SOURCE_DIR.glob("*.png"), 5))
print("Example files:", [png.name for png in first_five])

# 🔮 Predict & DISPLAY inline (nothing saved)
from ultralytics import YOLO
import matplotlib.pyplot as plt
import torch, gc

model = YOLO(str(BEST_PT))
device = 'cpu' if not torch.cuda.is_available() else 0

source_predict_options = dict(
    source=str(SOURCE_DIR),
    imgsz=640, conf=0.5, iou=0.45,
    device=device,
    stream=True,      # low memory
    save=False,       # do NOT save images
    save_txt=False,
    verbose=False,
)
pred_gen = model.predict(**source_predict_options)

MAX_SHOW = 30  # change if you want to view more/less
for i, r in enumerate(pred_gen, start=1):
    im_annot = r.plot()                # annotated BGR image
    plt.figure(figsize=(12, 6))
    plt.imshow(im_annot[..., ::-1])    # BGR -> RGB
    plt.title(Path(r.path).name)
    plt.axis("off")
    plt.show()

    if i >= MAX_SHOW:
        break

    # Free the figure and cached GPU memory before the next image.
    plt.close()
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

# Visualize first 5 prediction images with counts

In [None]:
# ✅ Predict on an existing directory, SHOW images inline, and PRINT per-image + total counts
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np, gc, torch
from collections import defaultdict
from ultralytics import YOLO

# Pick a source dir that actually exists (first match wins)
candidates = [
    TEST_IMAGES_DIR,
    YOLO_DIR / "images" / "val",
    YOLO_DIR / "images" / "train",
]
candidates = [p for p in candidates if p and p.exists() and any(p.glob("*.png"))]
assert candidates, "No image directory with PNGs found in TEST/VAL/TRAIN."
SOURCE_DIR = candidates[0]
print("Using images from:", SOURCE_DIR)

# Load best weights
model = YOLO(str(BEST_PT))
device = 0 if torch.cuda.is_available() else 'cpu'

# Class names (robust to dict/list forms)
model_names = getattr(model, "names", None)
if isinstance(model_names, dict):
    class_names = [model_names[i] for i in sorted(model_names)]
elif isinstance(model_names, (list, tuple)):
    class_names = list(model_names)
else:
    class_names = ["Car", "Pedestrian", "Cyclist"]  # fallback

# Stream predictions (low memory), don't save files
pred_gen = model.predict(
    source=str(SOURCE_DIR),
    imgsz=640,
    conf=0.5,
    iou=0.45,
    device=device,
    batch=1,
    stream=True,
    save=False,
    verbose=False
)

total_counts = defaultdict(int)
MAX_SHOW = 10  # show up to N images

for i, r in enumerate(pred_gen, start=1):
    # --- counts for this image ---
    # Guard against a missing/None `boxes` attribute before calling len() on it.
    boxes = getattr(r, "boxes", None)
    if boxes is not None and len(boxes):
        cls_ids = boxes.cls.int().cpu().numpy()
    else:
        cls_ids = np.array([], dtype=int)

    image_counts = defaultdict(int)
    for cid in cls_ids:
        # Unknown class ids fall back to their numeric id as the label.
        name = class_names[int(cid)] if int(cid) < len(class_names) else str(int(cid))
        image_counts[name] += 1
        total_counts[name] += 1

    print(f"\nImage {i}: {Path(r.path).name}")
    print("  Counts:", dict(image_counts))

    # --- show annotated image inline ---
    im_annot = r.plot()                  # BGR
    fig = plt.figure(figsize=(12, 6))
    plt.imshow(im_annot[..., ::-1])      # BGR -> RGB
    # Build the summary separately: a conditional expression binds looser than
    # `+`, so writing it inline would replace the WHOLE title (filename included)
    # with "No detections" for empty images.
    if image_counts:
        summary = "  ".join([f"{k}:{v}" for k, v in image_counts.items()])
    else:
        summary = "No detections"
    plt.title(f"{Path(r.path).name}  |  " + summary)
    plt.axis("off")
    plt.show()

    # housekeeping: release the figure as well as the annotated array
    plt.close(fig)
    del im_annot
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    if i >= MAX_SHOW:
        break

# --- Totals across shown images ---
print("\nTotal predicted object counts (across shown images):")
for name in class_names:
    print(f"{name}: {total_counts[name]}")

# Confidence heatmap: randomly sample 5 test images

In [None]:
model = YOLO(str(BEST_PT))
test_images_dir = TEST_IMAGES_DIR
image_paths = list(test_images_dir.glob("*.png"))
assert image_paths, f"No PNG images found in {test_images_dir}"

# random.sample raises ValueError when asked for more items than the population
# holds, so cap the request at the number of available images.
N_HEATMAP_SAMPLES = 5
sample_paths = random.sample(image_paths, min(N_HEATMAP_SAMPLES, len(image_paths)))

# NOTE: not referenced in the rest of this cell; kept in case later cells read it.
class_names = ["Car", "Pedestrian", "Cyclist"]

for img_path in sample_paths:
    img = cv2.imread(str(img_path))
    if img is None:
        print(f"Could not read image: {img_path}")
        continue

    img_h, img_w = img.shape[:2]
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Per-pixel maximum detection confidence; stays 0 where no box covers the pixel.
    heatmap = np.zeros((img_h, img_w), dtype=np.float32)

    results = model.predict(source=str(img_path), conf=0.5, save=False, stream=False)

    r = results[0]
    if hasattr(r, "boxes") and r.boxes is not None and len(r.boxes) > 0:
        boxes = r.boxes.xyxy.cpu().numpy()
        confs = r.boxes.conf.cpu().numpy()

        for box, conf in zip(boxes, confs):
            x1, y1, x2, y2 = box.astype(int)
            # Clip to the image. Slice upper bounds are exclusive, so x2/y2 may
            # equal the width/height; clipping them to size-1 would drop the
            # last column/row of boxes that touch the border.
            x1 = max(0, min(x1, img_w))
            x2 = max(0, min(x2, img_w))
            y1 = max(0, min(y1, img_h))
            y2 = max(0, min(y2, img_h))
            if x2 <= x1 or y2 <= y1:
                continue
            heatmap[y1:y2, x1:x2] = np.maximum(heatmap[y1:y2, x1:x2], conf)

    # Rescale so the strongest detection in this image maps to ~1 (epsilon avoids
    # 0/0 on images without detections), then blur the hard box edges.
    heatmap_norm = heatmap / (heatmap.max() + 1e-6)
    heatmap_smooth = gaussian_filter(heatmap_norm, sigma=5)

    fig = plt.figure(figsize=(12, 6))
    plt.imshow(img_rgb)
    hmap = plt.imshow(heatmap_smooth, cmap='plasma', alpha=0.4, vmin=0, vmax=1)
    plt.axis("off")
    plt.title(f"Confidence Heatmap: {img_path.name}")
    plt.colorbar(hmap, fraction=0.046, pad=0.04, label='Confidence')
    plt.show()
    plt.close(fig)  # release the figure; this loop creates one per sampled image


In [None]:
# A `!pip ...` shell escape never raises a Python exception when pip fails, so a
# try/except around it can never reach the fallback. Run pip as a subprocess and
# branch on its exit status instead.
import subprocess
import sys

_EXPORT_DEPS = ["onnx", "onnxruntime", "onnxsim", "coremltools"]


def _pip_install(packages):
    """Install/upgrade `packages` with the kernel's own interpreter; return True on success."""
    cmd = [sys.executable, "-m", "pip", "-q", "install", "--upgrade", *packages]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        # Surface pip's diagnostics in the notebook output.
        print(proc.stdout, proc.stderr, sep="\n")
    return proc.returncode == 0


if not _pip_install(["tensorflow==2.14.*", *_EXPORT_DEPS]):
    print("Falling back to tensorflow-cpu...")
    if not _pip_install(["tensorflow-cpu==2.14.*", *_EXPORT_DEPS]):
        # Stay best-effort (the packages may already be present); later cells
        # will fail loudly on import if they really are missing.
        print("WARNING: could not install the export dependencies; see pip output above.")

In [None]:
from pathlib import Path

# Reuse CLASSES when an earlier cell defined it; otherwise use the default labels.
if "CLASSES" not in globals():
    CLASSES = ['Car','Pedestrian','Cyclist']  # fallback

# One class name per line, no trailing newline.
labels_txt = Path("labels.txt")
with labels_txt.open("w") as label_file:
    label_file.write("\n".join(CLASSES))

print("labels.txt written with", len(CLASSES), "classes.")


In [None]:
# Free as much memory as possible before heavy exports
import gc, torch

# Drop the notebook-level reference to the prediction model, if there is one.
# `del` on an unbound name raises NameError (e.g. this cell was run before any
# model was loaded); catch exactly that instead of swallowing every exception.
try:
    del model
except NameError:
    pass
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

In [None]:
from ultralytics import YOLO
from pathlib import Path
import shutil

EXPORTS_DIR = Path("exports"); EXPORTS_DIR.mkdir(exist_ok=True)
exp_model = YOLO(str(BEST_PT))


def _collect_export(src, dst_name=None):
    """Copy one export artifact into EXPORTS_DIR and return the destination path.

    Args:
        src: Path returned by ``exp_model.export(...)``; may be a single file or
            a directory.
        dst_name: Name to use inside EXPORTS_DIR; defaults to the source's own name.

    Directories are copied as a tree, replacing any previous copy so the cell can
    be re-run; plain files are copied with their metadata.
    """
    src = Path(src)
    dst = EXPORTS_DIR / (dst_name or src.name)
    if src.is_dir():
        if dst.exists():
            shutil.rmtree(dst)
        shutil.copytree(src, dst)
    else:
        shutil.copy2(src, dst)
    return dst


# ONNX
onnx_path = exp_model.export(format="onnx", opset=12, dynamic=True)
_collect_export(onnx_path)

# TFLite FP32
tfl_fp32 = exp_model.export(format="tflite")
_collect_export(tfl_fp32)

# TFLite FP16 (often best for Flutter + NNAPI/GPU)
tfl_fp16 = exp_model.export(format="tflite", half=True)
_collect_export(tfl_fp16)

# SavedModel (for custom INT8 conversion)
saved_dir = exp_model.export(format="saved_model")
dst_saved = _collect_export(saved_dir, "saved_model")

# TorchScript (optional)
ts_path = exp_model.export(format="torchscript")
_collect_export(ts_path)

# NCNN (optional)
ncnn_dir = exp_model.export(format="ncnn")
dst_ncnn = _collect_export(ncnn_dir, "ncnn")

# CoreML (optional, for iOS later)
# The export may be a directory bundle (`.mlpackage`) rather than a single file;
# a plain shutil.copy2 would fail on that and be misreported below as a skipped
# export, so the file-or-directory helper is used here as well.
try:
    mlmodel_path = exp_model.export(format="coreml")
    _collect_export(mlmodel_path)
except Exception as e:
    print("CoreML export skipped:", e)

# Keep best.pt + labels.txt
shutil.copy2(BEST_PT, EXPORTS_DIR / "best.pt")
shutil.copy2(labels_txt, EXPORTS_DIR / labels_txt.name)

print(" Exported to:", EXPORTS_DIR.resolve())


In [None]:
import tensorflow as tf, random, cv2, numpy as np
from pathlib import Path

dst_saved = Path("exports") / "saved_model"
assert dst_saved.exists(), "Missing exports/saved_model for INT8 conversion."

# Pool every PNG found in the directories below that exist.
cand_dirs = [
    Path("./kitti_phase1/kitti_yolo/images/train"),
    Path("./kitti_phase1/kitti_yolo/images/val"),
    Path("/kaggle/input/kitti-dataset/data_object_image_2/training/image_2")
]
img_pool = []
for d in cand_dirs:
    if d.exists():
        img_pool += list(d.glob("*.png"))
img_pool = sorted(set(img_pool))
assert img_pool, "No images available for INT8 representative dataset."

# Draw the calibration subset from a private, seeded RNG: together with the
# sorted pool above, the same images are picked on every run, so the quantised
# model is reproducible and the global `random` state is left untouched.
CALIB_SEED = 42
sample = random.Random(CALIB_SEED).sample(img_pool, min(100, len(img_pool)))  # was 150

def rep_ds(imgsz=640):
    """Yield calibration inputs for the TFLite converter, one image at a time.

    Each item is a one-element list holding a float32 RGB array of shape
    (1, imgsz, imgsz, 3) scaled to [0, 1]. Images are stretched to a square
    (no letterboxing); unreadable files are skipped.
    """
    for p in sample:
        im = cv2.imread(str(p))
        if im is None:
            continue
        im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        im = cv2.resize(im, (imgsz, imgsz)).astype(np.float32) / 255.0
        yield [np.expand_dims(im, 0)]

converter = tf.lite.TFLiteConverter.from_saved_model(str(dst_saved))
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = lambda: rep_ds()
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Input/output tensor types are left at the converter defaults; uncomment to
# force uint8 I/O.
# converter.inference_input_type = tf.uint8
# converter.inference_output_type = tf.uint8

tfl_int8 = converter.convert()
int8_path = Path("exports/best_int8.tflite")
int8_path.write_bytes(tfl_int8)
print("INT8 saved:", int8_path, "size(MB)=", round(int8_path.stat().st_size/1e6,2))

In [None]:
import json, hashlib, zipfile
from pathlib import Path

def sha256(p: Path):
    """Return the hexadecimal SHA-256 digest of the file at `p`.

    The file is read in 1 MiB chunks so it is never loaded into memory whole.
    """
    digest = hashlib.sha256()
    with open(p, 'rb') as stream:
        while True:
            block = stream.read(1 << 20)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()

# Inventory every file under exports/ with its size and checksum. The walk is
# sorted because rglob yields entries in filesystem order; sorting keeps the
# manifest and the zip member order identical from run to run.
exports_root = Path("exports")
artifacts = [
    {"path": str(p.relative_to(exports_root)),
     "bytes": p.stat().st_size,
     "sha256": sha256(p)}
    for p in sorted(exports_root.rglob("*"))
    if p.is_file()
]

# The manifest is written next to (not inside) exports/, so it does not list itself.
Path("exports_manifest.json").write_text(json.dumps({
    "best_pt": str(BEST_PT),
    "classes": CLASSES,
    "artifacts": artifacts
}, indent=2))

# Bundle all artifacts (kept under an "exports/" prefix) together with the manifest.
with zipfile.ZipFile("all_model_exports.zip","w",compression=zipfile.ZIP_DEFLATED) as z:
    for a in artifacts:
        z.write(exports_root/a["path"], arcname=exports_root/a["path"])
    z.write("exports_manifest.json")

print("all_model_exports.zip ready.")
