##

In [None]:
import os
import sys
import base64
import json
import time
import re
from typing import List, Dict, Optional
from openai import OpenAI

from groundedvision.config import RAW_DATA_DIR, PROCESSED_DATA_DIR
from tqdm import tqdm

from loguru import logger

# Replace loguru's default handler with one stderr sink that emits DEBUG and
# above using a compact "level | message" layout.
logger.remove()
logger.add(
    sink=sys.stderr,
    level="DEBUG",
    format=" {level} | {message}",
    colorize=True,
)

class ConstructionProgressAnalyzer:
    """Detect permanent structural change between two cubemap captures of a site.

    Each of the six cube faces captured at an earlier time (set A) is paired with
    the same face captured later (set B) and sent, with ``CONSTRUCTION_PROGRESS_PROMPT``
    as the system prompt, to an OpenAI-compatible chat-completions endpoint
    (DashScope by default). The per-face JSON verdicts are merged into one
    site-level report by ``consolidate_results``.

    ``CONSTRUCTION_PROGRESS_PROMPT`` is looked up when an instance is created,
    so it must be defined before the first instantiation.
    """

    # Faces sharing an edge with each cube face. A wall change is corroborated
    # when another wall change was reported on one of these faces.
    _CUBE_ADJACENCY = {
        "front": ("left", "right", "top", "bottom"),
        "back": ("left", "right", "top", "bottom"),
        "left": ("front", "back", "top", "bottom"),
        "right": ("front", "back", "top", "bottom"),
        "top": ("front", "back", "left", "right"),
        "bottom": ("front", "back", "left", "right"),
    }

    # Evidence substrings treated as element-specific proof strong enough to accept
    # a change without cross-face corroboration.
    _STRONG_EVIDENCE_KEYWORDS = ("seam", "fastener", "screw", "joint", "grid", "conduit run")

    # Top-level keys every per-face model reply must contain.
    _REQUIRED_FIELDS = ("structural_change_detected", "confidence_level", "change_description")

    # Fields that may carry a change's evidence, in lookup order. "visual_markers"
    # is what CONSTRUCTION_PROGRESS_PROMPT requests; the others are older formats.
    _EVIDENCE_FIELDS = ("supporting_evidence", "texture_evidence", "visual_markers")

    # Phrases that introduce the list of elements a face reported as unchanged.
    # "Both images show:" is what the prompt requests; "Verified:" is an older format.
    _UNCHANGED_MARKERS = ("Both images show:", "Verified:")

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "qwen3-vl-plus",  # Options: "qwen3-vl-plus", "qwen-vl-max", "qwen2-vl-72b-instruct"
        base_url: str = "https://dashscope-us.aliyuncs.com/compatible-mode/v1",
        max_retries: int = 3,
    ):
        """Create the API client.

        Args:
            api_key: DashScope key; falls back to the ``DASHSCOPE_API_KEY`` env var.
            model: Vision-language model served by the endpoint.
            base_url: OpenAI-compatible endpoint; surrounding whitespace is stripped.
            max_retries: Request attempts per face before an error result is returned.

        Raises:
            ValueError: If no API key is available.
        """
        self.api_key = api_key or os.environ.get("DASHSCOPE_API_KEY")
        if not self.api_key:
            raise ValueError("API key must be provided via parameter or DASHSCOPE_API_KEY environment variable")

        self.client = OpenAI(
            api_key=self.api_key,
            base_url=base_url.strip(),  # Remove accidental surrounding spaces
        )
        self.model = model
        self.prompt = CONSTRUCTION_PROGRESS_PROMPT
        # Order in which analyze_site_progress expects the six face paths.
        self.face_order = ["front", "right", "back", "left", "top", "bottom"]
        self.max_retries = max_retries

    def encode_image(self, image_path: str) -> str:
        """Read an image file and return it as a base64 ``data:`` URL.

        The MIME type is guessed from the file extension. Anything not recognised
        as an image falls back to ``image/jpeg``.

        Raises:
            ValueError: If the file cannot be read.
        """
        import mimetypes  # local import: only this method needs it

        mime_type, _ = mimetypes.guess_type(str(image_path))
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/jpeg"
        try:
            with open(image_path, "rb") as f:
                encoded = base64.b64encode(f.read()).decode("utf-8")
        except Exception as e:
            raise ValueError(f"Failed to encode image {image_path}: {str(e)}") from e
        return f"data:{mime_type};base64,{encoded}"

    def _clean_json_response(self, response_text: str) -> str:
        """Return the outermost ``{...}`` span of a model reply.

        Markdown fences (```json or bare ```) and any surrounding prose are dropped.

        Raises:
            ValueError: If the reply contains no ``{`` followed later by ``}``.
        """
        response_text = re.sub(r"^```(?:json)?\s*", "", response_text, flags=re.IGNORECASE)
        response_text = re.sub(r"\s*```$", "", response_text)

        start = response_text.find("{")
        end = response_text.rfind("}") + 1
        # end <= start also rejects a reply whose last "}" precedes its first "{".
        if start == -1 or end <= start:
            raise ValueError(f"Could not extract JSON from response: {response_text[:200]}...")
        return response_text[start:end]

    def _build_messages(self, face_id: str, img_a_url: str, img_b_url: str) -> List[Dict]:
        """Build the chat messages: system prompt, then the labelled image pair."""
        return [
            {
                "role": "system",
                "content": self.prompt
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"FACE ID: {face_id}\n\nAnalyze the structural changes between these two images of the same construction site location taken at different times. Output STRICT VALID JSON ONLY."
                    },
                    {
                        "type": "text",
                        "text": "Image A (Earlier Time)"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": img_a_url,
                            "detail": "high"  # Request high detail for structural analysis
                        }
                    },
                    {
                        "type": "text",
                        "text": "Image B (Later Time)"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": img_b_url,
                            "detail": "high"
                        }
                    }
                ]
            }
        ]

    def analyze_face_pair(
        self,
        img_a_path: str,
        img_b_path: str,
        face_id: str,
        timeout: int = 120
    ) -> Dict:
        """Ask the model to compare one cubemap face across the two captures.

        Args:
            img_a_path: Face image from the earlier capture (Image A).
            img_b_path: The same face from the later capture (Image B).
            face_id: Label sent to the model and stamped on the result.
            timeout: Per-request timeout in seconds, passed to the client.

        Returns:
            The parsed JSON verdict, enriched with ``face_id``, ``analysis_timestamp``
            and ``model_used``. If the images cannot be read or every attempt fails,
            returns the placeholder from ``_error_result`` (it has an ``"error"`` key).
        """
        try:
            img_a_b64 = self.encode_image(img_a_path)
            img_b_b64 = self.encode_image(img_b_path)
        except Exception as e:
            return self._error_result(face_id, f"Processing error: {str(e)}")

        messages = self._build_messages(face_id, img_a_b64, img_b_b64)

        # Retry with exponential backoff; parse failures back off more gently than API errors.
        for attempt in range(self.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    temperature=0.01,  # Minimize creativity
                    max_tokens=1500,
                    timeout=timeout
                )

                # message.content can be None (e.g. filtered output). Treat that as an
                # unparseable reply so it is retried instead of raising AttributeError.
                raw_content = (response.choices[0].message.content or "").strip()
                logger.debug(f"Raw API response for {face_id}: {raw_content[:300]}...")

                result = json.loads(self._clean_json_response(raw_content))

                missing_fields = [field for field in self._REQUIRED_FIELDS if field not in result]
                if missing_fields:
                    raise ValueError(f"Missing required fields in JSON: {missing_fields}")

                result["face_id"] = face_id
                result["analysis_timestamp"] = time.time()
                result["model_used"] = self.model

                logger.info(f"✓ Successfully analyzed {face_id} face (confidence: {result.get('confidence_level', 'N/A')})")
                return result

            except (json.JSONDecodeError, ValueError) as e:
                logger.warning(f"Attempt {attempt+1}/{self.max_retries} failed for {face_id}: {str(e)}")
                if attempt == self.max_retries - 1:
                    return self._error_result(face_id, f"JSON parsing failed after {self.max_retries} attempts: {str(e)}")
                time.sleep(1.5 ** attempt)

            except Exception as e:
                logger.warning(f"API error on attempt {attempt+1} for {face_id}: {str(e)}")
                if attempt == self.max_retries - 1:
                    return self._error_result(face_id, f"API failed after {self.max_retries} attempts: {str(e)}")
                time.sleep(2 ** attempt)

        # Only reachable when max_retries < 1, i.e. no request was made.
        return self._error_result(face_id, f"No analysis attempted (max_retries={self.max_retries})")

    def _error_result(self, face_id: str, error_msg: str) -> Dict:
        """Build a placeholder face result that reports "no change" and carries ``error``."""
        logger.error(f"Analysis failed for {face_id}: {error_msg}")
        return {
            "face_id": face_id,
            "error": error_msg,
            "structural_change_detected": False,
            "confidence_level": "Low",
            "change_description": f"Analysis failed: {error_msg}",
            "registration_quality": "Low",
            "transient_objects_ignored": [],
            "specific_changes": [],
            "potential_artifacts": [error_msg]
        }

    @staticmethod
    def _is_true(value) -> bool:
        """Interpret a model-supplied boolean, accepting ``"true"``/``"false"`` strings."""
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    @staticmethod
    def _change_list(result: Dict) -> List[Dict]:
        """Return the dict entries of ``result["specific_changes"]`` ([] if absent or malformed)."""
        changes = result.get("specific_changes")
        if not isinstance(changes, list):
            return []
        return [change for change in changes if isinstance(change, dict)]

    @classmethod
    def _evidence_text(cls, change: Dict, separator: str = " ") -> str:
        """Return a change's evidence as one string, whichever field the model used.

        The first non-empty field in ``_EVIDENCE_FIELDS`` wins. List values are
        joined with ``separator``; other values are converted with ``str``.
        """
        for field in cls._EVIDENCE_FIELDS:
            value = change.get(field)
            if not value:
                continue
            if isinstance(value, (list, tuple)):
                return separator.join(str(item) for item in value)
            return str(value)
        return ""

    def consolidate_results(self, face_results: List[Dict]) -> Dict:
        """Merge per-face results into one site-level progress report.

        A change is considered only when its face and the change itself are both
        rated "High"; it must then pass ``_verify_cross_face_consistency``. Face
        results with an ``"error"`` key are excluded from the assessment, but they
        are listed in the metadata and kept in ``face_level_results``.
        """
        valid_results = [r for r in face_results if "error" not in r]
        errored_faces = [r.get("face_id") for r in face_results if "error" in r]
        logger.info(f"Valid results: {len(valid_results)}, Errored faces: {len(errored_faces)}")

        high_conf_changes = []
        for res in valid_results:
            logger.debug(f"Processing face {res.get('face_id')}, results {res}")
            if not self._is_true(res.get("structural_change_detected")):
                continue
            if res.get("confidence_level") != "High":
                continue
            for change in self._change_list(res):
                logger.info(f"Processing change for {res.get('face_id')}: {change}")
                if change.get("confidence") == "High":
                    high_conf_changes.append({
                        **change,
                        "detected_in_face": res.get("face_id"),
                        "face_registration_quality": res.get("registration_quality", "Medium"),
                    })

        logger.info(f"High confidence changes: {len(high_conf_changes)}")
        # Cross-face verification removes single-face artifacts.
        verified_changes = self._verify_cross_face_consistency(high_conf_changes, valid_results)

        change_types = [c.get("change_type") for c in verified_changes]
        if not change_types:
            direction = "No Progress Detected"
        elif all(t == "Removed" for t in change_types):
            direction = "Demolition/Removal"
        elif all(t in ("Added", "Modified") for t in change_types):
            direction = "Construction Added"
        else:
            direction = "Mixed Progress"

        change_description = self._generate_consolidated_description(verified_changes, valid_results)

        # Conservative: a lone surviving change is only "Medium". Zero changes is
        # "High" because every analysed face agreed that nothing changed.
        overall_confidence = "Medium" if len(verified_changes) == 1 else "High"

        registration_quality = self._assess_registration_quality(valid_results)

        return {
            "analysis_metadata": {
                "timestamp": time.time(),
                "model_used": self.model,
                "total_faces_analyzed": len(face_results),
                "successful_analyses": len(valid_results),
                "errored_faces": errored_faces,
                "face_order": self.face_order
            },
            "registration_quality_assessment": registration_quality,
            "overall_confidence_level": overall_confidence,
            "structural_change_detected": len(verified_changes) > 0,
            "direction_of_progress": direction,
            "change_description": change_description,
            "consolidated_changes": verified_changes,
            "transient_objects_summary": self._aggregate_list_field(valid_results, "transient_objects_ignored"),
            "potential_artifacts_requiring_verification": self._aggregate_list_field(valid_results, "potential_artifacts")[:5],
            "face_level_results": face_results,  # For auditability
            "verification_protocol_applied": [
                "Cross-face geometric consistency check",
                "'High' confidence required at face and change level",
                "Transient objects explicitly filtered",
                "Registration quality assessment across all faces",
                "Construction sequence logic validation"
            ]
        }

    def _verify_cross_face_consistency(self, changes: List[Dict], face_results: List[Dict]) -> List[Dict]:
        """Drop changes that look like single-face artifacts.

        A change whose evidence contains a strong keyword is exempt from every rule.
        - Wall: another wall change must be reported on an edge-adjacent face.
        - Ceiling outside the top face: the top face must report a ceiling change.
        - Floor outside the bottom face: the bottom face must report a floor change.
        Other element types pass unchanged. ``face_results`` is unused and is kept
        for interface compatibility.
        """
        if not changes:
            return []

        verified = []
        for change in changes:
            element_type = change.get("element_type")
            detected_face = change.get("detected_in_face")
            logger.info(f"Verifying change: {change} detected face {detected_face}")

            evidence = self._evidence_text(change).lower()
            strong_evidence = any(kw in evidence for kw in self._STRONG_EVIDENCE_KEYWORDS)
            logger.info(f"Strong evidence: {strong_evidence}")

            if element_type == "Wall":
                adjacent_faces = self._CUBE_ADJACENCY.get(detected_face, ())
                corroborated = any(
                    c.get("element_type") == "Wall" and c.get("detected_in_face") in adjacent_faces
                    for c in changes
                )
                if not corroborated and not strong_evidence:
                    logger.warning(f"Wall change in {detected_face} lacks corroboration - treating as artifact")
                    continue

            elif element_type == "Ceiling" and detected_face != "top":
                top_face_has_change = any(
                    c.get("element_type") == "Ceiling" and c.get("detected_in_face") == "top"
                    for c in changes
                )
                if not top_face_has_change and not strong_evidence:
                    logger.warning("Ceiling change detected outside top face without corroboration - treating as artifact")
                    continue

            elif element_type == "Floor" and detected_face != "bottom":
                bottom_face_has_change = any(
                    c.get("element_type") == "Floor" and c.get("detected_in_face") == "bottom"
                    for c in changes
                )
                if not bottom_face_has_change and not strong_evidence:
                    logger.warning("Floor change detected outside bottom face without corroboration - treating as artifact")
                    continue

            verified.append(change)

        return verified

    def _face_distance(self, face1: str, face2: str) -> int:
        """Return the cube-face distance (0 = same, 1 = adjacent, 2 = opposite)."""
        opposites = {"front": "back", "back": "front", "left": "right", "right": "left", "top": "bottom", "bottom": "top"}
        if face1 == face2:
            return 0
        elif opposites.get(face1) == face2:
            return 2
        else:
            return 1

    def _assess_registration_quality(self, results: List[Dict]) -> str:
        """Summarise per-face registration quality.

        Returns "High" if at least 70% of faces are High, "Low" if at least 50%
        are Low, otherwise "Medium". Returns "Low" for no results.
        """
        if not results:
            return "Low"

        quality_counts = {"High": 0, "Medium": 0, "Low": 0}
        for r in results:
            quality = r.get("registration_quality", "Medium")
            if quality in quality_counts:
                quality_counts[quality] += 1

        total = len(results)
        if quality_counts["High"] / total >= 0.7:
            return "High"
        elif quality_counts["Low"] / total >= 0.5:
            return "Low"
        else:
            return "Medium"

    def _generate_consolidated_description(self, changes: List[Dict], results: List[Dict]) -> str:
        """Describe the verified changes, or list the elements faces reported unchanged.

        With changes, returns " | "-joined entries of the form
        ``"<element> <change type>: <location> (<face> face) [<evidence>]"``.
        """
        if not changes:
            verified_elements = set()
            for r in results:
                text = r.get("change_description")
                if not isinstance(text, str):
                    continue
                for marker in self._UNCHANGED_MARKERS:
                    if marker in text:
                        tail = text.split(marker, 1)[1]
                        for raw in tail.split(","):
                            # Strip list brackets / sentence punctuation the model may add.
                            element = raw.strip(" \t\n.[]")
                            if element:
                                verified_elements.add(element)
                        break

            if verified_elements:
                elements_str = ", ".join(sorted(verified_elements))
                return f"No permanent structural changes detected. Verified unchanged across site: {elements_str}"
            return "No permanent structural changes detected. All visible structural elements remain unchanged."

        descriptions = []
        for change in changes:
            loc = f"{change.get('location', 'unspecified location')} ({change.get('detected_in_face')} face)"
            evidence = self._evidence_text(change, ", ")
            element_type = change.get("element_type", "Unknown")
            change_type = str(change.get("change_type", "changed")).lower()
            descriptions.append(f"{element_type} {change_type}: {loc} [{evidence}]")

        return " | ".join(descriptions)

    def _aggregate_list_field(self, results: List[Dict], field_name: str) -> List[str]:
        """Concatenate a list field across results, dropping duplicates in first-seen order.

        Missing/None values are skipped. A bare scalar is treated as a one-item
        list rather than being split into characters. Unhashable items (e.g. dicts)
        are deduplicated by their JSON form.
        """
        unique_items = []
        seen = set()
        for r in results:
            value = r.get(field_name)
            if not value:
                continue
            if not isinstance(value, list):
                value = [value]
            for item in value:
                try:
                    key = ("value", item)
                    hash(key)
                except TypeError:
                    key = ("json", json.dumps(item, sort_keys=True, default=str))
                if key not in seen:
                    seen.add(key)
                    unique_items.append(item)
        return unique_items

    def analyze_site_progress(
        self,
        set_a_paths: List[str],
        set_b_paths: List[str],
        face_labels: Optional[List[str]] = None
    ) -> Dict:
        """
        Main entry point: Analyze full cubemap sets and return consolidated progress report

        Args:
            set_a_paths: List of 6 image paths (Time A) in order: [front, right, back, left, top, bottom]
            set_b_paths: List of 6 image paths (Time B) in same order
            face_labels: Optional custom labels for the 6 faces (defaults to ``self.face_order``)

        Returns:
            Consolidated JSON report of construction progress

        Raises:
            ValueError: If either set does not hold exactly 6 paths, or
                ``face_labels`` does not hold exactly 6 labels.
        """
        if len(set_a_paths) != 6 or len(set_b_paths) != 6:
            raise ValueError(
                "Exactly 6 images required per set (cube faces in order: front, right, back, left, top, bottom). "
                f"Received {len(set_a_paths)} for set A and {len(set_b_paths)} for set B."
            )

        face_ids = list(face_labels) if face_labels else list(self.face_order)
        if len(face_ids) != 6:
            raise ValueError(f"face_labels must contain exactly 6 labels, got {len(face_ids)}")

        # Faces are analysed sequentially with a short pause to stay under rate limits.
        face_results = []
        for index, (face_id, path_a, path_b) in enumerate(zip(face_ids, set_a_paths, set_b_paths), start=1):
            logger.info(f"Analyzing {face_id} face ({index}/{len(face_ids)})...")
            face_results.append(self.analyze_face_pair(path_a, path_b, face_id))
            time.sleep(0.3)  # Small delay to avoid rate limits

        return self.consolidate_results(face_results)

In [None]:
# System prompt for per-face analysis. ConstructionProgressAnalyzer.__init__ stores it
# as self.prompt and analyze_face_pair sends it as the "system" message for every
# face pair, so this cell must run before the analyzer is instantiated.
# The JSON keys under OUTPUT FORMAT are read back by the analyzer: the required
# top-level fields (structural_change_detected, confidence_level, change_description),
# registration_quality, transient_objects_ignored, potential_artifacts, and per-change
# element_type / change_type / confidence / location. Renaming a key here needs a
# matching change in the analyzer.
CONSTRUCTION_PROGRESS_PROMPT = """ROLE: You are a Construction Progress Monitoring AI. Detect PERMANENT structural changes between two construction site images. Balance precision with recall - catch real changes while avoiding false positives.

INPUT CONTEXT:
- You are analyzing ONE PAIR of cubemap faces from the same viewing direction at different times.
- The FACE ID will be provided in the user message.
- Images come from 360° cameras with IMPERFECT registration (expect ≤5° misalignment, perspective shifts, lighting differences).
- THIS IS A SINGLE FACE ANALYSIS. Do not assume knowledge from other faces.

VERIFICATION PROTOCOL:

Step 0 - DESCRIBE BEFORE COMPARE (MANDATORY):
Describe each image independently BEFORE looking for differences:
  • Image A: "I see [structural elements] at [locations]..."
  • Image B: "I see [structural elements] at [locations]..."

Step 1 - IDENTIFY CANDIDATE CHANGES:
Compare your descriptions. A change exists ONLY if:
  - An element is ABSENT in Image A AND PRESENT in Image B (or vice versa)
  - You can identify the EXACT region where the difference occurs

Step 2 - ELIMINATE ARTIFACTS:
For each candidate change, ask: "Could this be explained by camera angle, lighting, shadows, or reflections?"
  • Same surface appearing brighter/darker = lighting, NOT structural
  • Same element at different angle = perspective, NOT structural
  • Reflections on metal/glass = artifact, NOT structural
  → If YES = discard. Only proceed if definitively NO.

Step 3 - VERIFY WITH ELEMENT-SPECIFIC MARKERS:
Each element type has specific visual markers. Identify at least ONE marker from the appropriate category:

WALLS & PARTITIONS:
  • Drywall/Gypsum: Panel seams, screw patterns, tape joints, uniform matte finish
  • Metal studs: C-channel profile, consistent vertical spacing, track connections
  • Concrete block: Mortar lines, block pattern, grey uniform texture

MEP SYSTEMS (Mechanical, Electrical, Plumbing):
  • HVAC Ductwork: Rectangular/round shapes, foil insulation wrap, support hangers, flex connections
  • Fire suppression: Copper/red pipes, T-joints, sprinkler heads, ceiling brackets
  • Electrical conduit: Metal/PVC tubing, junction boxes, support clips, corner bends
  • Plumbing: PVC/copper pipes, elbows, valves, floor/wall penetrations

CEILING SYSTEMS:
  • Drop ceiling grid: T-bar intersections, consistent pattern, hanging wires
  • Insulation: Batts between joists, foil-faced products, vapor barriers
  • Exposed structure: Deck, joists, beams becoming visible/covered

FLOOR SYSTEMS:
  • Concrete: Pour lines, control joints, finish differences, form marks
  • Coatings: Epoxy sheen, color changes, edge lines

SAFETY & ACCESS:
  • Guardrails/Handrails: Metal posts at regular intervals, horizontal rails, floor brackets
  • Stairs: Treads, risers, stringers, landing platforms
  • Temporary barriers: Plywood, caution tape, signage

Step 4 - CONSTRUCTION LOGIC CHECK:
Does this change make sense in construction sequence?
  • New elements appear with proper supports/connections
  • Changes are consistent across the visible area
  • Logical progression (framing → MEP rough-in → insulation → drywall)

STRICT EXCLUSIONS (NEVER REPORT):
✗ Equipment movement (lifts, carts, scaffolding positions)
✗ Worker positions, tools, personal items
✗ Loose materials, debris, temporary storage
✗ Lighting/shadow differences
✗ Camera angle/perspective variations
✗ Reflections, glare, motion blur, compression artifacts

DECISION THRESHOLD:
• structural_change_detected = true if:
  1. Element is clearly absent in one image and present in the other
  2. At least ONE element-specific marker is identified
  3. Change passes artifact elimination (Step 2)
  4. Change makes construction sequence sense

• When genuinely uncertain → default to FALSE with explanation
• Report real changes; avoid false positives

OUTPUT FORMAT (STRICT JSON ONLY - no markdown):
{
  "face_id": "<face name from user message>",
  "image_a_description": "Brief structural element inventory of Image A",
  "image_b_description": "Brief structural element inventory of Image B", 
  "registration_quality": "High|Medium|Low",
  "confidence_level": "High|Medium|Low",
  "transient_objects_ignored": ["list of transient objects seen but ignored"],
  "structural_change_detected": true|false,
  "change_description": "Technical summary OR 'No structural changes detected. Both images show: [elements]'",
  "specific_changes": [
    {
      "element_type": "Wall|Ceiling|MEP|Floor|Safety|Other",
      "element_subtype": "e.g., 'drywall', 'guardrail', 'copper pipe'",
      "location": "specific location in image",
      "change_type": "Added|Removed|Modified",
      "confidence": "High|Medium|Low",
      "absence_in_image_a": "What was at this location in Image A",
      "presence_in_image_b": "What is now at this location in Image B",
      "visual_markers": ["specific markers observed, e.g., 'metal posts at 4ft spacing', 'horizontal rail']"
    }
  ],
  "potential_artifacts": ["lighting/angle differences noted but NOT reported as changes"]
}"""


In [None]:
import os

# Initialize analyzer.
# The DashScope key is read from the DASHSCOPE_API_KEY environment variable;
# export it before starting the notebook. It is deliberately NOT assigned here.
# A hard-coded os.environ[...] = "sk-..." would overwrite a real key already
# present in the environment and would put a secret into version control.
# If the variable is unset, ConstructionProgressAnalyzer raises ValueError immediately.
analyzer = ConstructionProgressAnalyzer(
    api_key=os.environ.get("DASHSCOPE_API_KEY"),
    model="qwen3-vl-plus"  # Use "qwen-vl-max" for highest accuracy if budget allows
)

In [None]:
# Collect the names of the sub-directories of matched_pairs_json; each one is
# processed below as a single matched old/new panorama pair.
path = f'{PROCESSED_DATA_DIR}/matched_pairs_json'
list_of_aligned_directories = list(
    filter(lambda name: os.path.isdir(os.path.join(path, name)), os.listdir(path))
)


In [None]:
import os
import json

from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_folder(folder_path, analyzer, PROCESSED_DATA_DIR, logger):
    """Process a single folder - this runs in parallel.

    Analyses one matched old/new panorama pair and caches the report as JSON next
    to it. An existing, readable cached report is returned without calling the API.

    Args:
        folder_path: Name of the pair directory under ``<PROCESSED_DATA_DIR>/matched_pairs_json``.
        analyzer: Object exposing ``analyze_site_progress(set_a, set_b, face_labels)``.
        PROCESSED_DATA_DIR: Root of the processed-data tree.
        logger: Logger with ``info``/``debug``/``warning``/``error`` methods.

    Expected layout of ``<pair>/cubemap_cross/``:
        - One file whose name contains ``cubemap_cross_collage.jpg`` and starts with
          ``new_``, and one that starts with ``old_``. The frame id is the text
          between the prefix and ``_aligned``.
        - Six faces per capture, named ``{new,old}_<frame>_aligned_<i>.jpg`` for i = 0..5.

    Returns:
        On success: ``{"folder", "status": "success", "report"}``.
        Otherwise: ``{"folder", "status": "failed", "error"}``, plus ``"report"`` when
        one was produced. Missing inputs and reports in which every face failed are
        NOT cached, so a later run retries them.
    """
    input_folder_path = f"{PROCESSED_DATA_DIR}/matched_pairs_json/{folder_path}/cubemap_cross/"
    final_output_path = f"{PROCESSED_DATA_DIR}/matched_pairs_json/{folder_path}/construction_progress_report_qwen3_vl_plus.json"

    # Reuse a cached report. An unreadable/corrupt cache is ignored and regenerated.
    if os.path.exists(final_output_path):
        try:
            with open(final_output_path, "r") as f:
                report = json.load(f)
            return {"folder": folder_path, "status": "success", "report": report}
        except (OSError, ValueError) as e:
            logger.warning(f"Ignoring unreadable cached report {final_output_path}: {e}")

    try:
        entries = os.listdir(input_folder_path)
    except OSError as e:
        msg = f"Cannot list {input_folder_path}: {e}"
        logger.error(msg)
        return {"folder": folder_path, "status": "failed", "error": msg}

    # Discover the old/new frame ids from the collage file names.
    new_panorama_frame = None
    old_panorama_frame = None
    for image_name in entries:
        if "cubemap_cross_collage.jpg" not in image_name:
            continue
        filename = Path(image_name).stem
        logger.info(f"Processing {filename}")
        if filename.startswith("new_"):
            new_panorama_frame = filename[4:].split("_aligned")[0]
        if filename.startswith("old_"):
            old_panorama_frame = filename[4:].split("_aligned")[0]

    if new_panorama_frame is None or old_panorama_frame is None:
        msg = f"Missing new_/old_ cubemap_cross_collage.jpg in {input_folder_path}"
        logger.error(msg)
        return {"folder": folder_path, "status": "failed", "error": msg}

    # NOTE(review): face index i is assumed to match label i of the list passed to
    # analyze_site_progress below — confirm against the cubemap export.
    new_faces_paths = [os.path.join(input_folder_path, f"new_{new_panorama_frame}_aligned_{i}.jpg") for i in range(6)]
    old_faces_paths = [os.path.join(input_folder_path, f"old_{old_panorama_frame}_aligned_{i}.jpg") for i in range(6)]

    missing = [p for p in new_faces_paths + old_faces_paths if not os.path.exists(p)]
    if missing:
        for p in missing:
            logger.error(f"The file doesnt exist: {p}")
        return {"folder": folder_path, "status": "failed",
                "error": f"Missing {len(missing)} face image(s), e.g. {missing[0]}"}

    try:
        # Set A = earlier ("old") capture, set B = later ("new") capture.
        report = analyzer.analyze_site_progress(
            old_faces_paths, new_faces_paths,
            ["front", "right", "back", "left", "top", "bottom"]
        )

        # Do not cache a report in which every face failed (e.g. bad key or API
        # outage); the cache check above would stop this folder from being retried.
        if report.get("analysis_metadata", {}).get("successful_analyses") == 0:
            msg = "All face analyses failed; report not cached"
            logger.error(f"Analysis failed for {folder_path}: {msg}")
            return {"folder": folder_path, "status": "failed", "error": msg, "report": report}

        # Write to a temporary file and atomically swap it in, so an interrupted
        # run never leaves a truncated cache file behind.
        tmp_path = final_output_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump(report, f, indent=2)
        os.replace(tmp_path, final_output_path)

        logger.debug(f"\nFull report saved to: {final_output_path}")
        return {"folder": folder_path, "status": "success", "report": report}

    except Exception as e:
        logger.error(f"Analysis failed for {folder_path}: {str(e)}")
        return {"folder": folder_path, "status": "failed", "error": str(e)}


In [None]:
# Main parallel execution
# Main parallel execution: one task per matched-pair folder. The work is
# dominated by network calls, so a small thread pool both overlaps the waits and
# caps concurrent API traffic.
max_workers = 4  # Adjust based on API rate limits
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {}
    for folder_path in list_of_aligned_directories:
        submitted = executor.submit(process_folder, folder_path, analyzer, PROCESSED_DATA_DIR, logger)
        futures[submitted] = folder_path

    # Report each folder as soon as its task finishes (completion order).
    for future in as_completed(futures):
        folder_path = futures[future]
        try:
            result = future.result()
            succeeded = result["status"] == "success"
            if not succeeded:
                logger.error(f"❌ Failed: {folder_path}")
            else:
                logger.info(f"✅ Completed: {folder_path}")
        except Exception as e:
            logger.error(f"❌ Exception for {folder_path}: {e}")