In [3]:
import cv2
import numpy as np

def dark_channel(im, size=15):
    """Compute the dark channel of a colour image.

    The per-pixel minimum is taken across axis 2 (the channel axis) and the
    result is eroded with a ``size`` x ``size`` rectangular structuring
    element, i.e. a local minimum filter over each square patch.

    Args:
        im: Array indexed as (row, column, channel).
        size: Side length in pixels of the square erosion window.

    Returns:
        2-D array containing the eroded channel-wise minimum.
    """
    patch = cv2.getStructuringElement(cv2.MORPH_RECT, (size, size))
    channel_floor = np.min(im, axis=2)
    return cv2.erode(channel_floor, patch)

def atmospheric_light(im, dark, top_percent=0.001):
    """Estimate the atmospheric light from the haziest pixels.

    The pixels with the largest dark-channel values are selected and their
    colours are averaged.

    Args:
        im: Image array that can be reshaped to (-1, 3); its pixel order must
            match ``dark.ravel()``.
        dark: 2-D dark-channel map of the same height/width as ``im``.
        top_percent: Fraction (not a percentage) of pixels to average over.

    Returns:
        Length-3 array: the mean colour of the selected pixels.
    """
    h, w = dark.shape
    # Keep at least one pixel. For small images int() truncates to 0, and a
    # slice of [-0:] would silently select *every* pixel instead of none.
    num_pixels = max(1, int(h * w * top_percent))
    indices = np.argsort(dark.ravel())[-num_pixels:]
    A = np.mean(im.reshape(-1, 3)[indices], axis=0)
    return A

def transmission_estimate(im, A, size=15, omega=0.95):
    """Estimate the transmission map from the dark channel of ``im / A``.

    Args:
        im: Image array indexed as (row, column, channel).
        A: Atmospheric light; divided into ``im`` with NumPy broadcasting.
        size: Patch size forwarded to ``dark_channel``.
        omega: Weight applied to the dark channel before it is subtracted
            from 1.

    Returns:
        2-D array ``1 - omega * dark_channel(im / A, size)``. The result is
        not clipped here.
    """
    haze_fraction = dark_channel(im / A, size)
    return 1 - omega * haze_fraction

def guided_filter(I, p, r=60, eps=1e-3):
    """Edge-preserving smoothing of ``p`` guided by ``I``.

    Thin wrapper around ``cv2.ximgproc.guidedFilter``. Only the source ``p``
    is converted to float32; the guide ``I`` is passed through unchanged.

    Args:
        I: Guide image.
        p: Image to be filtered.
        r: Passed as ``radius``.
        eps: Passed as ``eps`` (regularisation term).

    Returns:
        The filtered image returned by OpenCV.
    """
    return cv2.ximgproc.guidedFilter(
        guide=I,
        src=p.astype(np.float32),
        radius=r,
        eps=eps,
    )

def recover(im, t, A, t0=0.1):
    """Recover scene radiance as ``(im - A) / t + A`` for each of 3 channels.

    Args:
        im: Hazy image indexed as (row, column, channel); the output has the
            same shape and dtype.
        t: 2-D transmission map. NaN and +/-inf entries are replaced by 0.1
            before the map is clipped to ``[t0, 1.0]``.
        A: Indexable with at least three entries, one airlight value per
            channel.
        t0: Lower bound for the transmission, limiting the amplification
            caused by dividing by very small values.

    Returns:
        Recovered image clipped to ``[0, 1]``.
    """
    safe_t = np.clip(
        np.nan_to_num(t, nan=0.1, posinf=0.1, neginf=0.1),
        t0,
        1.0,
    )
    restored = np.empty_like(im)

    for ch in range(3):
        airlight = A[ch]
        restored[:, :, ch] = (im[:, :, ch] - airlight) / safe_t + airlight

    return np.clip(restored, 0, 1)

def dehaze_dcp(img_path, size=15, omega=0.95, Amax=0.95, Amin=0.7, gamma=1.0, top_percent=0.001):
    """Dehaze one image file with the dark-channel-prior pipeline.

    Steps: load the image, compute its dark channel, estimate and clamp the
    atmospheric light, estimate the transmission, refine it with a guided
    filter, recover the radiance and apply a power-law (gamma) adjustment.

    Args:
        img_path: Path of the image to read with ``cv2.imread``.
        size: Patch size for the dark channel.
        omega: Dark-channel weight used in the transmission estimate.
        Amax: Upper clamp for each component of the atmospheric light.
        Amin: Lower clamp for each component of the atmospheric light.
        gamma: Exponent applied to the recovered image (1.0 leaves it as is).
        top_percent: Fraction of pixels used for the atmospheric light.

    Returns:
        Tuple ``(J, t_refined, dark)``: the dehazed uint8 image in the channel
        order delivered by ``cv2.imread``, the refined transmission map and
        the dark channel of the input.

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot read ``img_path``.
    """
    raw = cv2.imread(img_path)
    if raw is None:
        # cv2.imread reports a missing/unreadable file by returning None;
        # fail with a clear error instead of an AttributeError on .astype.
        raise FileNotFoundError(img_path)
    im = raw.astype(np.float64) / 255.0

    dark = dark_channel(im, size=size)
    A = atmospheric_light(im, dark, top_percent=top_percent)
    A = np.clip(A, Amin, Amax)
    t = transmission_estimate(im, A, size=size, omega=omega)

    # NOTE(review): the guide is a 0-255 uint8 image while guided_filter's
    # default eps is 1e-3; confirm eps is intended for this intensity scale.
    gray = cv2.cvtColor((im * 255).astype(np.uint8), cv2.COLOR_BGR2GRAY)
    t_refined = guided_filter(gray, t)

    J = recover(im, t_refined, A)
    J = np.power(J, gamma)
    return (J * 255).astype(np.uint8), t_refined, dark


In [5]:
import os
from tqdm import tqdm
# --- DCP dehazing of the Perlin-noise synthetic haze set ---
input_dir = "US-Road-Signs-71/easy/perlin_synth/images"
output_dir = "US-Road-Signs-71/easy/perlin_removed/images"
os.makedirs(output_dir, exist_ok=True)

# dehaze_dcp settings; kept as notebook globals because later cells pass the
# same names without redefining them.
size, omega = 15, 0.95
Amax, Amin = 0.95, 0.7
gamma = 0.85
top_percent = 0.001

for filename in tqdm(os.listdir(input_dir)):
    # Ignore anything that does not carry a supported image extension.
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
        continue
    img_path = os.path.join(input_dir, filename)
    J, t_refined, dark = dehaze_dcp(
        img_path,
        size=size,
        omega=omega,
        Amax=Amax,
        Amin=Amin,
        gamma=gamma,
        top_percent=top_percent,
    )
    # Results keep the source file name inside the output directory.
    output_path = os.path.join(output_dir, filename)
    cv2.imwrite(output_path, J)



100%|██████████| 509/509 [02:01<00:00,  4.20it/s]


In [6]:
# --- DCP dehazing of both DL-synthesised haze sets ---
# Uses the size/omega/Amax/Amin/gamma/top_percent globals that an earlier
# cell must already have defined.
for input_dir, output_dir in (
    ("US-Road-Signs-71/easy/DL_synth_2.0/images",
     "US-Road-Signs-71/easy/DL_synth_2.0_removed/images"),
    ("US-Road-Signs-71/easy/DL_synth_2.75/images",
     "US-Road-Signs-71/easy/DL_synth_2.75_removed/images"),
):
    os.makedirs(output_dir, exist_ok=True)
    for filename in tqdm(os.listdir(input_dir)):
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
            continue
        img_path = os.path.join(input_dir, filename)
        J, t_refined, dark = dehaze_dcp(
            img_path,
            size=size,
            omega=omega,
            Amax=Amax,
            Amin=Amin,
            gamma=gamma,
            top_percent=top_percent,
        )
        output_path = os.path.join(output_dir, filename)
        cv2.imwrite(output_path, J)

100%|██████████| 509/509 [02:00<00:00,  4.23it/s]
100%|██████████| 509/509 [01:58<00:00,  4.28it/s]


In [4]:
import cv2
import numpy as np
from sklearn.cluster import KMeans

def estimate_airlight(img, num_samples=2000):
    """Estimate the airlight as the mean colour of the brightest pixels.

    Brightness is the sum of the three channel values of a pixel.

    Args:
        img: Image array that can be reshaped to (-1, 3).
        num_samples: Number of brightest pixels to average. Values below 1
            are treated as 1 and values above the pixel count as the pixel
            count.

    Returns:
        Length-3 array with the mean colour of the selected pixels.
    """
    img_reshaped = img.reshape(-1, 3)

    # Clamp the sample count. With num_samples == 0 the slice [-0:] would
    # select every pixel, turning "brightest pixels" into "whole image".
    keep = max(1, min(int(num_samples), img_reshaped.shape[0]))

    # Pick brightest pixels as airlight candidates
    intensities = np.sum(img_reshaped, axis=1)
    indices = np.argsort(intensities)[-keep:]
    return np.mean(img_reshaped[indices], axis=0)

def get_haze_lines(img, n_clusters=40):
    """Group the image's pixel colours into clusters with KMeans.

    The raw 3-component pixel values (cast to float32) are clustered; each
    non-empty cluster is reported as one "haze-line".

    Args:
        img: 3-D image array whose pixels can be reshaped to (-1, 3).
        n_clusters: Number of KMeans clusters to fit.

    Returns:
        Tuple ``(clusters, labels, centers)`` where ``clusters`` is a list of
        dicts with keys ``"indices"`` (flat pixel indices), ``"pixels"`` (the
        member colours) and ``"color"`` (their mean), ``labels`` is the
        per-pixel cluster id and ``centers`` is ``cluster_centers_`` of the
        fitted model.
    """
    _rows, _cols, _channels = img.shape  # fails early unless img is 3-D
    pixels = img.reshape(-1, 3).astype(np.float32)

    # Fixed random_state keeps the clustering repeatable for a given image.
    model = KMeans(n_clusters=n_clusters, n_init=3, random_state=42)
    labels = model.fit_predict(pixels)

    clusters = []
    for cluster_id in range(n_clusters):
        member_idx = np.flatnonzero(labels == cluster_id)
        members = pixels[member_idx]
        if len(members) == 0:
            # Nothing was assigned to this cluster id; leave it out.
            continue
        clusters.append({
            "indices": member_idx,
            "pixels": members,
            "color": np.mean(members, axis=0)
        })

    return clusters, labels, model.cluster_centers_

def estimate_transmission_nonlocal(clusters, A, img_size, beta=1.0):
    """Assign one transmission value to every pixel of each cluster.

    For a cluster with mean colour ``c`` the value is
    ``exp(-beta * ||c - A|| / ||A||)``. Pixels that belong to no cluster keep
    a transmission of 0.

    Args:
        clusters: Iterable of dicts providing ``"color"`` and ``"indices"``
            (flat pixel indices into the image).
        A: Airlight vector.
        img_size: ``(height, width)`` of the output map.
        beta: Decay rate of the exponential mapping.

    Returns:
        ``(height, width)`` array of transmission values.
    """
    rows, cols = img_size
    flat_t = np.zeros(rows * cols)
    airlight_mag = np.linalg.norm(A)

    for cluster in clusters:
        offset = np.linalg.norm(cluster["color"] - A)
        flat_t[cluster["indices"]] = np.exp(-beta * offset / airlight_mag)

    return flat_t.reshape(rows, cols)

def guided_filter(I, p, r=60, eps=1e-3):
    """Guided filtering of ``p`` with guide ``I`` via OpenCV's ximgproc.

    Both arrays are converted to float32 (values are not rescaled) before
    being handed to ``cv2.ximgproc.guidedFilter``.

    Args:
        I: Guide image.
        p: Image to be filtered.
        r: Passed as ``radius``.
        eps: Passed as ``eps`` (regularisation term).

    Returns:
        The filtered image returned by OpenCV.
    """
    return cv2.ximgproc.guidedFilter(
        guide=I.astype(np.float32),
        src=p.astype(np.float32),
        radius=r,
        eps=eps,
    )

def recover_scene(img, t, A, t0=0.1):
    """Recover scene radiance on a 0-255 scale and return it as uint8.

    Each of the three channels is computed as
    ``(img - A) / (t + 1e-9) + A`` after ``t`` has been clipped to
    ``[t0, 1]``.

    Args:
        img: Hazy image indexed as (row, column, channel). The intermediate
            buffer uses ``img``'s dtype, so pass a floating-point array.
        t: 2-D transmission map.
        A: Indexable with at least three entries, one airlight value per
            channel.
        t0: Lower bound applied to the transmission.

    Returns:
        uint8 image with values clipped to ``[0, 255]``.
    """
    bounded_t = np.clip(t, t0, 1)
    denom = bounded_t + 1e-9
    radiance = np.empty_like(img)

    for ch in range(3):
        radiance[:, :, ch] = (img[:, :, ch] - A[ch]) / denom + A[ch]

    return np.clip(radiance, 0, 255).astype(np.uint8)

def non_local_dehaze(img_path, n_clusters=40, beta=1.2, refine=True):
    """Dehaze one image file with the colour-cluster ("non-local") pipeline.

    Args:
        img_path: Path of the image to read with ``cv2.imread``.
        n_clusters: Number of colour clusters passed to ``get_haze_lines``.
        beta: Decay rate passed to ``estimate_transmission_nonlocal``.
        refine: If true, smooth the transmission with ``guided_filter`` using
            the grayscale image as guide.

    Returns:
        Tuple ``(J, t, A)``: the recovered uint8 RGB image, the transmission
        map actually used and the estimated airlight (0-255 scale).

    Raises:
        FileNotFoundError: If ``cv2.imread`` cannot read ``img_path``.
    """
    bgr = cv2.imread(img_path)
    if bgr is None:
        # cv2.imread returns None for a missing/unreadable file; report that
        # directly rather than letting cvtColor fail with an opaque error.
        raise FileNotFoundError(img_path)
    img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    h, w = img.shape[:2]

    # 1. Estimate Atmospheric Light A
    A = estimate_airlight(img)

    # 2. Build haze-lines via KMeans (labels and centres are not needed here)
    clusters, _labels, _centers = get_haze_lines(img, n_clusters=n_clusters)

    # 3. Transmission per haze-line
    t = estimate_transmission_nonlocal(clusters, A, (h, w), beta=beta)

    # 4. Optional guided filter refinement
    if refine:
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        t = guided_filter(gray, t.astype(np.float32), r=40, eps=1e-3)

    # 5. Recover scene radiance (float input so the arithmetic is not done in uint8)
    J = recover_scene(img.astype(np.float32), t, A)

    return J, t, A


In [None]:
# --- Non-local dehazing of the three synthetic haze sets ---
for input_dir, output_dir in (
    ("US-Road-Signs-71/easy/perlin_synth/images",
     "US-Road-Signs-71/easy/perlin_removed_NON/images"),
    ("US-Road-Signs-71/easy/DL_synth_2.0/images",
     "US-Road-Signs-71/easy/DL_synth_2.0_removed_NON/images"),
    ("US-Road-Signs-71/easy/DL_synth_2.75/images",
     "US-Road-Signs-71/easy/DL_synth_2.75_removed_NON/images"),
):
    os.makedirs(output_dir, exist_ok=True)
    for filename in tqdm(os.listdir(input_dir)):
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
            continue
        img_path = os.path.join(input_dir, filename)
        J, t, A = non_local_dehaze(img_path, n_clusters=60, beta=1.2, refine=True)
        output_path = os.path.join(output_dir, filename)
        # non_local_dehaze returns RGB, while cv2.imwrite expects BGR.
        cv2.imwrite(output_path, cv2.cvtColor(J, cv2.COLOR_RGB2BGR))



In [5]:
# hybrid_dehaze_gpu.py
import os
import cv2
import math
import numpy as np
import torch
import torch.nn.functional as F

# -------------------------
# Utilities
# -------------------------
def imread_rgb_f32(path):
    """Load an image file as an RGB float32 array scaled by 1/255.

    Args:
        path: Image file path handed to ``cv2.imread``.

    Returns:
        Float32 array in RGB channel order.

    Raises:
        FileNotFoundError: If ``cv2.imread`` returns ``None`` for ``path``.
    """
    loaded_bgr = cv2.imread(path, cv2.IMREAD_COLOR)
    if loaded_bgr is None:
        raise FileNotFoundError(path)
    as_rgb = cv2.cvtColor(loaded_bgr, cv2.COLOR_BGR2RGB)
    return as_rgb.astype(np.float32) / 255.0

def imsave_rgb_u8(path, rgb_f32):
    """Write an RGB float image to disk as 8-bit.

    Values are clipped to ``[0, 1]``, scaled by 255, truncated to uint8 and
    converted to BGR before being passed to ``cv2.imwrite``. The return value
    of ``cv2.imwrite`` is not checked.

    Args:
        path: Destination file path.
        rgb_f32: RGB image array.
    """
    quantised = (np.clip(rgb_f32, 0.0, 1.0) * 255.0).astype(np.uint8)
    cv2.imwrite(path, cv2.cvtColor(quantised, cv2.COLOR_RGB2BGR))

# -------------------------
# Fast guided filter (CPU, OpenCV-based box filters)
# -------------------------
def fast_guided_filter(I, p, r=30, eps=1e-3):
    """Guided filter built from normalised OpenCV box filters.

    Fits a local linear model ``q = a * I + b`` in every window and averages
    the coefficients. Note that ``r`` is used directly as the box-filter
    kernel side length (``ksize=(r, r)``), not as a radius.

    Args:
        I: Guidance image (HxW); cast to float32.
        p: Image to filter (HxW); cast to float32.
        r: Side length of the square averaging window.
        eps: Regularisation added to the local variance of ``I``.

    Returns:
        Filtered version of ``p`` as float32.
    """
    guide = I.astype(np.float32)
    target = p.astype(np.float32)

    def box_mean(arr):
        # Normalised box filter == local mean over an r x r window.
        return cv2.boxFilter(arr, ddepth=-1, ksize=(r, r), normalize=True)

    guide_mean = box_mean(guide)
    target_mean = box_mean(target)

    # Local variance of the guide and covariance between guide and target.
    guide_var = box_mean(guide * guide) - guide_mean * guide_mean
    cross_cov = box_mean(guide * target) - guide_mean * target_mean

    # Per-window linear coefficients.
    slope = cross_cov / (guide_var + eps)
    intercept = target_mean - slope * guide_mean

    return box_mean(slope) * guide + box_mean(intercept)

# -------------------------
# DCP helpers (cpu numpy)
# -------------------------
def dark_channel_np(im, size=15):
    """Compute the dark channel of an HxWx3 image as float32.

    The per-pixel channel minimum is eroded with a ``size`` x ``size``
    rectangular kernel (a local minimum filter).

    The erosion runs directly on float32 data. The previous uint8 round trip
    (``(min_ch * 255).astype(np.uint8)``) overflowed whenever a value
    exceeded 1.0 -- which happens for the ``im / A`` input built by
    ``transmission_from_dcp_np`` -- and it also quantised the result to
    1/255 steps.

    Args:
        im: HxWx3 array; values are not required to lie in ``[0, 1]``.
        size: Side length of the square erosion window.

    Returns:
        HxW float32 dark-channel map on the same scale as ``im``.
    """
    min_ch = np.min(im, axis=2).astype(np.float32)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (size, size))
    return cv2.erode(min_ch, kernel)

def estimate_atmospheric_light_np(im, dark, top_percent=0.001):
    """Average the colours of the pixels with the largest dark-channel values.

    Args:
        im: Image array that can be reshaped to (-1, 3); its pixel order must
            match ``dark.ravel()``.
        dark: 2-D dark-channel map.
        top_percent: Fraction (not a percentage) of pixels to use; at least
            one pixel is always selected.

    Returns:
        Length-3 array with the mean colour of the selected pixels.
    """
    rows, cols = dark.shape
    keep = max(1, int(rows * cols * top_percent))
    haziest = np.argsort(dark.ravel())[-keep:]
    return np.mean(im.reshape(-1, 3)[haziest], axis=0)

def transmission_from_dcp_np(im, A, size=15, omega=0.95):
    """Dark-channel-prior transmission estimate, clipped to ``[0, 1]``.

    Args:
        im: HxWx3 image array.
        A: 1-D NumPy array with one airlight value per channel (it is
            indexed as ``A[None, None, :]``).
        size: Patch size forwarded to ``dark_channel_np``.
        omega: Weight applied to the dark channel of ``im / A``.

    Returns:
        HxW array ``clip(1 - omega * dark_channel_np(im / A), 0, 1)``.
    """
    # The small constant guards against division by a zero airlight component.
    scaled = im / (A[None, None, :] + 1e-8)
    haze = dark_channel_np(scaled, size)
    return np.clip(1.0 - omega * haze, 0.0, 1.0)

# -------------------------
# GPU KMeans (PyTorch) - runs on device if available
# -------------------------
def torch_kmeans(points, n_clusters=32, n_iters=20, device=None, init_centers=None):
    """Simple Lloyd-style k-means implemented in PyTorch.

    Args:
        points: (N, D) float tensor of samples.
        n_clusters: Number of centres kept after every update step.
        n_iters: Number of assign/update rounds. With 0 rounds the initial
            centres are returned together with their nearest-centre labels.
        device: Device to run on. Defaults to ``points.device``; when given,
            ``points`` is moved to it so that all tensors share one device.
        init_centers: Optional (k, D) tensor of starting centres. If omitted,
            centres are drawn at random from ``points`` (unseeded here, so
            results depend on the global torch RNG state).

    Returns:
        Tuple ``(centers, labels)``. ``labels`` holds the assignment computed
        in the last round, i.e. against the centres *before* the final
        update (or against the initial centres when ``n_iters`` is 0).
    """
    if device is None:
        device = points.device
    else:
        # The centres are created on `device`; the matmul below needs the
        # points on that same device.
        points = points.to(device)

    N, _ = points.shape
    if init_centers is None:
        # random sample initial centers
        idx = torch.randperm(N, device=device)[:n_clusters]
        centers = points[idx].clone()
    else:
        centers = init_centers.to(device).clone()

    # ||x||^2 does not change between iterations, so compute it once.
    pts_norm = (points * points).sum(1, keepdim=True)  # (N,1)

    def _nearest(current):
        """Index of the closest centre for every point."""
        # squared distance via ||x||^2 - 2 x.c + ||c||^2
        centers_norm = (current * current).sum(1).unsqueeze(0)  # (1,k)
        dists = pts_norm - 2.0 * (points @ current.t()) + centers_norm  # (N,k)
        return torch.argmin(dists, dim=1)  # (N,)

    labels = None
    for _ in range(n_iters):
        labels = _nearest(centers)
        # update centers
        new_centers = []
        for k in range(n_clusters):
            mask = labels == k
            if mask.any():
                new_centers.append(points[mask].mean(dim=0))
            else:
                # reinitialize empty cluster to a random point
                new_centers.append(points[torch.randint(0, N, (1,), device=device)].squeeze(0))
        centers = torch.stack(new_centers, dim=0)

    if labels is None:
        # n_iters <= 0: the loop never ran, so assign once against the
        # initial centres instead of failing on an undefined `labels`.
        labels = _nearest(centers)
    return centers, labels

# -------------------------
# Hybrid dehaze pipeline
# -------------------------
def hybrid_dehaze_gpu(
    img_path,
    out_path=None,
    # clustering params
    n_clusters=32,
    sample_size=30000,
    kmeans_iters=12,
    # image scale for clustering (smaller -> faster)
    scale=0.6,
    # guided filter params
    gf_r=40,
    gf_eps=1e-3,
    # DCP params (for fusion)
    dcp_patch=15,
    dcp_omega=0.85,
    dcp_top_percent=0.001,
    # recovery
    t0=0.05,
    device_preference=None  # "cuda" or "cpu" or None (auto)
):
    """Dehaze an image by fusing a cluster-based and a DCP transmission map.

    Pipeline: load the image, optionally downscale it, cluster a random
    sample of its colours with ``torch_kmeans``, label every pixel with its
    nearest centre, estimate the airlight, derive one transmission value per
    cluster from the centre's distance to the airlight, refine that map with
    ``fast_guided_filter``, blend it with a dark-channel-prior transmission
    and invert the haze model with the blended map.

    The pixel sample and the k-means initialisation are random and not seeded
    here, so results can differ between runs.

    Args:
        img_path: Input image path.
        out_path: If truthy, the result is also written to this path.
        n_clusters: Number of colour clusters.
        sample_size: Maximum number of pixels used to fit the clusters.
        kmeans_iters: Number of k-means iterations.
        scale: Factor < 1.0 downsizes the image used for clustering and
            transmission estimation; otherwise the full image is used.
        gf_r: Window size passed to ``fast_guided_filter``.
        gf_eps: Regularisation passed to ``fast_guided_filter``.
        dcp_patch: Patch size for the dark channel.
        dcp_omega: ``omega`` for the DCP transmission.
        dcp_top_percent: Fraction of pixels for the DCP airlight estimate.
        t0: Lower bound applied to both transmission maps.
        device_preference: ``"cpu"`` forces CPU; ``"cuda"`` or ``None`` use
            CUDA when ``torch.cuda.is_available()`` and CPU otherwise.

    Returns:
        Dict with ``"J"`` (dehazed float32 RGB image in [0, 1]),
        ``"t_full"`` (refined cluster-based transmission at full size),
        ``"t_combined"`` (blended transmission used for recovery), ``"A"``
        (airlight) and ``"use_cuda"`` (whether CUDA was used).

    Raises:
        FileNotFoundError: Propagated from ``imread_rgb_f32`` when the image
            cannot be read.
    """
    # 1) load image
    im = imread_rgb_f32(img_path)  # HxWx3 float32 in [0,1]
    h, w = im.shape[:2]

    # 2) optionally downscale to speed up clustering
    if scale < 1.0:
        # int() truncation must not produce a zero-sized target for tiny images
        small_w = max(1, int(w * scale))
        small_h = max(1, int(h * scale))
        small = cv2.resize(im, (small_w, small_h), interpolation=cv2.INTER_AREA)
    else:
        small = im.copy()
    hs, ws = small.shape[:2]

    # 3) Prepare pixel samples for clustering
    pixels = small.reshape(-1, 3).astype(np.float32)  # Nx3 numpy (astype copies)
    N = pixels.shape[0]
    if sample_size < N:
        idx = np.random.choice(N, size=sample_size, replace=False)
        sample_pixels = pixels[idx]
    else:
        sample_pixels = pixels

    # 4) Move samples to torch (GPU unless CPU was requested or CUDA is missing)
    use_cuda = device_preference != "cpu" and torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    pts = torch.from_numpy(sample_pixels).to(device)

    # 5) Run kmeans on sampled points
    centers_t, _ = torch_kmeans(pts, n_clusters=n_clusters, n_iters=kmeans_iters, device=device)
    centers = centers_t.detach().cpu().numpy()  # kx3

    # 6) Assign every (small) pixel to its nearest center
    if use_cuda:
        # compute distances on GPU for all pixels at once
        all_pts = torch.from_numpy(pixels).to(device)  # (N,3)
        centers_dev = centers_t.to(device)
        # squared dists via ||x||^2 - 2 x.c + ||c||^2
        ap = (all_pts * all_pts).sum(1, keepdim=True)
        ac = (centers_dev * centers_dev).sum(1).unsqueeze(0)
        dots = all_pts @ centers_dev.t()
        dists = ap - 2.0 * dots + ac  # (N,k)
        labels = torch.argmin(dists, dim=1).cpu().numpy()
    else:
        # CPU assign in chunks so the (M,k) distance matrix stays small
        labels = np.empty((N,), dtype=np.int32)
        chunk = 200000
        c_norm = (centers * centers).sum(axis=1, keepdims=True).T  # (1,k), loop-invariant
        for i0 in range(0, N, chunk):
            i1 = min(N, i0 + chunk)
            a = pixels[i0:i1, :]  # (M,3)
            a_norm = (a * a).sum(axis=1, keepdims=True)  # (M,1)
            dots = a @ centers.T  # (M,k)
            dists = a_norm - 2.0 * dots + c_norm
            labels[i0:i1] = np.argmin(dists, axis=1)

    labels_img = labels.reshape(hs, ws)

    # 7) Estimate atmospheric light A (DCP-based selection on the small image)
    dark_small = dark_channel_np(small, size=dcp_patch)
    A_dcp = estimate_atmospheric_light_np(small, dark_small, top_percent=dcp_top_percent)
    # also compute a centers-based A (brightest center)
    center_intens = centers.sum(axis=1)
    A_centers = centers[np.argmax(center_intens)]
    # prefer the DCP estimate but keep it away from pure white
    A = np.clip(A_dcp, 0.0, 0.98)
    # if A is still extremely bright, pull it toward the brightest center
    if np.mean(A) > 0.95:
        A = 0.6 * A + 0.4 * A_centers
    A = np.clip(A, 0.4, 0.98)

    # 8) Non-local transmission estimate per cluster
    # distance of cluster center to A -> larger distance -> larger transmission (less haze)
    alpha = 2.0  # controls how strongly near-A clusters are treated as hazy
    dists_cent = np.linalg.norm(centers - A[None, :], axis=1)
    # normalize distances to [0,1]
    dmin, dmax = dists_cent.min(), dists_cent.max()
    if dmax - dmin < 1e-6:
        rel = np.zeros_like(dists_cent)
    else:
        rel = (dists_cent - dmin) / (dmax - dmin)
    # exponential mapping: rel=0 (closest to A) -> exp(-alpha), rel=1 -> 1 (then clipped)
    t_clusters = np.clip(np.exp(-alpha * (1.0 - rel)), 0.05, 0.99)  # cluster-wise t

    # fill per-pixel t on small image
    t_small = t_clusters[labels_img]

    # 9) refine transmission via guided filter on CPU
    gray_small = cv2.cvtColor((small * 255).astype(np.uint8), cv2.COLOR_RGB2GRAY).astype(np.float32) / 255.0
    t_refined_small = fast_guided_filter(gray_small, t_small.astype(np.float32), r=gf_r, eps=gf_eps)

    # 10) upsample transmission to full resolution
    t_full = cv2.resize(t_refined_small, (w, h), interpolation=cv2.INTER_LINEAR)
    t_full = np.clip(t_full, t0, 0.99)

    # 11) Fusion with DCP (blend where DCP is more reliable)
    t_dcp_small = transmission_from_dcp_np(small, A, size=dcp_patch, omega=dcp_omega)
    t_dcp_small = cv2.normalize(t_dcp_small, None, 0.0, 1.0, cv2.NORM_MINMAX)
    t_dcp_full = cv2.resize(t_dcp_small, (w, h), interpolation=cv2.INTER_LINEAR)
    # fusion weight: where DCP suggests denser haze (low t), give more weight to DCP, else non-local
    w_dcp = np.clip((1.0 - t_dcp_full) * 2.0, 0.0, 1.0)  # heuristic
    t_combined = w_dcp * t_dcp_full + (1.0 - w_dcp) * t_full
    t_combined = np.clip(t_combined, t0, 0.99)

    # 12) Recover radiance with the combined transmission. Only this recovery
    # is returned, so no separate recovery from t_full alone is computed.
    J_comb = np.empty_like(im, dtype=np.float32)
    for c in range(3):
        J_comb[:, :, c] = (im[:, :, c] - A[c]) / t_combined + A[c]
    J_comb = np.clip(J_comb, 0.0, 1.0)

    # small gamma correction for visual quality
    J_comb = np.power(J_comb, 0.95)

    # save if requested
    if out_path:
        out_dir = os.path.dirname(out_path)
        if out_dir:
            # A bare file name has no directory part; os.makedirs("") would raise.
            os.makedirs(out_dir, exist_ok=True)
        imsave_rgb_u8(out_path, J_comb)

    return {
        "J": J_comb,
        "t_full": t_full,
        "t_combined": t_combined,
        "A": A,
        "use_cuda": use_cuda
    }


In [14]:
# device_pref = None if args.device == "auto" else ("cuda" if args.device == "cuda" else "cpu")
# --- Hybrid dehazing of the Perlin-noise synthetic haze set ---
input_dir = "US-Road-Signs-71/easy/perlin_synth/images"
output_dir = "US-Road-Signs-71/easy/perlin_removed_NON/images"
os.makedirs(output_dir, exist_ok=True)

for filename in tqdm(os.listdir(input_dir)):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
        continue
    img_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)
    # hybrid_dehaze_gpu saves the image itself because out_path is given.
    res = hybrid_dehaze_gpu(
        img_path,
        out_path=output_path,
        n_clusters=32,
        scale=1.0,
        device_preference="cuda",
    )


100%|██████████| 509/509 [02:07<00:00,  4.00it/s]


In [15]:
# --- Hybrid dehazing of both DL-synthesised haze sets ---
for input_dir, output_dir in (
    ("US-Road-Signs-71/easy/DL_synth_2.0/images",
     "US-Road-Signs-71/easy/DL_synth_2.0_removed_NON/images"),
    ("US-Road-Signs-71/easy/DL_synth_2.75/images",
     "US-Road-Signs-71/easy/DL_synth_2.75_removed_NON/images"),
):
    os.makedirs(output_dir, exist_ok=True)

    for filename in tqdm(os.listdir(input_dir)):
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
            continue
        img_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, filename)
        # hybrid_dehaze_gpu saves the image itself because out_path is given.
        res = hybrid_dehaze_gpu(
            img_path,
            out_path=output_path,
            n_clusters=32,
            scale=1.0,
            device_preference="cuda",
        )

100%|██████████| 509/509 [02:05<00:00,  4.04it/s]
100%|██████████| 509/509 [02:05<00:00,  4.06it/s]


In [11]:
# --- Run both the hybrid and the DCP method on the small "collection" set ---
input_dir = "US-Road-Signs-71/collection"
output_dir = "US-Road-Signs-71/easy/collection/images"
os.makedirs(output_dir, exist_ok=True)

# dehaze_dcp settings for this comparison run.
size = 15
omega = 0.95
Amax = 0.95
Amin = 0.7
gamma = 1.0
top_percent = 0.01

for filename in tqdm(os.listdir(input_dir)):
    # The name is lower-cased before matching, so upper-case extensions such
    # as ".JPEG" are already covered by ".jpeg".
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
        continue
    img_path = os.path.join(input_dir, filename)

    # Hybrid result: written by hybrid_dehaze_gpu itself via out_path.
    # NOTE(review): 'hybid_' looks like a typo for 'hybrid_'; left unchanged
    # so that existing output file names stay stable.
    output_path = os.path.join(output_dir, 'hybid_' + filename)
    res = hybrid_dehaze_gpu(
        img_path,
        out_path=output_path,
        n_clusters=32,
        scale=1.0,
        device_preference="cuda"
    )

    # DCP result.
    # NOTE(review): the prefix is 'dcp' with no underscore, unlike 'hybid_';
    # confirm whether 'dcp_' was intended.
    J, t_refined, dark = dehaze_dcp(img_path, size=size, omega=omega, Amax=Amax, Amin=Amin, gamma=gamma, top_percent=top_percent)
    output_path = os.path.join(output_dir, 'dcp' + filename)
    cv2.imwrite(output_path, J)
        

100%|██████████| 6/6 [00:08<00:00,  1.42s/it]


In [2]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import os
from tqdm import tqdm
def histogram_equalization(image):
    """Equalize the intensity histogram of an image.

    A 2-D (grayscale) image is equalized directly. Any other input is treated
    as BGR: it is converted to HSV, only the V channel is equalized, and the
    result is converted back to BGR.

    Args:
        image: Grayscale (2-D) or BGR image array.

    Returns:
        The equalized image.
    """
    if len(image.shape) == 2:
        # Grayscale image
        return cv2.equalizeHist(image)

    # Color image - equalize brightness only, leaving hue/saturation alone
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    hsv[:, :, 2] = cv2.equalizeHist(hsv[:, :, 2])
    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

def gamma_correction(image, gamma=0.5):
    """Apply a power-law intensity mapping through a 256-entry lookup table.

    Each 8-bit level ``i`` maps to ``((i / 255) ** gamma) * 255`` truncated
    to uint8. ``gamma`` is used directly as the exponent (not inverted).

    Args:
        image: Input image (BGR format)
        gamma: Exponent of the mapping.

    Returns:
        The image after ``cv2.LUT`` has applied the table.
    """
    levels = np.arange(0, 256)
    lut = np.array(
        [((level / 255.0) ** gamma) * 255 for level in levels]
    ).astype("uint8")
    return cv2.LUT(image, lut)

def analyze_image_brightness(image):
    """Summarise image brightness and suggest a gamma value.

    A 3-D input is converted from BGR to grayscale first; any other input is
    analysed as it is.

    Args:
        image: Grayscale or BGR image array.

    Returns:
        Dict with the keys ``mean_intensity``, ``std_intensity``,
        ``median_intensity``, ``p10``, ``p50``, ``p90``, ``is_lowlight``
        (mean below 80), ``recommended_gamma``, ``recommendation_reason``
        and ``contrast_level``.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image

    # Basic statistics and percentiles of the intensity distribution.
    mean_intensity = np.mean(gray)
    std_intensity = np.std(gray)
    median_intensity = np.median(gray)
    p10, p50, p90 = (np.percentile(gray, q) for q in (10, 50, 90))

    is_lowlight = mean_intensity < 80

    # (exclusive upper bound on the mean, gamma, reason); first match wins.
    gamma_tiers = (
        (50, 0.4, "Very low mean intensity detected"),
        (80, 0.5, "Low mean intensity detected"),
        (120, 0.6, "Moderately low mean intensity"),
    )
    recommended_gamma, reason = 0.8, "Image already fairly bright"
    for upper_bound, tier_gamma, tier_reason in gamma_tiers:
        if mean_intensity < upper_bound:
            recommended_gamma, reason = tier_gamma, tier_reason
            break

    # Contrast is graded from the standard deviation alone.
    if std_intensity < 30:
        contrast_level = "Low contrast"
    elif std_intensity < 60:
        contrast_level = "Moderate contrast"
    else:
        contrast_level = "High contrast"

    return {
        'mean_intensity': mean_intensity,
        'std_intensity': std_intensity,
        'median_intensity': median_intensity,
        'p10': p10,
        'p50': p50,
        'p90': p90,
        'is_lowlight': is_lowlight,
        'recommended_gamma': recommended_gamma,
        'recommendation_reason': reason,
        'contrast_level': contrast_level
    }

def select_optimal_gamma(image, gamma_range=(0.3, 0.4, 0.5, 0.6, 0.7)):
    """Try several gamma values and pick the best-scoring one.

    Each candidate is applied with ``gamma_correction`` and scored on the
    grayscale result as ``100 - |mean - 130| - 5 * overexposed_pct``, where
    ``overexposed_pct`` is the percentage of pixels above 245.

    Args:
        image: BGR image array.
        gamma_range: Candidate gamma values.

    Returns:
        Tuple ``(optimal_gamma, results)`` where ``results`` maps each gamma
        to a dict with ``mean_intensity``, ``std_intensity``,
        ``overexposed_pct``, ``underexposed_pct`` and ``quality_score``. On
        ties the first candidate in ``gamma_range`` order wins.
    """
    results = {}

    for gamma in gamma_range:
        gray = cv2.cvtColor(gamma_correction(image, gamma), cv2.COLOR_BGR2GRAY)
        total = gray.size

        mean_int = np.mean(gray)
        overexposed = np.sum(gray > 245) / total * 100
        underexposed = np.sum(gray < 10) / total * 100

        results[gamma] = {
            'mean_intensity': mean_int,
            'std_intensity': np.std(gray),
            'overexposed_pct': overexposed,
            'underexposed_pct': underexposed,
            # target mean ~130, penalize over-exposure
            'quality_score': (100 - abs(mean_int - 130)) - overexposed * 5
        }

    optimal_gamma = max(results, key=lambda g: results[g]['quality_score'])
    return optimal_gamma, results

# --- Low-light enhancement: histogram equalization and adaptive gamma ---
input_dir = "US-Road-Signs-71/easy/low_light/images"
ipnut_dir = input_dir  # misspelled original name, kept as an alias

# Pass 1: histogram equalization.
output_dir = "US-Road-Signs-71/easy/low_light_hist/images"
os.makedirs(output_dir, exist_ok=True)
for filename in tqdm(os.listdir(input_dir)):
    if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
        img_path = os.path.join(input_dir, filename)
        image = cv2.imread(img_path)
        enhanced = histogram_equalization(image)
        output_path = os.path.join(output_dir, filename)
        cv2.imwrite(output_path, enhanced)

# Pass 2: gamma correction with the gamma recommended per image.
output_dir = "US-Road-Signs-71/easy/low_light_gamma/images"
os.makedirs(output_dir, exist_ok=True)
for filename in tqdm(os.listdir(input_dir)):
    if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
        img_path = os.path.join(input_dir, filename)
        image = cv2.imread(img_path)
        analysis = analyze_image_brightness(image)
        recommended_gamma = analysis['recommended_gamma']
        enhanced = gamma_correction(image, gamma=recommended_gamma)
        output_path = os.path.join(output_dir, filename)
        cv2.imwrite(output_path, enhanced)


100%|██████████| 509/509 [00:06<00:00, 78.96it/s]
100%|██████████| 509/509 [00:15<00:00, 33.74it/s]
