In [1]:
# Setup project path and imports
from pathlib import Path
import sys, os, json
from typing import List, Dict, Any

# Find the repo root: the first candidate directory that has a 'backend' child.
# Candidates, in priority order: cwd, its parent, its grandparent, then a location
# derived from __file__ when that name exists (otherwise cwd again).
_cwd = Path.cwd()
_file_based = Path(__file__).resolve().parent.parent if '__file__' in globals() else Path.cwd()
candidates = [_cwd, _cwd.parent, _cwd.parent.parent, _file_based]
repo_root = next((c for c in candidates if (c / 'backend').exists()), None)
if repo_root is None:
    raise RuntimeError("Could not locate project root containing 'backend' directory.")

# Make `backend.*` importable; the membership test keeps re-runs from stacking duplicates.
_repo_root_str = str(repo_root)
if _repo_root_str not in sys.path:
    sys.path.insert(0, _repo_root_str)

from backend.utils.image_filename_parser import parse_image_filename
from backend.utils.floor_type_normalizer import parse_floor_label
from backend.utils.room_type_normalizer import normalize_room_type

print(f"Repo root set to: {repo_root}")

Repo root set to: c:\Users\ramma\Documents\augment-projects\augment


In [2]:
# Load floorplan JSON analysis
from pathlib import Path

# Floor-plan analysis JSON that the image matching is run against
floorplan_json_path = (
    repo_root
    / 'uploads'
    / 'floor_plans'
    / 'analysis'
    / 'genMid.R2929648_1_4_analysis_gemini-2-5-flash_20251102_111958.json'
)
with open(floorplan_json_path, 'r', encoding='utf-8') as floorplan_file:
    floorplan = json.load(floorplan_file)

# Flatten every room of every floor into `room_index` and keep an id -> room lookup.
# Both containers reference the same dict objects, so enriching one enriches the other.
room_index: List[Dict[str, Any]] = []
room_by_id: Dict[str, Dict[str, Any]] = {}
for floor in floorplan.get('floors', []):
    floor_number = floor.get('floor_number')
    floor_name = floor.get('floor_name')
    floor_type = floor.get('floor_type')
    for raw_room in floor.get('rooms', []):
        room_record = {
            'id': raw_room.get('id'),
            'name': raw_room.get('name'),
            'label_ocr': raw_room.get('label_ocr'),
            # Normalised so it is comparable with the room types produced for images
            'room_type': normalize_room_type(raw_room.get('room_type') or ''),
            'floor_number': floor_number,
            'floor_type': floor_type,
            'floor_name': floor_name,
            'features': raw_room.get('features') or [],
            'measured_dimensions': raw_room.get('measured_dimensions') or {},
        }
        room_index.append(room_record)
        # Rooms without an id stay in room_index but cannot be looked up by id
        if room_record['id']:
            room_by_id[room_record['id']] = room_record

# Undirected adjacency: room_id -> set of neighbouring room ids.
# Edges that mention an id missing from room_by_id are dropped.
adjacency_by_room: Dict[str, set] = {}
for floor in floorplan.get('floors', []):
    for link in floor.get('adjacency', []) or []:
        src, dst = link.get('from'), link.get('to')
        if not (src and dst):
            continue
        if src not in room_by_id or dst not in room_by_id:
            continue
        adjacency_by_room.setdefault(src, set()).add(dst)
        adjacency_by_room.setdefault(dst, set()).add(src)

# Attach lower-cased neighbour names/types to each room that has at least one edge
for room_id, neighbor_ids in adjacency_by_room.items():
    neighbor_rooms = [room_by_id.get(neighbor_id) for neighbor_id in neighbor_ids]
    neighbor_rooms = [room for room in neighbor_rooms if room]
    adjacent_names = [room['name'].lower() for room in neighbor_rooms if room.get('name')]
    adjacent_types = [room['room_type'].lower() for room in neighbor_rooms if room.get('room_type')]
    if room_id in room_by_id:
        room_by_id[room_id]['adjacent_names'] = adjacent_names
        room_by_id[room_id]['adjacent_types'] = adjacent_types

print(f"Loaded floors: {len(floorplan.get('floors', []))}; rooms indexed: {len(room_index)}; adjacency mapped: {len(adjacency_by_room)} rooms")
# Preview the first few indexed rooms
room_index[:5]

Loaded floors: 3; rooms indexed: 26


[{'id': 'L_KITCHEN_1',
  'name': 'Kitchen',
  'label_ocr': 'KITCHEN 10\'-4" X 9\'-0"',
  'room_type': 'kitchen',
  'floor_number': 0,
  'floor_type': 'basement',
  'floor_name': 'Lower Floor'},
 {'id': 'L_LIVING_ROOM_1',
  'name': 'Living Room',
  'label_ocr': 'LIVING ROOM 19\'-8" X 14\'-3"',
  'room_type': 'living_room',
  'floor_number': 0,
  'floor_type': 'basement',
  'floor_name': 'Lower Floor'},
 {'id': 'L_DINING_AREA_1',
  'name': 'Dining Area',
  'label_ocr': 'DINING AREA 12\'-6" X 10\'-0"',
  'room_type': 'dining_room',
  'floor_number': 0,
  'floor_type': 'basement',
  'floor_name': 'Lower Floor'},
 {'id': 'L_BEDROOM_1',
  'name': 'Bedroom 1',
  'label_ocr': 'BEDROOM 13\'-7" X 10\'-0"',
  'room_type': 'bedroom',
  'floor_number': 0,
  'floor_type': 'basement',
  'floor_name': 'Lower Floor'},
 {'id': 'L_BATH_1',
  'name': 'Bathroom 1',
  'label_ocr': None,
  'room_type': 'bathroom',
  'floor_number': 0,
  'floor_type': 'basement',
  'floor_name': 'Lower Floor'}]

In [3]:
# Collect every image under uploads/room_images (recursively), sort the paths so the
# selection is repeatable, and keep the first few as the working sample.
images_dir = repo_root / 'uploads' / 'room_images'
_image_suffixes = {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}
all_images = []
for found in images_dir.glob('**/*'):
    if found.suffix.lower() in _image_suffixes:
        all_images.append(str(found))
all_images.sort()

sample_count = 5
sampled_images = all_images[:sample_count]
print(f"Found {len(all_images)} images; using first {len(sampled_images)} for now.")
for image_path in sampled_images:
    print(' -', Path(image_path).name)

sampled_images

Found 47 images; using first 5 for now.
 - R2929648_12_3.jpg
 - R2929648_15_4.jpg
 - R2929648_18_4.jpg
 - R2929648_21_4.jpg
 - R2929648_24_4.jpg


['c:\\Users\\ramma\\Documents\\augment-projects\\augment\\uploads\\room_images\\R2929648_12_3.jpg',
 'c:\\Users\\ramma\\Documents\\augment-projects\\augment\\uploads\\room_images\\R2929648_15_4.jpg',
 'c:\\Users\\ramma\\Documents\\augment-projects\\augment\\uploads\\room_images\\R2929648_18_4.jpg',
 'c:\\Users\\ramma\\Documents\\augment-projects\\augment\\uploads\\room_images\\R2929648_21_4.jpg',
 'c:\\Users\\ramma\\Documents\\augment-projects\\augment\\uploads\\room_images\\R2929648_24_4.jpg']

In [4]:
# Gemini setup and helpers
import os, re, json
from typing import Dict, Any, List, Optional
from backend.integrations.gemini.client import GeminiClient
from backend.utils.room_type_normalizer import normalize_room_type
from rapidfuzz import fuzz
from pathlib import Path
import nest_asyncio
nest_asyncio.apply()

# Fail fast when neither supported API-key environment variable is set
_api_key_vars = ("GOOGLE_API_KEY", "GEMINI_API_KEY")
if not any(os.getenv(var_name) for var_name in _api_key_vars):
    raise RuntimeError("Set GOOGLE_API_KEY or GEMINI_API_KEY in your environment to analyze images with Gemini.")

# Shared client used by the image-analysis cell
gemini = GeminiClient()

# Canonical room-type vocabulary. It is interpolated into ANALYSIS_INSTRUCTIONS below
# so the model is asked to pick "room_type" from exactly these labels.
ROOM_TYPE_CANONICAL = [
    'bedroom','bathroom','kitchen','living_room','dining_room','hallway','staircase',
    'garage','laundry','office','closet','balcony','outdoor','basement','attic','entry','other'
 ]

# Rich image analysis schema aligned with backend.models.ImageAnalysis and RoomAnalysis
# Note: Bounding boxes should be normalized [x, y, w, h] in 0..1 relative to image size
# This prompt is passed unchanged to gemini.analyze_image for every image; the top-level
# keys it requests are the same keys that DB_DEFAULT_ANALYSIS provides defaults for.
# NOTE(review): the schema text describes estimated_coverage as "0..1 percent";
# presumably a 0..1 fraction is intended - confirm before rewording the prompt.
ANALYSIS_INSTRUCTIONS = (
    "You are analyzing a single indoor house photo. Respond ONLY with strict JSON (no markdown). "
    "Infer details to help map this photo to a floor plan and enrich a digital twin database.\n"
    "Use this exact JSON schema (omit no keys; use null or [] when unknown): {\n"
    "  \"room_type\": string // one of: " + ", ".join(ROOM_TYPE_CANONICAL) + ", choose closest;\n"
    "  \"floor_hint\": { \"type\": string|null, \"number\": int|null } // type in {basement, ground, upper} if visible; number if clear;\n"
    "  \"name_hint\": string // 2-5 words that might appear on a floor plan;\n"
    "  \"tags\": string[] // short descriptors like ['primary','ensuite','double vanity','island','staircase','fireplace'];\n"
    "  \"style\": string|null // modern, traditional, farmhouse, etc.;\n"
    "  \"description\": string|null // one concise sentence;\n"
    "  \"color_palette\": { \"dominant\": string[], \"accents\": string[], \"neutrals\": string[] } // colors as CSS names or hex;\n"
    "  \"dominant_colors\": string[] // top 3-6 colors (#RRGGBB or names);\n"
    "  \"materials_visible\": [ { \"category\": string, \"material_type\": string, \"color\": string|null, \"finish\": string|null } ] // e.g., flooring/tile/wood;\n"
    "  \"fixtures_visible\": [ { \"fixture_type\": string, \"style\": string|null, \"finish\": string|null } ];\n"
    "  \"appliances\": [ { \"type\": string, \"brand\": string|null, \"model\": string|null, \"color\": string|null } ];\n"
    "  \"objects_detected\": [ { \"label\": string, \"category\": string // furniture|appliance|fixture|decor|other, \"confidence\": number|null, \"bbox_norm\": [number,number,number,number]|null } ];\n"
    "  \"spatial_cues\": { \n"
    "     \"approx_dimensions_ft\": { \"length\": number|null, \"width\": number|null, \"height\": number|null },\n"
    "     \"layout_hint\": string|null, // e.g., open, galley, L-shape (kitchen), narrow hall, etc.\n"
    "     \"counts\": { \"windows\": int|null, \"doors\": int|null },\n"
    "     \"adjacency_hints\": string[] // e.g., ['near staircase','adjacent to kitchen','by exterior window']\n"
    "  },\n"
    "  \"view_angle\": string|null // front, corner, low, high, ceiling, detail;\n"
    "  \"estimated_coverage\": number|null // 0..1 percent of room visible;\n"
    "  \"lighting_quality\": string|null // excellent, good, fair, poor;\n"
    "  \"image_quality_score\": number|null // 0..1;\n"
    "  \"clarity\": string|null // sharp, acceptable, blurry;\n"
    "  \"confidence\": number // 0..1 overall confidence in room_type.\n"
    "}\n"
    "Be concise. JSON only. Use nulls when uncertain. Bounding boxes are optional but preferred as normalized [x,y,w,h]."
)

def extract_json(text: str) -> Dict[str, Any]:
    """Extract the first JSON object from a model response.

    Handles three response shapes: bare JSON, JSON wrapped in a Markdown code
    fence, and JSON surrounded by free text (including trailing text that
    itself contains braces).

    Args:
        text: Raw response text; ``None``, empty, or non-string input is tolerated.

    Returns:
        The decoded object as a ``dict``. An empty dict is returned when no
        JSON object can be decoded, so callers can always treat the result as
        a mapping.
    """
    if not text or not isinstance(text, str):
        return {}
    # Remove a leading ```lang fence and a trailing ``` fence if present
    cleaned = re.sub(r"^```[a-zA-Z]*\n|\n```$", "", text.strip())

    # Fast path: the whole payload is JSON. Only a dict is accepted here; a
    # top-level list/scalar falls through to the object search below.
    try:
        parsed = json.loads(cleaned)
    except ValueError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Decode one JSON value starting at the first '{' and ignore whatever
    # follows it. A greedy "first '{' to last '}'" match would break on
    # trailing text that contains braces; raw_decode stops where the object ends.
    start = cleaned.find("{")
    if start == -1:
        return {}
    try:
        candidate, _end = json.JSONDecoder().raw_decode(cleaned[start:])
    except ValueError:
        return {}
    return candidate if isinstance(candidate, dict) else {}

# Coerce/normalize Gemini output toward DB-aligned structure
# Default value for every key requested in ANALYSIS_INSTRUCTIONS. with_defaults() merges a
# parsed response over this template so downstream code can rely on each key being present.
DB_DEFAULT_ANALYSIS: Dict[str, Any] = {
    "room_type": "other",
    "floor_hint": {"type": None, "number": None},
    "name_hint": "",
    "tags": [],
    "style": None,
    "description": None,
    "color_palette": {"dominant": [], "accents": [], "neutrals": []},
    "dominant_colors": [],
    "materials_visible": [],
    "fixtures_visible": [],
    "appliances": [],
    "objects_detected": [],
    "spatial_cues": {
        "approx_dimensions_ft": {"length": None, "width": None, "height": None},
        "layout_hint": None,
        "counts": {"windows": None, "doors": None},
        "adjacency_hints": []
    },
    "view_angle": None,
    "estimated_coverage": None,
    "lighting_quality": None,
    "image_quality_score": None,
    "clarity": None,
    "confidence": 0.0,
}



In [5]:
# Analyze the sampled images with Gemini (sequential, 5 images)
from typing import Tuple
import asyncio

def canonicalize_room_type(rt: Optional[str]) -> str:
    """Map a free-form room-type label onto the notebook's canonical vocabulary.

    The label is trimmed and lower-cased, translated through a small alias
    table, and then passed to ``normalize_room_type`` so image labels line up
    with the floor-plan room types.

    Args:
        rt: Label reported for the image; may be ``None`` or blank.

    Returns:
        The normalised label; ``'other'`` for blank input or when the
        normaliser answers ``'unknown'``.
    """
    cleaned = (rt or '').strip().lower()
    if cleaned == '':
        return 'other'

    # Frequent variants that should collapse onto a canonical label
    aliases = {
        'living': 'living_room',
        'livingroom': 'living_room',
        'family': 'living_room',
        'washroom': 'bathroom',
        'wc': 'bathroom',
        'toilet': 'bathroom',
        'stairs': 'staircase',
        'entryway': 'entry',
        'foyer': 'entry',
        'utility': 'laundry',
    }
    if cleaned in aliases:
        cleaned = aliases[cleaned]

    # Same normaliser as the floor-plan side; fall back to the alias result if it is empty
    normalized = normalize_room_type(cleaned) or cleaned
    return 'other' if normalized == 'unknown' else normalized


def with_defaults(d: Dict[str, Any], defaults: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Overlay a parsed analysis onto the default structure, recursively.

    Args:
        d: Parsed model output. ``None`` or any non-dict value is treated as
            an empty payload.
        defaults: Template to fill from. When omitted, ``DB_DEFAULT_ANALYSIS``
            is used.

    Returns:
        A new dict containing every key of the template. Values come from
        ``d`` when present and not ``None``; keys of ``d`` that the template
        does not know are kept as-is. Where the template holds a dict, a
        non-dict value from ``d`` is discarded in favour of the default so
        consumers can keep calling ``.get`` on it. Default containers are
        copied, never shared, so mutating the result cannot alter the template.
    """
    import copy  # kept local so the function does not rely on imports made by other cells

    template = DB_DEFAULT_ANALYSIS if defaults is None else defaults

    def merge(default: Any, value: Any) -> Any:
        if isinstance(default, dict):
            if not isinstance(value, dict):
                # Missing, null, or wrongly-typed section: fall back to a private copy
                return copy.deepcopy(default)
            merged = {key: merge(sub_default, value.get(key)) for key, sub_default in default.items()}
            # Preserve keys the template does not define
            for key, extra in value.items():
                if key not in default:
                    merged[key] = extra
            return merged
        return value if value is not None else copy.deepcopy(default)

    return merge(template, d if isinstance(d, dict) else {})

async def analyze_images_with_gemini(paths: List[str]) -> List[Dict[str, Any]]:
    """Analyse images one after another with Gemini and collect the parsed results.

    Each image is sent with ``ANALYSIS_INSTRUCTIONS``; the reply is parsed,
    filled with defaults, and its room type canonicalised. A failure on one
    image is printed and skipped so the remaining images are still processed.

    Args:
        paths: Image paths to analyse, in order.

    Returns:
        One record per successfully analysed image with the keys ``image``,
        ``filename``, ``analysis`` and ``raw`` (the unparsed reply).
    """
    collected = []
    for image_path in paths:
        try:
            raw_text = await gemini.analyze_image(image_path, ANALYSIS_INSTRUCTIONS, temperature=0.2)
            analysis = with_defaults(extract_json(raw_text))
            analysis['room_type'] = canonicalize_room_type(analysis.get('room_type'))
            record = {
                'image': str(image_path),
                'filename': Path(image_path).name,
                'analysis': analysis,
                'raw': raw_text,
            }
            collected.append(record)
            print(f"Analyzed: {record['filename']} -> {analysis.get('room_type')} (conf={analysis.get('confidence')})")
        except Exception as exc:
            # Best effort: report and move on to the next image
            print(f"Error analyzing {image_path}: {exc}")
    return collected

image_analyses = await analyze_images_with_gemini(sampled_images)
len(image_analyses), image_analyses[:1] if image_analyses else []


Analyzed: R2929648_12_3.jpg -> entry (conf=0.9)
Analyzed: R2929648_15_4.jpg -> bedroom (conf=0.85)
Analyzed: R2929648_18_4.jpg -> bathroom (conf=1.0)
Analyzed: R2929648_21_4.jpg -> living_room (conf=0.9)
Analyzed: R2929648_24_4.jpg -> bathroom (conf=0.99)


(5,
 [{'image': 'c:\\Users\\ramma\\Documents\\augment-projects\\augment\\uploads\\room_images\\R2929648_12_3.jpg',
   'filename': 'R2929648_12_3.jpg',
   'analysis': {'room_type': 'entry',
    'floor_hint': {'type': 'ground', 'number': None},
    'name_hint': 'Main Entry Hall with Staircase',
    'tags': ['staircase',
     'entryway',
     'hallway',
     'modern railing',
     'recessed lighting',
     'hardwood floor',
     'open concept'],
    'style': 'modern',
    'description': 'A bright, modern entry hall featuring light wood flooring, a contemporary staircase with a glass and white wood railing, and an open view into an adjacent living area.',
    'color_palette': {'dominant': ['LightGray', 'White', 'BurlyWood'],
     'accents': ['SaddleBrown', 'DarkGreen', 'Silver'],
     'neutrals': ['White', 'LightGray', 'Beige']},
    'dominant_colors': ['#D3D3D3', '#FFFFFF', '#DEB887', '#A9A9A9', '#8B4513'],
    'materials_visible': [{'category': 'flooring',
      'material_type': 'hardwoo

In [None]:
# Load completed image analyses if available (fallback to in-memory)
from pathlib import Path

# Source priority: the saved DB-aligned JSON file, then in-memory `image_analyses`
# wrapped into the same shape, otherwise an error.
analyses_path = repo_root / 'uploads' / 'analysis' / 'image_analyses_db_aligned.json'
loaded_image_analyses: List[Dict[str, Any]] = []
if analyses_path.exists():
    with open(analyses_path, 'r', encoding='utf-8') as analyses_file:
        loaded_image_analyses = json.load(analyses_file)
    print(f"Loaded {len(loaded_image_analyses)} DB-aligned image analyses from {analyses_path}")
elif 'image_analyses' in globals() and image_analyses:
    def _wrap(rec: Dict[str, Any]) -> Dict[str, Any]:
        """Reshape one raw analysis record into the DB-aligned layout."""
        src = rec.get('analysis', {})
        image_part = {
            'description': src.get('description'),
            'keywords': src.get('tags') or [],
            'dominant_colors': src.get('dominant_colors') or [],
            'objects_detected': src.get('objects_detected') or [],
            'materials_visible': src.get('materials_visible') or [],
            'fixtures_visible': src.get('fixtures_visible') or [],
            'image_quality_score': src.get('image_quality_score'),
            'lighting_quality': src.get('lighting_quality'),
            'clarity': src.get('clarity'),
            'view_angle': src.get('view_angle'),
            'estimated_coverage': src.get('estimated_coverage'),
            'confidence_score': src.get('confidence'),
            'analysis_model': 'gemini-2.5-flash',
            'analysis_notes': src.get('name_hint') or '',
        }
        room_part = {
            'room_type_detected': src.get('room_type'),
            'style': src.get('style'),
            'color_palette': src.get('color_palette') or {},
            'materials_detected': src.get('materials_visible') or [],
            'fixtures_detected': src.get('fixtures_visible') or [],
            'products_detected': src.get('appliances') or [],
            'confidence_score': src.get('confidence'),
            'analysis_notes': src.get('description') or '',
        }
        extras_part = {
            'floor_hint': src.get('floor_hint') or {},
            'spatial_cues': src.get('spatial_cues') or {},
            'name_hint': src.get('name_hint') or '',
        }
        return {
            'image': rec.get('image'),
            'filename': rec.get('filename'),
            'image_analysis': image_part,
            'room_analysis': room_part,
            'extras': extras_part,
        }

    loaded_image_analyses = [_wrap(raw_record) for raw_record in image_analyses]
    print(f"Using in-memory image_analyses (wrapped): {len(loaded_image_analyses)}")
else:
    raise FileNotFoundError("No completed image analyses found. Expected uploads/analysis/image_analyses_db_aligned.json or in-memory variable 'image_analyses'.")

loaded_image_analyses[:2]

In [None]:
# Improved matching: feature- and adjacency-aware scoring with explanations
from typing import Tuple, Set
import math
import re

# Canonical feature name -> phrases that should be recognised as that feature.
# Phrases may contain several words; derive_feature_tags matches them as whole phrases.
FEATURE_SYNONYMS = {
    'island': {'kitchen island', 'island'},
    'fireplace': {'fireplace', 'hearth'},
    'double vanity': {'double vanity', 'dual vanity'},
    'pantry': {'pantry'},
    'closet': {'closet', 'walk-in closet', 'walk in closet'},
    'shower': {'shower'},
    'tub': {'tub', 'bathtub', 'bath tub'},
    'stairs': {'stairs', 'staircase'},
    'sink': {'sink'},
    'range': {'range', 'stove', 'cooktop', 'oven'},
    'dishwasher': {'dishwasher'},
    'fridge': {'fridge', 'refrigerator'},
    'dryer': {'dryer'},
    'washer': {'washer', 'washing machine'},
}

# Floor-type hint -> floor-plan floor types treated as equivalent to it
FLOOR_TYPE_EQUIV = {
    'basement': {'basement', 'lower'},
    'ground': {'main', 'ground', 'first'},
    'upper': {'upper', 'second', 'third'},
}

def _tokens(s: str) -> Set[str]:
    """Return the set of lower-cased alphanumeric words in ``s`` (empty for None/blank)."""
    return set(t for t in re.split(r"[^a-z0-9]+", (s or '').lower()) if t)


def _phrase(s: str) -> str:
    """Lower-case ``s`` and collapse every non-alphanumeric run to a single space."""
    return ' '.join(t for t in re.split(r"[^a-z0-9]+", (s or '').lower()) if t)


def derive_feature_tags(analysis: Dict[str, Any]) -> Set[str]:
    """Collect word tokens from an image analysis plus canonical feature names.

    Text is gathered from ``tags``, ``appliances`` (type/brand),
    ``fixtures_visible`` (fixture_type/style), ``objects_detected``
    (label/category), ``description`` and ``name_hint``.

    Args:
        analysis: Image analysis dict; missing or null sections are tolerated.

    Returns:
        Every single-word token found in those texts, plus the key of each
        ``FEATURE_SYNONYMS`` entry for which one of its phrases occurs as a
        whole phrase inside a single source text.
    """
    # Keep each source text separate so a phrase cannot match across two unrelated texts
    sources: List[str] = list(analysis.get('tags') or [])
    for ap in analysis.get('appliances') or []:
        sources.append(' '.join([ap.get('type') or '', ap.get('brand') or '']))
    for fx in analysis.get('fixtures_visible') or []:
        sources.append(' '.join([fx.get('fixture_type') or '', fx.get('style') or '']))
    for obj in analysis.get('objects_detected') or []:
        sources.append(obj.get('label') or '')
        sources.append(obj.get('category') or '')
    sources.append(analysis.get('description') or '')
    sources.append(analysis.get('name_hint') or '')

    phrases = [p for p in map(_phrase, sources) if p]
    tags: Set[str] = set()
    for p in phrases:
        tags.update(p.split(' '))

    # Multi-word synonyms can never be members of the single-word token set, so match
    # them as space-delimited phrases; the padding enforces whole-word boundaries.
    padded = [f" {p} " for p in phrases]
    norm: Set[str] = set()
    for feat, syns in FEATURE_SYNONYMS.items():
        needles = [f" {_phrase(syn)} " for syn in syns]
        if any(needle in hay for needle in needles for hay in padded):
            norm.add(feat)
    return tags | norm


def size_similarity(analysis_dims: Dict[str, Any], room_dims: Dict[str, Any]) -> float:
    """Score how closely an estimated footprint matches a measured one.

    Areas are compared on a log scale: identical areas give 1.0 and the score
    reaches 0.0 once the areas differ by a factor of e or more.

    Args:
        analysis_dims: Estimated dimensions with ``length`` and ``width``.
        room_dims: Measured dimensions; ``length_ft``/``width_ft`` are
            preferred, with ``length``/``width`` as fallback.

    Returns:
        A value in [0.0, 1.0]; 0.0 when any dimension is missing or zero, or
        when anything goes wrong while reading or converting the values.
    """
    try:
        est_length = analysis_dims.get('length')
        est_width = analysis_dims.get('width')
        plan_length = room_dims.get('length_ft') or room_dims.get('length')
        plan_width = room_dims.get('width_ft') or room_dims.get('width')
        for dimension in (est_length, est_width, plan_length, plan_width):
            if not dimension:
                return 0.0

        est_area = float(est_length) * float(est_width)
        plan_area = float(plan_length) * float(plan_width)
        if plan_area > 0:
            ratio = est_area / plan_area
        else:
            ratio = 0
        # Clamp before the log so zero/negative ratios are heavily penalised instead of failing
        log_gap = abs(math.log(max(ratio, 1e-6)))
        return max(0.0, 1.0 - log_gap)
    except Exception:
        return 0.0


def score_candidate(analysis: Dict[str, Any], room: Dict[str, Any]) -> Tuple[float, Dict[str, float]]:
    """Score how well one floor-plan room matches one image analysis.

    Points are awarded per signal and recorded in a breakdown:
      - ``type_exact`` (+4) / ``type_partial`` (+2): room type equal / contained
      - ``floor_number`` (+2): numeric floor hint equals the room's floor number
      - ``floor_type`` (+1): floor-type hint and room floor type share a
        ``FLOOR_TYPE_EQUIV`` group
      - ``features_overlap`` (up to +3): image feature tags vs room features
      - ``name_fuzzy`` (up to +2): fuzzy match of name hint + tags vs room label
      - ``primary_name`` (+1.5): image hints and room name both say primary/master
      - ``adjacent_stairs`` / ``adjacent_kitchen`` (+1 each): adjacency hints
        confirmed by the room's neighbouring room types
      - ``size_similarity`` (up to +2): estimated vs measured footprint

    Args:
        analysis: Image analysis in the flat shape produced for the ranker.
        room: Room entry from the floor-plan index.

    Returns:
        ``(score, breakdown)`` with the total rounded to 3 decimals and the
        points awarded per signal.
    """
    breakdown: Dict[str, float] = {}
    score = 0.0

    # 1) Room type
    image_type = analysis.get('room_type')
    room_type = room.get('room_type')
    if image_type and room_type:
        if image_type == room_type:
            breakdown['type_exact'] = 4.0
            score += 4.0
        elif image_type in room_type:
            breakdown['type_partial'] = 2.0
            score += 2.0

    # 2) Floor hints
    fh = (analysis.get('floor_hint') or {})
    if fh.get('number') is not None and room.get('floor_number') is not None:
        try:
            same_floor = int(fh['number']) == int(room['floor_number'])
        except (TypeError, ValueError):
            # A non-numeric hint (e.g. "main") must not abort the whole ranking
            same_floor = False
        if same_floor:
            breakdown['floor_number'] = 2.0
            score += 2.0
    ft_hint = (fh.get('type') or '').lower()
    if ft_hint and room.get('floor_type'):
        room_floor_type = (room['floor_type'] or '').lower()
        for equiv in FLOOR_TYPE_EQUIV.values():
            if ft_hint in equiv and room_floor_type in equiv:
                breakdown['floor_type'] = 1.0
                score += 1.0
                break

    # 3) Features overlap (image tags vs room features)
    itags = derive_feature_tags(analysis)
    rfeats = set(_tokens(' '.join(room.get('features') or [])))
    feat_overlap = len(itags & rfeats)
    if feat_overlap:
        pts = min(3.0, feat_overlap * 1.0)
        breakdown['features_overlap'] = pts
        score += pts

    # 4) Name/hint fuzzy vs label/name
    label = (room.get('label_ocr') or room.get('name') or '').lower()
    # Lower-case the needle as well: the label is lower-cased, and comparing
    # mixed-case text against it would understate the similarity.
    needle = ' '.join([analysis.get('name_hint') or ''] + list(analysis.get('tags') or [])).lower()
    if needle.strip() and label:
        fr = (fuzz.partial_ratio(needle, label) / 100.0) * 2.0  # up to +2
        if fr > 0:
            breakdown['name_fuzzy'] = round(fr, 3)
            score += fr

    # 5) Primary/master hint
    hint_tokens = _tokens(analysis.get('name_hint') or '') | _tokens(' '.join(analysis.get('tags') or []))
    if any(t in hint_tokens for t in {'primary', 'master'}):
        nm = (room.get('name') or '').lower()
        if 'primary' in nm or 'master' in nm:
            breakdown['primary_name'] = 1.5
            score += 1.5

    # 6) Adjacency hints (near staircase, adjacent to kitchen, etc.)
    raw_hints = (analysis.get('spatial_cues') or {}).get('adjacency_hints') or []
    # Ignore non-string hints instead of failing on .lower()
    adj_hints = {h.lower() for h in raw_hints if isinstance(h, str)}
    adj_types = set(room.get('adjacent_types') or [])
    # staircase hint
    if any('stair' in h for h in adj_hints):
        if 'staircase' in adj_types or 'stairs' in adj_types:
            breakdown['adjacent_stairs'] = 1.0
            score += 1.0
    # kitchen adjacency
    if any('kitchen' in h for h in adj_hints):
        if 'kitchen' in adj_types:
            breakdown['adjacent_kitchen'] = 1.0
            score += 1.0

    # 7) Size similarity (if analysis provided approx dims and room has measured)
    sim = size_similarity(((analysis.get('spatial_cues') or {}).get('approx_dimensions_ft') or {}), room.get('measured_dimensions') or {})
    if sim > 0:
        add = round(2.0 * sim, 3)  # up to +2
        breakdown['size_similarity'] = add
        score += add

    return round(score, 3), breakdown


def rank_candidates_v2(analysis: Dict[str, Any], rooms: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Score every room against an image analysis and return the matches, best first.

    Args:
        analysis: Image analysis passed through to ``score_candidate``.
        rooms: Floor-plan room entries; each must carry an ``id``.

    Returns:
        One dict per room whose score is above zero (room identity, floor
        info, rounded ``score`` and the per-signal ``breakdown``), ordered by
        descending score. Rooms with equal scores keep their input order.
    """
    def describe(room: Dict[str, Any], total: float, parts: Dict[str, float]) -> Dict[str, Any]:
        return {
            'room_id': room['id'],
            'room_name': room.get('name') or '',
            'room_type': room.get('room_type') or '',
            'floor_number': room.get('floor_number'),
            'floor_type': room.get('floor_type') or '',
            'score': round(total, 3),
            'breakdown': parts,
        }

    scored = [(room, *score_candidate(analysis, room)) for room in rooms]
    matches = [describe(room, total, parts) for room, total, parts in scored if total > 0]
    return sorted(matches, key=lambda match: match['score'], reverse=True)

In [None]:
# Rank rooms for every DB-aligned analysis record, keep the top three candidates
# (with score breakdowns), and write the links to uploads/analysis.
links_v2: List[Dict[str, Any]] = []
candidate_rooms = list(room_by_id.values())
for record in loaded_image_analyses:
    image_part = record.get('image_analysis') or {}
    room_part = record.get('room_analysis') or {}
    extras_part = record.get('extras') or {}

    # Flatten the three DB-aligned sections back into the shape the ranker expects
    flat_analysis = {
        'room_type': room_part.get('room_type_detected') or 'other',
        'floor_hint': extras_part.get('floor_hint') or {},
        'name_hint': extras_part.get('name_hint') or '',
        'tags': image_part.get('keywords') or [],
        'style': room_part.get('style'),
        'description': image_part.get('description'),
        'color_palette': room_part.get('color_palette') or {},
        'dominant_colors': image_part.get('dominant_colors') or [],
        'materials_visible': image_part.get('materials_visible') or [],
        'fixtures_visible': image_part.get('fixtures_visible') or [],
        'appliances': room_part.get('products_detected') or [],
        'objects_detected': image_part.get('objects_detected') or [],
        'spatial_cues': extras_part.get('spatial_cues') or {},
        'view_angle': image_part.get('view_angle'),
        'estimated_coverage': image_part.get('estimated_coverage'),
        'lighting_quality': image_part.get('lighting_quality'),
        'image_quality_score': image_part.get('image_quality_score'),
        'clarity': image_part.get('clarity'),
        'confidence': image_part.get('confidence_score') or 0.0,
    }

    best_three = rank_candidates_v2(flat_analysis, candidate_rooms)[:3]
    links_v2.append({
        'image': record.get('image'),
        'filename': record.get('filename'),
        'analysis': flat_analysis,
        'candidates': best_three,
    })

output_path_v2 = repo_root / 'uploads' / 'analysis' / 'image_room_links_v2.json'
output_path_v2.parent.mkdir(parents=True, exist_ok=True)
with open(output_path_v2, 'w', encoding='utf-8') as links_file:
    json.dump(links_v2, links_file, indent=2)

print(f"Saved {len(links_v2)} enhanced image-room links with breakdowns to: {output_path_v2}")
links_v2[:2]

In [None]:
# Transform Gemini analysis to DB-aligned records and save
from copy import deepcopy


def to_db_aligned(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Reshape one raw Gemini analysis record into the DB-aligned layout.

    Args:
        rec: Record with ``image``, ``filename`` and ``analysis``. A missing
            or null ``analysis`` is treated as empty.

    Returns:
        A dict with ``image``, ``filename`` and three sections:
        ``image_analysis``, ``room_analysis`` and ``extras``. Missing list
        fields become ``[]``; ``dominant_colors`` falls back to the palette's
        ``dominant`` entry when the analysis has none of its own.
    """
    # `or {}` also covers an explicit null, which .get(key, {}) would pass through
    a = rec.get('analysis') or {}
    palette = a.get('color_palette')
    # Only a dict palette can supply the fallback; null or malformed values are ignored
    palette_dominant = (palette.get('dominant') or []) if isinstance(palette, dict) else []

    # ImageAnalysis mapping
    image_analysis = {
        'description': a.get('description') or None,
        'keywords': a.get('tags') or [],
        'dominant_colors': a.get('dominant_colors') or palette_dominant,
        'objects_detected': a.get('objects_detected') or [],
        'materials_visible': a.get('materials_visible') or [],
        'fixtures_visible': a.get('fixtures_visible') or [],
        'image_quality_score': a.get('image_quality_score'),
        'lighting_quality': a.get('lighting_quality'),
        'clarity': a.get('clarity'),
        'view_angle': a.get('view_angle'),
        'estimated_coverage': a.get('estimated_coverage'),
        'confidence_score': a.get('confidence'),
        'analysis_model': 'gemini-2.5-flash',
        'analysis_notes': a.get('name_hint') or '',
    }
    # RoomAnalysis mapping (partial, inferred from image); condition and renovation
    # fields are not produced by the image prompt and stay empty
    room_analysis = {
        'room_type_detected': a.get('room_type'),
        'style': a.get('style'),
        'color_palette': a.get('color_palette') or {},
        'overall_condition': None,
        'condition_score': None,
        'condition_notes': None,
        'materials_detected': a.get('materials_visible') or [],
        'fixtures_detected': a.get('fixtures_visible') or [],
        'products_detected': a.get('appliances') or [],
        'improvement_suggestions': [],
        'estimated_renovation_priority': None,
        'confidence_score': a.get('confidence'),
        'analysis_notes': a.get('description') or '',
    }
    # Matching hints that have no column in either mapping above
    extras = {
        'floor_hint': a.get('floor_hint') or {},
        'spatial_cues': a.get('spatial_cues') or {},
        'name_hint': a.get('name_hint') or '',
    }
    return {
        'image': rec.get('image'),
        'filename': rec.get('filename'),
        'image_analysis': image_analysis,
        'room_analysis': room_analysis,
        'extras': extras,
    }


# Convert every in-memory analysis and persist the result; this is the file that the
# "Load completed image analyses" cell looks for under uploads/analysis.
_db_aligned = list(map(to_db_aligned, image_analyses))
output_db_path = repo_root / 'uploads' / 'analysis' / 'image_analyses_db_aligned.json'
output_db_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_db_path, 'w', encoding='utf-8') as db_file:
    json.dump(_db_aligned, db_file, indent=2)

print(f"Saved DB-aligned image analyses to: {output_db_path}")
_db_aligned[:2]


In [None]:
# Rank floorplan rooms based on Gemini analysis
from dataclasses import dataclass

@dataclass
class MatchCandidate:
    """A floor-plan room proposed as a match for an image, with its score."""
    room_id: str  # room['id'] from the floor-plan index
    room_name: str  # room name, '' when the room has none
    room_type: str  # normalised room type, '' when missing
    floor_number: int  # filled from room.get('floor_number'), so it can be None at runtime
    floor_type: str  # floor type, '' when missing
    score: float  # total match score, rounded to 3 decimals by the ranker

def name_tokens(s: str):
    """Split text into lower-cased alphanumeric words, in order of appearance.

    ``None`` or blank input yields an empty list; duplicates are kept.
    """
    lowered = (s or '').lower()
    pieces = re.split(r"[^a-zA-Z0-9]+", lowered)
    return list(filter(None, pieces))

def rank_candidates_from_analysis(analysis: Dict[str, Any], rooms: List[Dict[str, Any]]) -> List[MatchCandidate]:
    """Rank floor-plan rooms against one image analysis (baseline scorer).

    Signals: room type (+3 exact, +1 contained), floor number (+2), floor type
    (+1), primary/master naming (+2) and a fuzzy match of name hint + tags
    against the room label (up to +2).

    Args:
        analysis: Raw image analysis (``room_type``, ``floor_hint``,
            ``name_hint``, ``tags``).
        rooms: Floor-plan room entries; each must carry an ``id``.

    Returns:
        ``MatchCandidate`` objects for rooms scoring above zero, best first.
        Rooms with equal scores keep their input order.
    """
    rtype = analysis.get('room_type') or 'other'
    fh = (analysis.get('floor_hint') or {})
    floor_type_hint = (fh.get('type') or '').lower()  # basement|ground|upper
    floor_number_hint = fh.get('number')
    hint = (analysis.get('name_hint') or '').lower()
    hint_tokens = set(name_tokens(hint))
    # De-duplicate while keeping first-seen order. Building the needle from a set
    # would make its word order, and so the fuzzy score, vary between kernel sessions.
    ordered_tag_tokens = list(dict.fromkeys(name_tokens(' '.join(analysis.get('tags') or []))))
    tag_tokens = set(ordered_tag_tokens)
    wants_primary = any(tok in tag_tokens or tok in hint_tokens for tok in ('primary', 'master'))
    needle = ' '.join([hint] + ordered_tag_tokens)

    # Floor-type hint -> floor-plan floor types accepted as the same level
    floor_groups = {
        'basement': ('basement', 'lower'),
        'ground': ('main', 'ground', 'first'),
        'upper': ('upper', 'second', 'third'),
    }

    ranked: List[MatchCandidate] = []
    for r in rooms:
        score = 0.0
        room_type = r.get('room_type') or ''
        # strong match on room type
        if room_type == rtype:
            score += 3.0
        elif rtype in room_type:
            score += 1.0

        # floor number match (if provided)
        if floor_number_hint is not None and r.get('floor_number') is not None:
            try:
                if int(r['floor_number']) == int(floor_number_hint):
                    score += 2.0
            except (TypeError, ValueError):
                # A non-numeric floor value simply earns no points
                pass

        # floor type hint vs floor_type from floorplan (compared case-insensitively)
        ft = (r.get('floor_type') or '').lower()
        if floor_type_hint and ft and ft in floor_groups.get(floor_type_hint, ()):
            score += 1.0

        # primary/master hint
        room_name = (r.get('name') or '').lower()
        if wants_primary and ('primary' in room_name or 'master' in room_name):
            score += 2.0

        # fuzzy overlap with label_ocr or name using name_hint and tags
        label = (r.get('label_ocr') or r.get('name') or '').lower()
        if needle.strip() and label:
            score += (fuzz.partial_ratio(needle, label) / 100.0) * 2.0  # up to +2

        if score > 0:
            ranked.append(MatchCandidate(
                room_id=r['id'],
                room_name=r.get('name') or '',
                room_type=r.get('room_type') or '',
                floor_number=r.get('floor_number'),
                floor_type=r.get('floor_type') or '',
                score=round(score, 3),
            ))

    ranked.sort(key=lambda x: x.score, reverse=True)
    return ranked

In [None]:
# Baseline linking: rank rooms for each raw analysis, keep the best three candidates
# as plain dicts, and write the result to uploads/analysis.
links: List[Dict[str, Any]] = []
for record in image_analyses:
    candidates_ranked = rank_candidates_from_analysis(record.get('analysis', {}), room_index)
    best_three = [vars(candidate) for candidate in candidates_ranked[:3]]
    links.append({
        'image': record['image'],
        'filename': record['filename'],
        'analysis': record['analysis'],
        'candidates': best_three,
    })

output_path = repo_root / 'uploads' / 'analysis' / 'image_room_links.json'
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as links_file:
    json.dump(links, links_file, indent=2)

print(f"Saved {len(links)} image-room links to: {output_path}")
links[:2]