<a href="https://colab.research.google.com/github/kikunota/3d-reconstruction-prototype/blob/main/Layout_Matching.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
"""
🧰 Cell 1 — Setup (installs)
"""

# ✅ Clean Cell 1 for Colab (no TF/cuDF conflicts)
# Installs pinned packages with pip, then imports them and prints their versions.
# NOTE(review): the output recorded under this cell ends with
# "ValueError: numpy.dtype size changed, may indicate binary incompatibility" —
# presumably a compiled package and the installed NumPy disagree; confirm whether
# a runtime restart after the installs is required.
# 1) Optional: remove unused libs that demand NumPy>=2 (quiet the warnings)
!pip -q uninstall -y thinc spacy -y >/dev/null 2>&1 || true

# 2) Pin OpenCV to a build compatible with NumPy 1.26 (what Colab ships)
!pip -q install "opencv-python-headless==4.8.1.78"

# 3) Install what we actually need
!pip -q install "faiss-cpu==1.8.0.post1" "open_clip_torch==2.26.1"

# 4) Sanity check: import every package used later and print its version
import numpy, pandas as pd, faiss, torch, open_clip
print("numpy:", numpy.__version__)       # should be 1.26.x in default Colab
print("pandas:", pd.__version__)         # should be 2.2.2 in default Colab
print("faiss:", faiss.__version__)
print("torch:", torch.__version__)
print("open_clip_torch:", open_clip.__version__)

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.1/49.1 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
albucore 0.0.24 requires opencv-python-headless>=4.9.0.80, but you have opencv-python-headless 4.8.1.78 which is incompatible.
albumentations 2.0.8 requires opencv-python-headless>=4.9.0.80, but you have opencv-python-headless 4.8.1.78 which is incompatible.[0m[31m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m27.0/27.0 MB[0m [31m53.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m57.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.0/18.0

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
"""
Cell 2 — Imports & helpers
"""

import os, io, math, json, textwrap, random
from typing import List, Dict, Tuple
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
import faiss

import open_clip  # CLIP family (OpenCLIP/SigLIP checkpoints)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", DEVICE)

def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs (and every CUDA device, if available)."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


# Seed everything once, up front, with the default seed.
set_seed(42)

def cosine_sim_mat(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Return the pairwise cosine similarity between the rows of *a* and *b*.

    ``a`` is ``[N, d]`` and ``b`` is ``[M, d]``; the result is ``[N, M]``.
    A small epsilon (1e-8) is added to every row norm so that an all-zero
    row does not cause a division by zero.
    """
    eps = 1e-8
    a_len = np.linalg.norm(a, axis=1, keepdims=True)
    b_len = np.linalg.norm(b, axis=1, keepdims=True)
    return (a / (a_len + eps)) @ (b / (b_len + eps)).T

def show_hit(row, topk_df):
    """Print a one-line summary of the query *row*, then its top-k table.

    *row* only needs a ``.get(key, default)`` method; missing keys are shown
    as empty strings. *topk_df* is printed without its index.
    """
    project, community, size_lo, size_hi, features = (
        row.get(key, "")
        for key in ("project", "community", "size_min", "size_max", "features_text")
    )
    header = f"\nQUERY: {project} | {community} | size≈{size_lo}–{size_hi} m² | {features}"
    print(header)
    print(topk_df.to_string(index=False))


In [None]:
"""
🧾 Cell 3 — Create a CSV template (download & fill locally)

Use this once to get the template. Then re-upload with your data (and images).
"""
# Colab's download helper. Imported here so this cell runs on its own instead of
# failing with NameError when no other cell has imported `files` yet.
from google.colab import files

# Two example rows showing every column the matching cells read.
template = pd.DataFrame([
    {
        "plan_filename": "TOWERA_08A.jpg",  # ground-truth plan image filename (optional but needed for accuracy metrics)
        "project": "Downtown Tower A",
        "community": "Downtown Dubai",
        "tower": "A",
        "size_min": 80,          # in m² (or leave blank)
        "size_max": 95,
        "level_band": "Floors 10-20",  # free text
        "orientation": "Burj-facing",  # free text
        "features_text": "2 bathrooms; closed kitchen; balcony; storage; laundry"
    },
    {
        "plan_filename": "TOWERB_02B.jpg",
        "project": "Marina Residences",
        "community": "Dubai Marina",
        "tower": "B",
        "size_min": 65,
        "size_max": 72,
        "level_band": "Floors 2-8",
        "orientation": "Partial marina view",
        "features_text": "1 bathroom; open kitchen; balcony"
    }
])

# Save locally in Colab (index=False keeps the row numbers out of the file)
csv_path = "property_profiles_template.csv"
template.to_csv(csv_path, index=False)

# Trigger download to your computer
files.download(csv_path)


In [None]:
"""
⬆️ Cell 4 — Upload your CSV & floor-plan images

Prepare a folder of plan images (JPG/PNG/WEBP/BMP/TIFF; for a PDF, export its first page as an image first), e.g. TOWERA_08A.jpg.

CSV must have at least: project, community, features_text (others optional).

If you add the correct plan filename under plan_filename, we’ll compute accuracy.
"""
from google.colab import files
import zipfile, pathlib

# --- 1) Profiles CSV -------------------------------------------------------
print("Upload your profiles CSV:")
uploaded_csv = files.upload()  # choose your file, e.g., profiles.csv
if not uploaded_csv:
    # A cancelled upload yields an empty mapping; fail with a readable message.
    raise RuntimeError("No CSV uploaded. Please re-run this cell and choose your profiles CSV.")
csv_name = next(iter(uploaded_csv))  # only the first uploaded file is used
profiles = pd.read_csv(io.BytesIO(uploaded_csv[csv_name]))
print(f"Loaded {profiles.shape[0]} profiles")

# --- 2) Floor-plan images --------------------------------------------------
# Upload a zip of images OR multiple individual images; both supported.
print("Upload (a) a zip of floor-plan images OR (b) several image files:")
uploads = files.upload()

IMG_DIR = "/content/plans"
os.makedirs(IMG_DIR, exist_ok=True)

for name, data in uploads.items():
    if name.lower().endswith(".zip"):
        # Unpack archives straight from memory into the image folder.
        with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
            zf.extractall(IMG_DIR)
    else:
        # write the file directly
        out_path = os.path.join(IMG_DIR, name)
        with open(out_path, "wb") as f:
            f.write(data)

# Gather image paths (recursive). Sorted so the image order, and therefore the
# row order of anything built from it, is the same on every run.
valid_ext = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".tif", ".tiff"}
image_paths = sorted(
    os.path.join(root, fn)
    for root, _, files_ in os.walk(IMG_DIR)
    for fn in files_
    if pathlib.Path(fn).suffix.lower() in valid_ext
)

print(f"Found {len(image_paths)} plan images.")
if not image_paths:
    # Explicit raise instead of assert: asserts are stripped under `python -O`.
    raise RuntimeError("No images found. Please upload plan images.")


In [None]:
"""
✍️ Cell 5 — Turn each profile into a CLIP text prompt

Keep it short and factual—CLIP likes concise descriptions.
"""

def _profile_text(row, key: str) -> str:
    """Return ``row[key]`` as a stripped string, or "" if it is absent, None or NaN."""
    value = row.get(key)
    if value is None or pd.isna(value):
        return ""
    return str(value).strip()


def build_prompt(row: pd.Series) -> str:
    """Turn one profile row into a short, comma-separated CLIP text prompt.

    Columns read (all optional): ``project``, ``community``, ``tower``,
    ``size_min``, ``size_max``, ``level_band``, ``orientation`` and
    ``features_text``. Missing or blank values are skipped, so a blank CSV
    cell never leaks into the prompt as the literal text "nan".

    Returns:
        The non-empty parts joined by ", " and followed by
        ". Floor plan layout."; just "Floor plan layout." when every column
        is empty.
    """
    parts = []

    for key in ("project", "community"):
        text = _profile_text(row, key)
        if text:
            parts.append(text)

    tower = _profile_text(row, "tower")
    if tower:
        parts.append(f"Tower {tower}")

    # Size band: a range when both bounds are present, otherwise a one-sided bound.
    smi, sma = row.get("size_min"), row.get("size_max")
    if pd.notna(smi) and pd.notna(sma):
        parts.append(f"{int(smi)}–{int(sma)} m²")
    elif pd.notna(smi):
        parts.append(f"≥{int(smi)} m²")
    elif pd.notna(sma):
        parts.append(f"≤{int(sma)} m²")

    # Free-text columns, features last (keep short).
    for key in ("level_band", "orientation", "features_text"):
        text = _profile_text(row, key)
        if text:
            parts.append(text)

    if not parts:
        # Avoid a prompt that starts with a dangling ". ".
        return "Floor plan layout."
    return ", ".join(parts) + ". Floor plan layout."

# Build one prompt per profile row (axis=1) and store it in a new "prompt" column.
profiles["prompt"] = profiles.apply(build_prompt, axis=1)
# Last expression of the cell: the first five prompts, for a quick visual check.
profiles[["prompt"]].head(5)


In [None]:
"""
🖼️ Cell 6 — Load CLIP & preprocess

Tries a strong OpenCLIP checkpoint (EVA02-B-16) first and falls back to ViT-B-16, then ViT-B-32, if it cannot be loaded.
"""
# ✅ Cell 6 — Load CLIP model + preprocess (with safe fallbacks)
import torch
import open_clip
from PIL import Image

if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# (architecture, pretrained tag) pairs, tried in this order until one loads.
# Prefer models that work well on line drawings / floor plans, then fall back.
CANDIDATES = [
    ("EVA02-B-16", "merged2b_s8b_b131k"),   # good on diagrams
    ("ViT-B-16",   "laion2b_s34b_b88k"),    # widely available
    ("ViT-B-32",   "laion2b_s34b_b79k"),    # most compatible fallback
]

model, preprocess, tokenizer = None, None, None
last_error = None

for MODEL_NAME, PRETRAINED in CANDIDATES:
    try:
        print(f"Trying {MODEL_NAME} / {PRETRAINED} ...")
        clip_model, _, clip_preprocess = open_clip.create_model_and_transforms(
            MODEL_NAME, pretrained=PRETRAINED, device=DEVICE
        )
        clip_tokenizer = open_clip.get_tokenizer(MODEL_NAME)
        clip_model.eval()
    except Exception as err:
        # Remember why this candidate failed and move on to the next one.
        last_error = err
        print(f"→ Failed: {err}")
    else:
        # Publish the working trio only once everything above succeeded.
        model, preprocess, tokenizer = clip_model, clip_preprocess, clip_tokenizer
        print(f"Loaded ✓ {MODEL_NAME} / {PRETRAINED} on {DEVICE}")
        break
else:
    # The loop ended without a break: no candidate could be loaded.
    raise RuntimeError(f"Could not load any CLIP checkpoint. Last error:\n{last_error}")

print("Ready. DEVICE:", DEVICE)


In [None]:
"""
⚙️ Cell 7 — Encode all images

This precomputes image embeddings once; scale: thousands OK on Colab.
"""


def load_image(path, max_side=2048):
    """Open the image at *path* as RGB, downscaling it if it is very large.

    Args:
        path: Location of the image file, passed to ``Image.open``.
        max_side: Longest allowed side in pixels. Larger images are shrunk
            (aspect ratio kept) to speed up later processing; smaller images
            are returned at their original size.

    Returns:
        An RGB ``PIL.Image.Image`` that no longer depends on the open file.
    """
    # convert() returns a separate copy, so the file handle can be closed
    # as soon as the pixels have been read.
    with Image.open(path) as src:
        img = src.convert("RGB")

    w, h = img.size
    scale = min(1.0, float(max_side) / max(w, h))
    if scale < 1.0:
        # Clamp to 1 px so extreme aspect ratios never yield a zero-sized side.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = img.resize(new_size, Image.BICUBIC)
    return img

batch = 64  # images per forward pass

# Preprocess every readable image; `names[k]` is the filename behind `all_imgs[k]`.
all_imgs, names = [], []
for p in image_paths:
    try:
        img = load_image(p)
        all_imgs.append(preprocess(img))
        names.append(os.path.basename(p))
    except Exception as e:
        # Best effort: report the unreadable file and keep going.
        print("Bad image:", p, e)

if not all_imgs:
    raise RuntimeError("No plan images could be decoded; check the uploaded files.")

# Keep the full stack on the CPU and move one batch at a time to DEVICE, so
# device memory use is bounded by the batch size rather than the dataset size.
img_t = torch.stack(all_imgs)
img_embeds = []
with torch.no_grad():
    for i in tqdm(range(0, len(img_t), batch), desc="Encoding images"):
        chunk = img_t[i:i+batch].to(DEVICE)
        emb = model.encode_image(chunk)
        emb = emb / emb.norm(dim=-1, keepdim=True)  # L2-normalise each row
        img_embeds.append(emb.float().cpu())
img_embeds = torch.cat(img_embeds, dim=0).numpy()  # float32 array, one row per image

print("Image embeddings:", img_embeds.shape)


In [None]:
"""
🔎 Cell 8 — Build FAISS index for fast top-K search

"""


d = img_embeds.shape[1]  # embedding dimension
# The image embeddings are L2-normalised, so inner product equals cosine similarity.
# IndexHNSWFlat defaults to L2 distance; request inner product explicitly so that
# search() returns similarity scores (higher = better) rather than squared distances.
index = faiss.IndexHNSWFlat(d, 32, faiss.METRIC_INNER_PRODUCT)   # simple, solid default for small/medium sets
index.hnsw.efConstruction = 128
index.hnsw.efSearch = 64
index.add(img_embeds)
print("FAISS index size:", index.ntotal)


In [None]:
"""
🧪 Cell 9 — Zero-shot retrieval (text → image) + metrics

For each profile prompt, we encode text and search top-K plans.

If your CSV includes plan_filename (exact match to an image filename), we compute Top-1/Top-5 accuracy.
"""

TOPK = 5

def encode_texts(prompts: List[str]) -> np.ndarray:
    """Encode *prompts* with ``model.encode_text`` and return one unit-length row each.

    Prompts are tokenized and encoded in slices of 256 under ``torch.no_grad()``;
    each slice is L2-normalised, cast to float32, moved to the CPU and the
    slices are stacked into a single NumPy array.
    """
    step = 256
    encoded = []
    with torch.no_grad():
        for start in range(0, len(prompts), step):
            tokens = tokenizer(prompts[start:start + step]).to(DEVICE)
            feats = model.encode_text(tokens)
            unit = feats / feats.norm(dim=-1, keepdim=True)
            encoded.append(unit.float().cpu().numpy())
    return np.vstack(encoded)

text_vecs = encode_texts(profiles["prompt"].tolist())
# D holds the index's metric value for each hit and I the matching image row ids.
# Whether a larger or a smaller value is better depends on the metric the index
# was built with; hits come back best-first either way.
D, I = index.search(text_vecs, TOPK)

# Assemble results: one best-first list of {"rank", "filename", "score"} per query.
results = []
for dists, idxs in zip(D, I):
    top = []
    for j, img_idx in enumerate(idxs):
        if img_idx < 0:
            # FAISS pads with -1 when it finds fewer than TOPK hits; without this
            # check names[-1] would silently report the last image instead.
            continue
        top.append({"rank": len(top) + 1, "filename": names[img_idx], "score": float(dists[j])})
    results.append(top)

# Compute metrics if ground-truth available
has_gt = "plan_filename" in profiles.columns and profiles["plan_filename"].notna().any()

top1, top5, cnt = 0, 0, 0   # top5 counts hits anywhere in the TOPK list
rows_for_preview = []
for i, top in enumerate(results):
    row = profiles.iloc[i]  # positional: results[i] belongs to the i-th profile row
    gt_raw = row.get("plan_filename")
    # A blank CSV cell is NaN; treat it as "no label" rather than the filename "nan".
    gt = "" if gt_raw is None or pd.isna(gt_raw) else str(gt_raw).strip()
    if gt:
        cnt += 1
        top_files = [t["filename"] for t in top]
        if top_files and top_files[0] == gt:
            top1 += 1
        if gt in top_files:
            top5 += 1
    # store small preview table rows
    prompt_text = row["prompt"]
    short_prompt = prompt_text[:120] + ("..." if len(prompt_text) > 120 else "")
    for t in top:
        if t["rank"] <= 3:  # keep short
            rows_for_preview.append({
                "query_id": i,
                "prompt": short_prompt,
                "rank": t["rank"],
                "candidate": t["filename"],
                "score": round(t["score"], 3),
                "GT": gt if gt else ""
            })

preview_df = pd.DataFrame(rows_for_preview)
display(preview_df.head(20))

if has_gt and cnt > 0:
    print(f"\nEval on {cnt} labeled queries:")
    print(f"Top-1 accuracy: {top1/cnt:.3f}")
    # Label follows TOPK so the printed name always matches what was measured.
    print(f"Top-{TOPK} accuracy: {top5/cnt:.3f}")
else:
    print("\nNo ground-truth 'plan_filename' provided; skipping accuracy.")


In [None]:
"""
👀 Cell 10 — Inspect a few queries with their top-K
"""
# Show at most five queries, each with its top-K candidates.
sample_n = min(5, len(profiles))
preview_cols = ["rank", "filename", "score"]
for i in range(sample_n):
    hits = results[i]
    topk_df = pd.DataFrame(hits).head(TOPK)
    query_row = profiles.iloc[i]
    show_hit(query_row, topk_df[preview_cols])


In [None]:
# ✅ Cell 11 — Your own input → check for layout match (with optional GT check)
import os, torch
from PIL import Image
from IPython.display import display

# Prereqs: run Cell 4 (uploads), Cell 5 (build_prompt), Cell 6 (load model), Cell 7 (image encodings)
# Fail early if the objects this cell relies on have not been created yet.
assert all(
    required in globals() for required in ("model", "tokenizer", "img_embeds", "names")
), "Please run Cells 4, 5, 6, and 7 first."
# Basename -> full path lookup (a later duplicate basename overwrites an earlier one).
name_to_path = dict(zip(map(os.path.basename, image_paths), image_paths))

def _count_phrase(count, noun: str) -> str:
    """Format *count* and *noun* as e.g. "1 bedroom" / "2 bathrooms" / "1.5 bathrooms"."""
    number = float(count)
    shown = int(number) if number.is_integer() else number
    return f"{shown} {noun}" + ("" if number == 1 else "s")


def build_prompt_from_fields(fields: dict) -> str:
    """Build a concise CLIP-friendly prompt from structured inputs.

    Keys read (all optional; falsy values are skipped, other keys are ignored):
      - ``project``, ``community``, ``tower``: free text
      - ``bed``, ``bath``: room counts, rendered as "N bedroom(s)" / "N bathroom(s)"
      - ``size_min``, ``size_max``: floor-area bounds in m²
      - ``level_band``, ``orientation``, ``features_text``: free text

    The size band is taken from ``size_min``/``size_max`` only; room counts are
    never printed as an area.

    Returns:
        The non-empty parts joined by ", " and followed by
        ". Floor plan layout."; just "Floor plan layout." when nothing is set.
    """
    def text(key: str) -> str:
        # Falsy values (None, "", 0) count as "not provided".
        value = fields.get(key)
        return str(value).strip() if value else ""

    parts = [t for t in (text("project"), text("community")) if t]

    tower = text("tower")
    if tower:
        parts.append(f"Tower {tower}")

    bed, bath = fields.get("bed"), fields.get("bath")
    if bed:
        parts.append(_count_phrase(bed, "bedroom"))
    if bath:
        parts.append(_count_phrase(bath, "bathroom"))

    # Size band: a range when both bounds are given, otherwise a one-sided bound.
    smi, sma = fields.get("size_min"), fields.get("size_max")
    if smi and sma:
        parts.append(f"{int(smi)}–{int(sma)} m²")
    elif smi:
        parts.append(f"≥{int(smi)} m²")
    elif sma:
        parts.append(f"≤{int(sma)} m²")

    parts.extend(t for t in (text("level_band"), text("orientation"), text("features_text")) if t)

    if not parts:
        # Avoid a prompt that starts with a dangling ". ".
        return "Floor plan layout."
    return ", ".join(parts) + ". Floor plan layout."

def match_layout(prompt: str = None,
                 fields: dict = None,
                 topk: int = 5,
                 score_threshold: float = 0.30,
                 expected: str = None,          # e.g., "TOWERA_08A.jpg" (ground-truth filename)
                 show_images: int = 3):
    """Score one text query against every encoded plan image and report the best hits.

    Args:
        prompt: Free-text query. Give exactly one of *prompt* or *fields*.
        fields: Structured query, converted with ``build_prompt_from_fields``.
        topk: Number of best-scoring images to return (at least 1).
        score_threshold: The query counts as a match when the best score is
            greater than or equal to this value.
        expected: Optional ground-truth filename; its rank among the returned
            hits is reported.
        show_images: How many of the top hits to display inline (0/None = none).

    Returns:
        dict with:
          - prompt, topk [(filename, score)], best (name, score), match (bool),
            threshold, expected_rank (or None)

    Raises:
        ValueError: If both or neither of *prompt*/*fields* are given, or topk < 1.
        RuntimeError: If there are no image embeddings to search.
    """
    if (prompt is None) == (fields is None):
        raise ValueError("Provide exactly one of: prompt OR fields.")
    if topk < 1:
        raise ValueError("topk must be at least 1.")
    if fields is not None:
        prompt = build_prompt_from_fields(fields)

    print("PROMPT:", prompt)
    with torch.no_grad():
        tok  = tokenizer([prompt]).to(DEVICE)
        tvec = model.encode_text(tok)
        tvec = (tvec / tvec.norm(dim=-1, keepdim=True)).float()
        # img_embeds is stored as a NumPy array; torch matmul/topk need a tensor
        # with the same dtype and on the same device as the text vector.
        img_mat = torch.as_tensor(img_embeds, dtype=tvec.dtype, device=tvec.device)
        sims = (tvec @ img_mat.T).squeeze(0)         # [M]
        scores, idxs = torch.topk(sims, k=min(topk, sims.shape[0]))

    results = [(names[i], float(s)) for i, s in zip(idxs.tolist(), scores.tolist())]
    if not results:
        raise RuntimeError("No image embeddings to search.")

    print("\nTop matches:")
    for r, (fn, sc) in enumerate(results, start=1):
        print(f"{r:>2}. {fn:>30} | score={sc:.3f}")

    best_name, best_score = results[0]
    is_match = best_score >= score_threshold
    print(f"\nDecision (@ threshold {score_threshold:.2f}): {'MATCH ✅' if is_match else 'NO MATCH ❌'} "
          f"(best={best_name}, score={best_score:.3f})")

    exp_rank = None
    if expected:
        # 1-based rank of the expected file among the returned hits, if present.
        exp_rank = next((r for r, (fn, _) in enumerate(results, start=1) if fn == expected), None)
        if exp_rank:
            print(f"Ground truth '{expected}' found at rank {exp_rank} ✓")
        else:
            print(f"Ground truth '{expected}' NOT in top-{topk}.")

    if show_images:
        print("\nPreview:")
        max_w = 700  # cap the preview width
        for fn, _ in results[:show_images]:
            path = name_to_path.get(fn)
            if not path:
                continue
            # convert() returns a separate copy, so the file can be closed right away.
            with Image.open(path) as src:
                img = src.convert("RGB")
            w, h = img.size
            if w > max_w:
                img = img.resize((max_w, int(h*max_w/w)))
            display(img)

    return {
        "prompt": prompt,
        "topk": results,
        "best": (best_name, best_score),
        "match": is_match,
        "threshold": score_threshold,
        "expected_rank": exp_rank
    }

# -------------------------
# EXAMPLES — uncomment one and run:
# 1) Free-text prompt
"""out = match_layout(
     prompt="Siena, Downtown Dubai, Tower A, 80–95 m², Floors 10-20, Burj-facing, "
            "2 bathrooms; closed kitchen; balcony. Floor plan layout.",
     topk=5, score_threshold=0.30, expected="TOWERA_08A.jpg"
)"""

# 2) Structured fields: the dict is turned into a prompt by build_prompt_from_fields,
#    and `out` receives the dict returned by match_layout.
# NOTE(review): "floorarea" is not among the keys build_prompt_from_fields reads —
# confirm whether it should contribute to the prompt.
out = match_layout(
    fields={
        "project": "Siena",
        "community": "Tuscan Residence",
        "tower": "Siena 1",
        "bed": 1,
        "bath": 2,
        "floorarea": 908,
        "orientation": "unknown",
        "features_text": "2 bathrooms; closed kitchen; balcony; storage; laundry"
    },
    topk=5, score_threshold=0.30, expected="Siena.jpg"
)

# Tip: Start with threshold 0.28–0.35 for EVA02/ViT-B-16; adjust after eyeballing a few scores.
