In [1]:
import os
import numpy as np
import pickle
from collections import defaultdict
from glob import glob

In [2]:
def build_ili(descriptor_folder):
    ili = defaultdict(list)
    for file in glob(os.path.join(descriptor_folder, "*.npy")):
        desc = np.load(file)
        key = tuple(desc)  # Binarized HOG
        ili[key].append(file)
    with open("ili_index.pkl", "wb") as f:
        pickle.dump(ili, f)
    print(f"ILI index built with {len(ili)} unique keys.")

In [3]:
def pqhog_search(test_image_path, template_folder, ili_path, top_k=5):
    from sklearn.preprocessing import Binarizer
    from skimage.feature import hog
    import cv2

    with open(ili_path, "rb") as f:
        ili_index = pickle.load(f)

    image = cv2.imread(test_image_path)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Extract PQ-HOG
    features, _ = hog(gray, pixels_per_cell=(8,8), cells_per_block=(1,1),
                      orientations=9, visualize=True)
    binarized = Binarizer(threshold=0).fit_transform(features.reshape(1, -1)).flatten()
    query_key = tuple(binarized)

    # Simple matching: Hamming distance between binary vectors
    def hamming_dist(a, b):
        return np.sum(np.array(a) != np.array(b))

    distances = []
    for key in ili_index:
        d = hamming_dist(query_key, key)
        distances.append((d, key))

    distances.sort()
    top_keys = [key for _, key in distances[:top_k]]

    matches = []
    for key in top_keys:
        for path in ili_index[key]:
            matches.append(path)
    return matches[:top_k]


In [4]:
def hybrid_detect(test_image_path, template_folder, descriptor_folder, ili_path, output_path):
    matches = pqhog_search(test_image_path, template_folder, ili_path)

    test_img = cv2.imread(test_image_path)
    eq, bin_img = preprocess_roi(test_img)
    best_fit = -1
    best_result = None
    best_template = None

    for match_path in matches:
        template = cv2.imread(match_path)
        result, score = run_ga(template, eq, bin_img, img_size=eq.shape[::-1])
        if result and score > best_fit:
            best_fit = score
            best_result = result
            best_template = template

    if best_result:
        filename = os.path.basename(test_image_path)
        overlay_and_save(test_img, best_template, best_result, os.path.join(output_path, filename))
        print(f"✅ Saved best hybrid match for {filename} with fitness {best_fit:.2f}")
    else:
        print("⚠️ No valid match found.")


In [5]:
# Step 1: Build the ILI index (run once)
build_ili("pqhog_descriptors/test")

# Step 2: Run on one image (test hybrid matching)
hybrid_detect(
    test_image_path="test_images/example.jpg",
    template_folder="templates",
    descriptor_folder="pqhog_descriptors/test",
    ili_path="ili_index.pkl",
    output_path="results"
)


ILI index built with 0 unique keys.


error: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'


In [13]:
import random

In [16]:
# hybrid_seatbelt_matching.py

import os
import cv2
import numpy as np
from glob import glob
import pickle
from collections import defaultdict
from skimage.feature import hog
from skimage.transform import resize, rotate
from sklearn.preprocessing import Binarizer

# ---------------------- PQ-HOG + ILI ----------------------
def extract_pqhog(image):
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    features, _ = hog(gray, pixels_per_cell=(8, 8), cells_per_block=(1, 1),
                      orientations=9, visualize=True, block_norm='L2-Hys')
    binarized = Binarizer(threshold=0).fit_transform(features.reshape(1, -1)).flatten()
    return binarized

def generate_pqhog_descriptors(roi_folder, save_folder):
    os.makedirs(save_folder, exist_ok=True)
    for path in glob(os.path.join(roi_folder, "*.jpg")):
        image = cv2.imread(path)
        if image is None: continue
        descriptor = extract_pqhog(image)
        name = os.path.basename(path).replace(".jpg", "_pqhog.npy")
        np.save(os.path.join(save_folder, name), descriptor)

def build_ili_index(descriptor_folder, save_path="ili_index.pkl"):
    ili = defaultdict(list)
    for file in glob(os.path.join(descriptor_folder, "*.npy")):
        desc = np.load(file)
        key = tuple(desc)
        ili[key].append(file)
    with open(save_path, "wb") as f:
        pickle.dump(ili, f)

# ---------------------- Preprocessing ----------------------
def preprocess_roi(roi):
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    eq = cv2.equalizeHist(gray)
    bin_img = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C,
                                     cv2.THRESH_BINARY, 23, 5)
    return eq, bin_img

# ---------------------- Fitness & GA ----------------------
def compute_fitness(template_img, eq_img, bin_img, tx, ty, sx, sy, theta):
    h, w = template_img.shape[:2]
    th, tw = int(h * sy), int(w * sx)
    temp = resize(template_img, (th, tw), anti_aliasing=True)
    temp = rotate(temp, theta, resize=False)
    temp = (temp * 255).astype(np.uint8)
    roi_h, roi_w = eq_img.shape
    if tx + tw > roi_w or ty + th > roi_h:
        return 0
    eq_crop = eq_img[ty:ty+th, tx:tx+tw]
    bin_crop = bin_img[ty:ty+th, tx:tx+tw]
    third = th // 3
    if third == 0: return 0
    a = np.mean(eq_crop[0:third])
    b = np.mean(eq_crop[third:2*third])
    c = np.mean(eq_crop[2*third:])
    O1 = (abs(a - b) + abs(c - b)) / 2
    awht = np.sum(bin_crop[0:third] == 255) / (bin_crop[0:third].size + 1e-5)
    bblk = np.sum(bin_crop[third:2*third] == 0) / (bin_crop[third:2*third].size + 1e-5)
    cwht = np.sum(bin_crop[2*third:] == 255) / (bin_crop[2*third:].size + 1e-5)
    O2 = (awht + cwht + 2 * bblk) / 4
    return O1 + O2

def run_ga(template, eq_img, bin_img, img_size, pop_size=30, gen_size=30):
    wtgt, htgt = img_size
    if wtgt <= 46 or htgt <= 18:
        return None, 0
    population = [(
        np.random.randint(0, wtgt - 46),
        np.random.randint(0, htgt - 18),
        np.random.uniform(0.8, 1.2),
        np.random.uniform(0.8, 1.2),
        np.random.uniform(-30, 0)
    ) for _ in range(pop_size)]

    best_fit = -1
    best_ind = None
    for _ in range(gen_size):
        fitnesses = []
        for ind in population:
            tx, ty, sx, sy, theta = ind
            f = compute_fitness(template, eq_img, bin_img, tx, ty, sx, sy, theta)
            fitnesses.append(f)
            if f > best_fit:
                best_fit = f
                best_ind = ind
        parents = [population[i] for i in np.argsort(fitnesses)[-pop_size//5:]]
        new_population = []
        while len(new_population) < pop_size:
            p1, p2 = random.sample(parents, 2)
            child = (
                np.random.choice([p1[0], p2[0]]),
                np.random.choice([p1[1], p2[1]]),
                (p1[2] + p2[2]) / 2,
                (p1[3] + p2[3]) / 2,
                (p1[4] + p2[4]) / 2
            )
            if np.random.rand() < 0.1:
                child = (
                    child[0] + np.random.randint(-5, 5),
                    child[1] + np.random.randint(-5, 5),
                    child[2] * np.random.uniform(0.95, 1.05),
                    child[3] * np.random.uniform(0.95, 1.05),
                    child[4] + np.random.uniform(-2, 2)
                )
            new_population.append(child)
        population = new_population
    return best_ind, best_fit

# ---------------------- Overlay and Save ----------------------
def overlay_and_save(test_img, template, params, out_path):
    tx, ty, sx, sy, theta = params
    th, tw = template.shape[:2]
    h, w = int(th * sy), int(tw * sx)
    resized = resize(template, (h, w), anti_aliasing=True)
    rotated = rotate(resized, theta, resize=False)
    rotated = (rotated * 255).astype(np.uint8)
    if len(rotated.shape) == 2:
        rotated = cv2.cvtColor(rotated, cv2.COLOR_GRAY2BGR)
    overlay = test_img.copy()
    if ty + h > overlay.shape[0] or tx + w > overlay.shape[1]:
        return
    cv2.rectangle(overlay, (tx, ty), (tx + w, ty + h), (0, 255, 0), 2)
    roi = overlay[ty:ty+h, tx:tx+w]
    overlay[ty:ty+h, tx:tx+w] = cv2.addWeighted(roi, 0.5, rotated, 0.5, 0)
    cv2.imwrite(out_path, overlay)

# ---------------------- Hybrid Detector ----------------------
def hybrid_detect_batch(image_folder, template_path, output_path):
    os.makedirs(output_path, exist_ok=True)
    template = cv2.imread(template_path)
    if template is None:
        print("Template load failed.")
        return

    for img_path in glob(os.path.join(image_folder, "*.jpg")):
        test_img = cv2.imread(img_path)
        if test_img is None:
            print(f"⚠️ Failed to load {img_path}")
            continue
        eq, bin_img = preprocess_roi(test_img)
        params, score = run_ga(template, eq, bin_img, img_size=eq.shape[::-1])
        if params:
            result_path = os.path.join(output_path, os.path.basename(img_path))
            overlay_and_save(test_img, template, params, result_path)
            print(f"✅ {os.path.basename(img_path)} → score {score:.2f}")
        else:
            print(f"⚠️ No valid match for {img_path}")


In [8]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [20]:
hybrid_detect_batch(
    image_folder="/content/cropped_rois/test",
    template_path="template.jpg",
    output_path="results"
)

✅ 05-09-04_jpg.rf.3697cd52aa696b844db6277265a9bf25_person-noseatbelt_2.jpg → score 46.63
✅ 1625227981033_jpg.rf.a9131630836bf994d5a538f55a584a0d_person-seatbelt_2.jpg → score 60.26
⚠️ No valid match for /content/cropped_rois/test/05-12-35_jpg.rf.ed26d1f80295a6e15a2dcda065b7a83d_person-noseatbelt_1.jpg
✅ 945_png.rf.a087b9d0e32ea00f5b1ee9600c407a4f_person-noseatbelt_1.jpg → score 28.77
✅ 1625227980786_jpg.rf.44469331538f889d2b7c4266285f6f07_person-seatbelt_1.jpg → score 54.68
✅ frame_001152iiii-14-_jpg.rf.7ed7e63f855cac3da1b30027ccb6a160_person-seatbelt_0.jpg → score 43.72
✅ wwframe_001044-23-_jpg.rf.29f8acc748b79dc2aa35d067e26a77ab_windshield_0.jpg → score 74.82
✅ frame_001296kkkkkk-16-_jpg.rf.28fa2a05ac7a1010665267139268aacf_person-seatbelt_1.jpg → score 29.55
✅ 1_CAR_13-52-25-265_jpg.rf.569639aed27d3e4952fabadf3931afe8_person-seatbelt_0.jpg → score 109.61
✅ 1625227981116_jpg.rf.5324f5aba1261c8be3577ea53b60e34f_person-seatbelt_1.jpg → score 71.02
✅ 703_png_jpg.rf.67d52975664ee040448137

KeyboardInterrupt: 

In [21]:
# prompt: zip file

import zipfile
import os

def zip_files(directory_to_zip, zip_filename):
  """Zips a directory.

  Args:
    directory_to_zip: The directory to zip.
    zip_filename: The name of the output zip file.
  """

  with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
    for root, _, files in os.walk(directory_to_zip):
      for file in files:
        file_path = os.path.join(root, file)
        zipf.write(file_path, arcname=os.path.relpath(file_path, directory_to_zip))
  print(f"Successfully zipped {directory_to_zip} to {zip_filename}")

# Example usage (replace with your actual directory and desired zip file name)
zip_files("results", "results.zip") # zips the current directory


Successfully zipped results to results.zip
