In [1]:
#!/usr/bin/env python3
"""
HYBRID + CLASSICAL STEREO PIPELINE  ───────────────────────────────────────
Author  : Barış Bakırdöven
Date    : 2025‑06‑03

This single script offers two complementary depth‑estimation workflows on
KITTI Stereo 2015:

  1. **Hybrid** – A lightweight U‑Net encoder is first *learned* using KITTI
     disparity supervision; at inference its frozen feature maps are fed
     to a hand‑engineered block‑matching stage.
  2. **Classical SGM** – Standard Semi‑Global Matching (OpenCV StereoSGBM)
     runs directly on grey‑scale images, mirroring the earlier MATLAB
     implementation (median filtering, KITTI D1‑all evaluation, assorted
     visualisations).

Invocation examples
────────────────────
$ python hybrid.py                           # train extractor + hybrid eval
$ python hybrid.py --no‑train --weights ext.h5   # hybrid eval only
$ python hybrid.py --classical                  # classical SGM only

All outputs (weights, metrics, PNGs) appear under ./outputs_hybrid/.
"""

# ───────────────────────── Imports & globals ────────────────────────────
import os, glob, time, pathlib, argparse
from typing import Dict, Tuple

import numpy as np
import cv2                                    # ← classical SGM
import tensorflow as tf
from tensorflow.keras import layers, Model

import matplotlib.pyplot as plt; import matplotlib as mpl
mpl.rcParams["figure.autolayout"] = True


# Base directory where both stereo images and calib files sit
BASE_KITTI = pathlib.Path(
    r"C:\Users\Baris\Downloads\ee417\kitti"
)

TRAIN_LEFT_DIR  = BASE_KITTI / "data_scene_flow" / "training" / "image_2"
TRAIN_RIGHT_DIR = BASE_KITTI / "data_scene_flow" / "training" / "image_3"
TRAIN_GT_DIR    = BASE_KITTI / "data_scene_flow" / "training" / "disp_occ_0"

TEST_LEFT_DIR   = BASE_KITTI / "data_scene_flow" / "testing" / "image_2"
TEST_RIGHT_DIR  = BASE_KITTI / "data_scene_flow" / "testing" / "image_3"
CALIB_DIR = BASE_KITTI / "data_scene_flow_calib" / "training" / "calib_cam_to_cam"

SAVE_DIR          = pathlib.Path("./outputs_hybrid"); SAVE_DIR.mkdir(exist_ok=True)

TARGET_SIZE  = (384, 1248)      # H×W after cropping sky rows
VAL_SPLIT    = 0.2
BATCH_SIZE   = 2
MAX_DISP     = 192; MAX_DISP_Q = MAX_DISP//4
EPOCHS_MAIN, EPOCHS_FINE = 5, 2
LR_MAIN,    LR_FINE      = 1e-3, 1e-4
N_SHOW = 2

# ──────────────────────── Utility I/O helpers ───────────────────────────

def list_png(folder: pathlib.Path) -> Dict[str,str]:
    return {os.path.basename(p): p for p in glob.glob(str(folder/"*.png"))}


def load_rgb(fp: tf.Tensor) -> tf.Tensor:
    img = tf.image.decode_png(tf.io.read_file(fp), channels=3)
    return tf.image.convert_image_dtype(img, tf.float32)


def load_disp(fp: tf.Tensor) -> tf.Tensor:
    raw = tf.image.decode_png(tf.io.read_file(fp), channels=1, dtype=tf.uint16)
    return tf.cast(raw, tf.float32) / 256.0

# ───────────────────────── tf.data pipelines ────────────────────────────

def make_dataset(keys, L_map, R_map, G_map):
    LP = tf.constant([L_map[k] for k in keys]); RP = tf.constant([R_map[k] for k in keys]); GP = tf.constant([G_map[k] for k in keys])
    def _f(lp,rp,gp):
        L = tf.image.resize(load_rgb(lp), TARGET_SIZE); R = tf.image.resize(load_rgb(rp), TARGET_SIZE)
        D = tf.image.resize(load_disp(gp), TARGET_SIZE, method='nearest');
        return (L,R), D
    return (tf.data.Dataset.from_tensor_slices((LP,RP,GP)).map(_f, tf.data.AUTOTUNE)
            .batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE))

def make_test_dataset(keys,L_map,R_map):
    LP = tf.constant([L_map[k] for k in keys]); RP = tf.constant([R_map[k] for k in keys])
    def _f(lp,rp):
        L=tf.image.resize(load_rgb(lp),TARGET_SIZE); R=tf.image.resize(load_rgb(rp),TARGET_SIZE); return (L,R)
    return (tf.data.Dataset.from_tensor_slices((LP,RP)).map(_f,tf.data.AUTOTUNE)
            .batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE))

# ──────────────────────── Train/val/test splits ─────────────────────────

def build_splits():
    L_ALL,R_ALL,G_ALL = map(list_png,(TRAIN_LEFT_DIR,TRAIN_RIGHT_DIR,TRAIN_GT_DIR))
    keys=[k for k in L_ALL if k in R_ALL and k in G_ALL]; keys.sort()
    from sklearn.model_selection import train_test_split
    tr,va = train_test_split(keys,test_size=VAL_SPLIT,random_state=42)
    train_ds=make_dataset(tr,L_ALL,R_ALL,G_ALL); val_ds=make_dataset(va,L_ALL,R_ALL,G_ALL)
    # --- testing (no GT) ---
    TL,TR=map(list_png,(TEST_LEFT_DIR,TEST_RIGHT_DIR)); tkeys=[k for k in TL if k in TR]; tkeys.sort()
    test_ds=make_test_dataset(tkeys,TL,TR)
    print(f"→ train={len(tr)}  val={len(va)}  test={len(tkeys)}")
    return train_ds,val_ds,test_ds

# ─────────────────── Lightweight U‑Net encoder (¼‑res) ──────────────────

def conv_bn_relu(x,ch,k=3,s=1):
    x=layers.Conv2D(ch,k,s,'same',use_bias=False)(x); x=layers.BatchNormalization()(x); return layers.ReLU()(x)

def unet_encoder(backbone_ch=32, depth=2):
    inp=layers.Input((None,None,3)); x=inp
    for d in range(depth):
        for _ in range(2): x=conv_bn_relu(x, backbone_ch*(2**d))
        x=layers.MaxPool2D(2)(x)
    out=conv_bn_relu(x, backbone_ch*(2**depth))
    return Model(inp,out,name="UNetEncoder")

# ───────────────────── FeatureNet (training surrogate) ──────────────────

def cost_volume(FL,FR,D=MAX_DISP_Q):
    cost=[]
    for d in range(D):
        slice_r=FR if d==0 else tf.pad(FR[:,:,:-d,:],[[0,0],[0,0],[d,0],[0,0]])
        cost.append(tf.reduce_mean(FL*slice_r,-1,keepdims=True))
    return tf.concat(cost,-1)

class FeatureNet(Model):
    def __init__(self):
        super().__init__(); self.max_disp_q=MAX_DISP_Q; self.feat=unet_encoder()
    def call(self,inputs):
        L,R=inputs; FL,FR=self.feat(L),self.feat(R)
        cv=cost_volume(FL,FR,self.max_disp_q); logits=tf.expand_dims(cv,-1)
        prob=tf.nn.softmax(logits,axis=-2)[...,0]
        disp_q=tf.reduce_sum(prob*tf.range(self.max_disp_q,dtype=tf.float32),-1)
        return tf.image.resize(disp_q[...,None]*4.0, tf.shape(L)[1:3])  # full‑res

# ─────────────────── KITTI metrics & masked MAE loss ───────────────────

def masked_mae(gt,pred):
    m=tf.cast(gt>0,tf.float32); return tf.reduce_sum(tf.abs(gt-pred)*m)/tf.reduce_sum(m)

def kitti_d1(pred,gt):
    m=tf.cast(gt>0,gt.dtype); ae=tf.abs(pred-gt); rel=ae/tf.maximum(gt,1e-6); bad=tf.logical_and(ae>3.0,rel>0.05)
    return tf.reduce_sum(tf.cast(bad,gt.dtype)*m), tf.reduce_sum(m)

def mae_rmse(pred,gt):
    m=tf.cast(gt>0,gt.dtype); ae=tf.reduce_sum(tf.abs(pred-gt)*m); se=tf.reduce_sum(tf.square(pred-gt)*m); v=tf.reduce_sum(m)
    return ae,se,v

# ─────────────────────────── HYBRID inference ──────────────────────────

def block_match_feats(FL:np.ndarray,FR:np.ndarray,D=MAX_DISP_Q)->np.ndarray:
    H,W,_=FL.shape; cost=np.full((H,W,D),np.inf,np.float32)
    for d in range(D):
        slice_r=FR if d==0 else np.pad(FR[:,:-d,:],((0,0),(d,0),(0,0)),mode='constant')
        cost[:,:,d]=np.mean(np.abs(FL-slice_r),-1)
    return np.argmin(cost,-1).astype(np.float32)  # ¼‑res

def hybrid_disparity(extractor:Model,L:tf.Tensor,R:tf.Tensor)->tf.Tensor:
    FL=extractor(L,training=False).numpy(); FR=extractor(R,training=False).numpy(); outs=[]
    for b in range(FL.shape[0]):
        disp_q=block_match_feats(FL[b],FR[b]); outs.append(tf.image.resize(disp_q[...,None]*4.0, tf.shape(L)[1:3]))
    return tf.concat(outs,0)

# ───────────────────────── Classical SGM pipeline ───────────────────────

def disparity_sgm_cv2(L_rgb:np.ndarray,R_rgb:np.ndarray, max_disp=128, uniqueness=15):
    grayL=cv2.cvtColor(L_rgb,cv2.COLOR_RGB2GRAY); grayR=cv2.cvtColor(R_rgb,cv2.COLOR_RGB2GRAY)
    # StereoSGBM requires numDisparities divisible by 16
    num_disp=(max_disp//16)*16
    sgbm=cv2.StereoSGBM_create(minDisparity=0, numDisparities=num_disp, blockSize=5,
                               P1=8*3*3**2, P2=32*3*3**2,
                               uniquenessRatio=uniqueness, speckleWindowSize=50,
                               speckleRange=2, disp12MaxDiff=1)
    disp=sgbm.compute(grayL,grayR).astype(np.float32)/16.0
    disp=cv2.medianBlur(disp,3)
    disp[disp<0]=0
    return disp

# ─────────────── Evaluate pipeline (hybrid *or* classical) ──────────────

def eval_loop(disp_fn, dataset, extractor=None):
    total_ae=total_se=total_bad=total_v=0.0; n=0; t0=time.perf_counter()
    for batch in dataset:
        if extractor: (L,R),G=batch; P=disp_fn(extractor,L,R)[...,0]; G=G[...,0]
        else: (L,R),G=batch; P=[];  # placeholder handled below
        if extractor is None:
            # loop over batch for classical SGM (CPU)
            for b in range(L.shape[0]):
                d=disparity_sgm_cv2((L[b].numpy()*255).astype(np.uint8), (R[b].numpy()*255).astype(np.uint8))
                P.append(cv2.resize(d,(TARGET_SIZE[1],TARGET_SIZE[0]),interpolation=cv2.INTER_LINEAR))
            P=np.stack(P,0)
        ae,se,v=mae_rmse(P,G); bad,_=kitti_d1(P,G)
        total_ae+=ae.numpy(); total_se+=se.numpy(); total_bad+=bad.numpy(); total_v+=v.numpy(); n+=int(L.shape[0])
    sec=(time.perf_counter()-t0)/n; mae=total_ae/total_v; rmse=np.sqrt(total_se/total_v); d1=total_bad/total_v*100
    return sec,d1,mae,rmse

# ───────────────────────────── Training loop ────────────────────────────

def train_extractor(train_ds,val_ds):
    fn=FeatureNet(); _=fn((tf.zeros((1,*TARGET_SIZE,3)),)*2)
    fn.compile(tf.keras.optimizers.Adam(LR_MAIN),loss=masked_mae); fn.fit(train_ds,validation_data=val_ds,epochs=EPOCHS_MAIN,verbose=1)
    if EPOCHS_FINE: fn.compile(tf.keras.optimizers.Adam(LR_FINE),loss=masked_mae); fn.fit(train_ds,validation_data=val_ds,epochs=EPOCHS_FINE,verbose=1)
    extractor=fn.feat; extractor.trainable=False; return extractor




In [2]:
# ────────────────────────── Notebook-friendly MAIN ──────────────────────────
# Adjust these flags directly in the cell instead of using argparse
CLASSICAL_ONLY = False   # True → run only classical SGM
NO_TRAIN       = False   # True → skip training and load weights below
WEIGHTS_PATH   = ""      # Path to .h5 extractor weights (used if NO_TRAIN)

# 1. Build datasets
train_ds, val_ds, _ = build_splits()

# 2. Classical-only branch
if CLASSICAL_ONLY:
    sec, d1, mae, rmse = eval_loop(disparity_sgm_cv2, val_ds)
    print(f"[CLASSICAL SGM] time/pair = {sec:.2f}s   D1 = {d1:.2f}%   "
          f"MAE = {mae:.2f}px   RMSE = {rmse:.2f}px")

# 3. Hybrid branch
else:
    if NO_TRAIN and WEIGHTS_PATH:
        extractor = unet_encoder()            # build backbone
        extractor.load_weights(WEIGHTS_PATH)  # load frozen weights
        extractor.trainable = False
    else:
        extractor = train_extractor(train_ds, val_ds)
        extractor.save_weights(SAVE_DIR / "extractor_final.h5")

    sec, d1, mae, rmse = eval_loop(hybrid_disparity, val_ds, extractor)
    print(f"[HYBRID] time/pair = {sec:.2f}s   D1 = {d1:.2f}%   "
          f"MAE = {mae:.2f}px   RMSE = {rmse:.2f}px")

    # 4. Quick qualitative dump (first N_SHOW validation batches)
    disp_cmap = mpl.colormaps['turbo'].copy()
    disp_cmap.set_bad('#303030')

    for bid, ((L, R), G) in enumerate(val_ds.take(N_SHOW)):
        P = hybrid_disparity(extractor, L, R)[..., 0].numpy()
        for i in range(L.shape[0]):
            gt   = np.where(G[i, ..., 0] > 0, G[i, ..., 0], np.nan)
            pred = P[i]
            lo, hi = (0, MAX_DISP) if not np.isfinite(gt).any() \
                     else np.nanpercentile(gt, [1, 99])
            norm = mpl.colors.Normalize(vmin=lo, vmax=hi)

            fig, ax = plt.subplots(1, 3, figsize=(16, 4))
            ax[0].imshow(L[i]);  ax[0].axis('off'); ax[0].set_title('Left')
            ax[1].imshow(gt, cmap=disp_cmap, norm=norm)
            ax[1].axis('off');   ax[1].set_title('GT')
            im = ax[2].imshow(pred, cmap=disp_cmap, norm=norm)
            ax[2].axis('off');   ax[2].set_title('Hybrid')
            fig.colorbar(im, ax=ax[1:], fraction=0.03, pad=0.02, label='px')

            # Save & show
            out = SAVE_DIR / f"hybrid_val_{bid}_{i}.png"
            fig.savefig(out, dpi=150)
            plt.show()
            plt.close(fig)


ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.