In [60]:
import os
import json
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from tqdm import tqdm

# Docling imports
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions

# LLM imports
import ollama

# Supabase
from supabase import create_client, Client

# Validation
from jsonschema import validate, ValidationError

print(" All libraries imported successfully")

 All libraries imported successfully


In [61]:
# Paths
# NOTE(review): these are absolute, machine-specific locations; anyone else
# running the notebook has to edit them before this cell works.
PDF_DIR = r"C:\Users\samue\Documents\Work\Code\valuation_data_miner\data"
OUTPUT_DIR = "./extracted_data"
ERROR_LOG_DIR = "./errors"
from dotenv import load_dotenv

dotenv_path = r"C:\Users\samue\Documents\Work\Code\valuation_data_miner\.env"

# Copy KEY=VALUE pairs from the .env file into the process environment.
# Every line is stripped *before* it is inspected so that indented comments,
# blank lines and lines without '=' are skipped instead of raising ValueError
# on tuple unpacking. utf-8-sig transparently drops a BOM if the file has one.
with open(dotenv_path, "r", encoding="utf-8-sig") as f:
    for line in f:
        entry = line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, value = entry.split("=", 1)
        key = key.strip()
        if not key:
            continue  # "=value" has no variable name to assign to
        os.environ[key] = value.strip()  # set environment variable

# Also let python-dotenv load any .env it can discover on its own.
load_dotenv()

# Supabase credentials (None when the variable is not set)
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
SUPABASE_TABLE = "property_valuations"

# LLM settings
LLM_MODEL = "mistral"
LLM_TEMPERATURE = 0.1

# Processing settings
BATCH_SIZE = 5
MAX_RETRIES = 2

# Create directories
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(ERROR_LOG_DIR, exist_ok=True)

print(f" Configuration loaded")
print(f"  PDF Directory: {PDF_DIR}")
print(f"  Output Directory: {OUTPUT_DIR}")
print(f"  LLM Model: {LLM_MODEL}")

 Configuration loaded
  PDF Directory: C:\Users\samue\Documents\Work\Code\valuation_data_miner\data
  Output Directory: ./extracted_data
  LLM Model: mistral


In [62]:
# JSON Schema describing one extracted valuation record.
# It is used in two places in this notebook: it is serialised into the LLM
# prompt in extract_with_llm(), and it is the schema passed to
# jsonschema.validate() in validate_extracted_data().
# Only the four top-level keys listed under "required" are mandatory; none of
# the nested objects declare their own "required" list, so their fields are
# type-checked only when present.
VALUATION_SCHEMA = {
    "type": "object",
    "required": ["property_id", "valuation_report", "property_details", "valuations"],
    "properties": {
        "property_id": {"type": "string"},
        # Report metadata.
        "valuation_report": {
            "type": "object",
            "properties": {
                "report_reference": {"type": "string"},
                "valuer": {"type": "string"},
                # List of {name, qualification} objects, alongside the single "valuer" string above.
                "valuers": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "qualification": {"type": "string"}
                        }
                    }
                },
                # Dates are plain strings; no "format" constraint is declared.
                "inspection_date": {"type": "string"},
                "report_date": {"type": "string"},
                "client": {"type": "string"},
                "client_address": {"type": "string"},
                "purpose": {"type": "string"}
            }
        },
        # Details of the property itself; the nested objects are free-form.
        "property_details": {
            "type": "object",
            "properties": {
                "apartment_number": {"type": "string"},
                "block": {"type": "string"},
                "floor": {"type": "string"},
                "title_details": {"type": "object"},
                "location": {"type": "object"},
                "tenure": {"type": "object"},
                "registered_proprietors": {"type": "array"},
                "ownership_type": {"type": "string"},
                "encumbrances": {"type": "string"}
            }
        },
        "property_description": {"type": "object"},
        "apartment_details": {"type": "object"},
        "occupancy": {"type": "string"},
        "condition": {"type": "string"},
        "market_assessment": {"type": "object"},
        "valuations": {
            "type": "object",
            "properties": {
                "current_market_value": {
                    "type": "object",
                    "properties": {
                        # Must be a JSON number when present: a string such as "" fails validation.
                        "amount": {"type": "number"},
                        "currency": {"type": "string"},
                        "amount_words": {"type": "string"}
                    }
                }
            }
        },
        "valuation_methodology": {"type": "array"},
        "lease_details": {"type": "object"},
        "compliance": {"type": "object"}
    }
}

print("JSON schema defined")

JSON schema defined


In [63]:
import os
from typing import Optional
from supabase import create_client, Client
from dotenv import load_dotenv

dotenv_path = r"C:\Users\samue\Documents\Work\Code\valuation_data_miner\.env"

# Copy KEY=VALUE pairs from the .env file into the process environment.
# Each line is stripped before it is inspected so that indented comments,
# blank lines and lines without '=' are skipped instead of raising ValueError
# on tuple unpacking. utf-8-sig transparently drops a BOM if the file has one.
with open(dotenv_path, "r", encoding="utf-8-sig") as f:
    for line in f:
        entry = line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, value = entry.split("=", 1)
        key = key.strip()
        if not key:
            continue  # "=value" has no variable name to assign to
        os.environ[key] = value.strip()  # set environment variable

# Also let python-dotenv load any .env it can discover on its own.
load_dotenv()


def init_supabase() -> Optional[Client]:
    """
    Build a Supabase client from SUPABASE_URL / SUPABASE_KEY.

    Returns the client on success. Returns None (after printing a message)
    when either variable is missing or client construction raises.
    """
    try:
        url = os.getenv("SUPABASE_URL")
        key = os.getenv("SUPABASE_KEY")

        if url and key:
            connected = create_client(url, key)
            print("Supabase client initialized")
            return connected

        # At least one credential is unset or empty.
        print("Supabase credentials not configured")
    except Exception as exc:
        print(f" Failed to initialize Supabase: {exc}")

    return None

supabase_client = init_supabase()

Supabase client initialized


In [64]:
def process_pdf_with_docling(pdf_path: str) -> str:
    """
    Convert a PDF to Markdown text with Docling, with OCR and table-structure
    recognition switched on.

    Args:
        pdf_path: Path of the PDF file to convert.

    Returns:
        The converted document exported as a Markdown string.

    Raises:
        Exception: "Docling processing failed: ..." wrapping any error raised
            while configuring the converter or converting the file. The
            original error is chained as ``__cause__``.
    """
    try:
        # DocumentConverter expects each format_options value to be a
        # FormatOption wrapper (PdfFormatOption for PDFs), not the bare
        # pipeline options object. Imported locally so this cell does not
        # depend on an edit to the notebook's import cell.
        from docling.document_converter import PdfFormatOption

        # Configure pipeline with OCR
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = True
        pipeline_options.do_table_structure = True

        # Initialize converter
        converter = DocumentConverter(
            allowed_formats=[InputFormat.PDF],
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )

        # Convert document and export text + tables as Markdown
        result = converter.convert(pdf_path)
        return result.document.export_to_markdown()

    except Exception as e:
        raise Exception(f"Docling processing failed: {str(e)}") from e

print(" PDF processing function defined")

 PDF processing function defined


### LLM Extraction with Mistral

In [65]:
def _parse_llm_json(response_text: str):
    """Parse the JSON payload out of a raw LLM reply, tolerating fences and surrounding prose."""
    cleaned = response_text.strip()

    # Remove a leading/trailing markdown code fence if present.
    if cleaned.startswith('```json'):
        cleaned = cleaned[7:]
    if cleaned.startswith('```'):
        cleaned = cleaned[3:]
    if cleaned.endswith('```'):
        cleaned = cleaned[:-3]
    cleaned = cleaned.strip()

    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span: the model sometimes adds
        # text before or after the object despite the instructions.
        start = cleaned.find('{')
        end = cleaned.rfind('}')
        if start == -1 or end <= start:
            raise
        return json.loads(cleaned[start:end + 1])


def extract_with_llm(text_content: str, filename: str, max_chars: int = 10000) -> Dict:
    """
    Extract structured valuation data from document text using the local
    Ollama model named by LLM_MODEL.

    Args:
        text_content: Text of the document (Markdown from the Docling stage).
        filename: Source file name; its stem is used as ``property_id`` when
            the model returns none.
        max_chars: Maximum number of leading characters of ``text_content``
            placed in the prompt. Defaults to 10000, the previously
            hard-coded limit.

    Returns:
        The parsed JSON object as a dict, always carrying a non-empty
        ``property_id``.

    Raises:
        Exception: "Failed to parse JSON from LLM response: ..." when the
            reply cannot be parsed as JSON; "LLM extraction failed: ..." for
            any other failure, including a reply that is valid JSON but not
            an object. The original error is chained as ``__cause__``.
    """

    system_prompt = """You are a property valuation data extraction expert. 
Extract information from property valuation reports and return ONLY valid JSON.
Follow the exact schema provided. If information is not found, use empty strings or empty objects.
Do not include any markdown formatting, explanations, or text outside the JSON object."""

    user_prompt = f"""Extract property valuation data from this document and return as JSON following this schema:

{json.dumps(VALUATION_SCHEMA, indent=2)}

Document content:
{text_content[:max_chars]}

Return ONLY the JSON object, no additional text."""

    try:
        response = ollama.chat(
            model=LLM_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            options={
                "temperature": LLM_TEMPERATURE,
                "num_predict": 4096
            }
        )

        # Extract and parse the JSON payload from the reply
        extracted_data = _parse_llm_json(response['message']['content'])

        # A top-level list/string/number is valid JSON but not a usable
        # record; say so explicitly rather than failing later on .get().
        if not isinstance(extracted_data, dict):
            raise ValueError(
                f"expected a JSON object, got {type(extracted_data).__name__}"
            )

        # Add metadata
        if not extracted_data.get('property_id'):
            extracted_data['property_id'] = Path(filename).stem

        return extracted_data

    except json.JSONDecodeError as e:
        raise Exception(f"Failed to parse JSON from LLM response: {str(e)}") from e
    except Exception as e:
        raise Exception(f"LLM extraction failed: {str(e)}") from e

print("LLM extraction function defined")

LLM extraction function defined


### Data Validation

In [66]:
def validate_extracted_data(data: Dict) -> Tuple[bool, Optional[str]]:
    """
    Check ``data`` against VALUATION_SCHEMA.

    Returns:
        ``(True, None)`` when the data conforms, otherwise
        ``(False, message)`` where ``message`` is the string form of the
        jsonschema ValidationError.
    """
    try:
        validate(instance=data, schema=VALUATION_SCHEMA)
    except ValidationError as schema_violation:
        return False, str(schema_violation)
    return True, None

print(" Validation function defined")

 Validation function defined


### Supabase upload

In [67]:
def upload_to_supabase(client: Client, data: Dict, filename: str) -> bool:
    """
    Insert one extracted record into the SUPABASE_TABLE table.

    A shallow copy of ``data`` is sent, extended with ``source_filename``
    and a ``processed_at`` ISO timestamp; ``data`` itself is not modified.

    Returns:
        False when ``client`` is None, True once the insert has executed.

    Raises:
        Exception: "Supabase upload failed: ..." if building or sending the
            record raises.
    """
    if client is None:
        return False

    try:
        # Copy first so the caller's dict does not gain the metadata keys.
        record = data.copy()
        record['source_filename'] = filename
        record['processed_at'] = datetime.now().isoformat()

        client.table(SUPABASE_TABLE).insert(record).execute()
    except Exception as exc:
        raise Exception(f"Supabase upload failed: {str(exc)}")

    return True

print("Supabase upload function defined")

Supabase upload function defined


### Processing

In [68]:
def process_single_pdf(pdf_path: str, filename: str) -> Dict:
    """
    Run one PDF through the pipeline: Docling -> LLM extraction ->
    schema validation -> local JSON save -> Supabase upload.

    Args:
        pdf_path: Path of the PDF to process.
        filename: File name used for the output/error file stems and as the
            uploaded ``source_filename``.

    Returns:
        A dict with keys ``filename``, ``success``, ``stage``, ``error`` and
        ``data``. ``stage`` is the last stage entered ('complete' on
        success). A schema validation failure does not abort the run: the
        data is still saved locally, the upload is skipped, ``error`` holds
        the validation message and ``success`` is still True.

    This function does not raise for pipeline failures; they are captured in
    the returned dict and written to ERROR_LOG_DIR.
    """
    result = {
        'filename': filename,
        'success': False,
        'stage': None,
        'error': None,
        'data': None
    }

    try:
        # Stage 1: Docling OCR
        result['stage'] = 'docling'
        text_content = process_pdf_with_docling(pdf_path)

        # Stage 2: LLM Extraction
        result['stage'] = 'extraction'
        extracted_data = extract_with_llm(text_content, filename)

        # Stage 3: Validation (non-fatal, recorded in result['error'])
        result['stage'] = 'validation'
        is_valid, validation_error = validate_extracted_data(extracted_data)

        if not is_valid:
            result['error'] = f"Validation failed: {validation_error}"

        result['data'] = extracted_data

        # Stage 4: Save locally
        result['stage'] = 'saving'
        output_file = os.path.join(OUTPUT_DIR, f"{Path(filename).stem}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(extracted_data, f, indent=2, ensure_ascii=False)

        # Stage 5: Upload to Supabase (only schema-valid records)
        if supabase_client and is_valid:
            result['stage'] = 'upload'
            upload_to_supabase(supabase_client, extracted_data, filename)

        result['success'] = True
        result['stage'] = 'complete'

    except Exception as e:
        result['error'] = str(e)

        # Log error. The message can contain arbitrary document/LLM text, so
        # write it as UTF-8 (replacing anything unencodable) instead of the
        # locale encoding. A logging failure must not escape this handler,
        # otherwise one bad file would abort the whole batch.
        error_file = os.path.join(ERROR_LOG_DIR, f"{Path(filename).stem}_error.txt")
        try:
            with open(error_file, 'w', encoding='utf-8', errors='replace') as f:
                f.write(f"Stage: {result['stage']}\n")
                f.write(f"Error: {result['error']}\n")
        except OSError as log_error:
            print(f"  Could not write error log '{error_file}': {log_error}")

    return result

print(" Main pipeline function defined")

 Main pipeline function defined


### Batch processing

In [69]:
def process_all_pdfs():
    """
    Run process_single_pdf() over every ``*.pdf`` directly inside PDF_DIR.

    Prints per-file status and a final summary, writes
    ``processing_summary.json`` into OUTPUT_DIR, and returns the list of
    per-file result dicts (an empty list when no PDFs are found).
    """
    pdf_paths = list(Path(PDF_DIR).glob("*.pdf"))

    if len(pdf_paths) == 0:
        print(f"✗ No PDF files found in {PDF_DIR}")
        return []

    divider = '=' * 60

    print(f"\n{divider}")
    print(f"Found {len(pdf_paths)} PDF files to process")
    print(f"{divider}\n")

    results = []

    # One pipeline run per file; failures are reported but do not stop the loop.
    for pdf_path in tqdm(pdf_paths, desc="Processing PDFs"):
        print(f"\n[{pdf_path.name}]")
        outcome = process_single_pdf(str(pdf_path), pdf_path.name)
        results.append(outcome)

        if not outcome['success']:
            print(f"  ✗ Failed at stage '{outcome['stage']}': {outcome['error']}")
        else:
            print("  ✓ Successfully processed")

    # Console summary
    print(f"\n{divider}")
    print("PROCESSING SUMMARY")
    print(divider)

    total = len(results)
    successful = len([entry for entry in results if entry['success']])
    failed = total - successful

    print(f"Total PDFs: {total}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"\nOutput saved to: {OUTPUT_DIR}")
    if failed > 0:
        print(f"Error logs saved to: {ERROR_LOG_DIR}")

    # Persist the same summary, including every per-file result, as JSON.
    summary_file = os.path.join(OUTPUT_DIR, "processing_summary.json")
    with open(summary_file, 'w') as f:
        report = {
            'timestamp': datetime.now().isoformat(),
            'total': total,
            'successful': successful,
            'failed': failed,
            'results': results
        }
        json.dump(report, f, indent=2)

    return results

print("Batch processing function defined")


Batch processing function defined


### Ollama setup

In [70]:
import ollama

# Check that the Ollama server is reachable and that the configured model
# (LLM_MODEL, with or without the ':latest' tag) has been pulled.
try:
    response = ollama.list()
    print("RAW MODEL RESPONSE:", response)

    models = response.models  # this is a list of Model objects
    model_names = [m.model for m in models]  # use .model attribute

    print("Detected models:", model_names)

    if LLM_MODEL not in model_names and f"{LLM_MODEL}:latest" not in model_names:
        print(f"⚠ Warning: Model '{LLM_MODEL}' not found in Ollama")
        print(f"Available models: {model_names}")
        # Name the configured model so the hint stays correct if LLM_MODEL changes.
        print(f"\nTo install it, run in terminal: ollama pull {LLM_MODEL}")
    else:
        print(f"✓ Model '{LLM_MODEL}' is available")

except Exception as e:
    print(f"✗ Could not check Ollama models: {e}")
    print("Make sure Ollama is running (run 'ollama serve' in terminal)")


2025-11-23 17:13:28,711 - INFO - HTTP Request: GET http://localhost:11434/api/tags "HTTP/1.1 200 OK"


RAW MODEL RESPONSE: models=[Model(model='mistral:latest', modified_at=datetime.datetime(2025, 11, 23, 8, 42, 42, 366963, tzinfo=TzInfo(10800)), digest='6577803aa9a036369e481d648a2baebb381ebc6e897f2bb9a766a2aa7bfbc1cf', size=4372824384, details=ModelDetails(parent_model='', format='gguf', family='llama', families=['llama'], parameter_size='7.2B', quantization_level='Q4_K_M'))]
Detected models: ['mistral:latest']
✓ Model 'mistral' is available


### View sample results

In [71]:
# Show one extracted record, if any exist. Sorting makes the chosen sample
# stable across runs instead of depending on directory listing order.
json_files = sorted(Path(OUTPUT_DIR).glob("*.json"))
json_files = [f for f in json_files if f.name != "processing_summary.json"]

if json_files:
    sample_file = json_files[0]
    print(f"Sample output from: {sample_file.name}\n")
    print("="*60)

    # The pipeline writes these files as UTF-8 with ensure_ascii=False, so
    # they must be read back as UTF-8 rather than with the locale encoding.
    with open(sample_file, 'r', encoding='utf-8') as f:
        sample_data = json.load(f)

    print(json.dumps(sample_data, indent=2))
else:
    print("No output files found yet")

No output files found yet


### Reprocess failed files

In [72]:
def reprocess_failed(previous_results):
    """
    Re-run the pipeline for every entry of ``previous_results`` whose
    ``success`` flag is falsy.

    Returns the list of new result dicts, or an empty list when nothing had
    failed.
    """
    pending = []
    for outcome in previous_results:
        if not outcome['success']:
            pending.append(outcome['filename'])

    if len(pending) == 0:
        print("No failed files to reprocess")
        return []

    print(f"Reprocessing {len(pending)} failed PDFs...")

    second_pass = []
    for filename in tqdm(pending, desc="Retrying"):
        # Only the file name was recorded, so rebuild the path from PDF_DIR.
        outcome = process_single_pdf(os.path.join(PDF_DIR, filename), filename)
        second_pass.append(outcome)

        status_line = (
            f"  ✓ {filename} - Success on retry"
            if outcome['success']
            else f"  ✗ {filename} - Failed again"
        )
        print(status_line)

    return second_pass

if 'results' in locals() and results:
    retry_results = reprocess_failed(results)

In [73]:
def query_supabase_sample():
    """
    Print up to five rows of SUPABASE_TABLE as indented JSON.

    Prints a notice and returns when no Supabase client is available; any
    error raised while querying or printing is reported, not propagated.
    """
    if supabase_client is None:
        print("Supabase client not initialized")
        return

    try:
        query = supabase_client.table(SUPABASE_TABLE).select("*").limit(5)
        reply = query.execute()
        print(f"Sample records from Supabase ({SUPABASE_TABLE}):\n")
        print(json.dumps(reply.data, indent=2))
    except Exception as exc:
        print(f"Error querying Supabase: {exc}")

query_supabase_sample()

2025-11-23 17:13:30,856 - INFO - HTTP Request: GET https://vtsjmfrfvmqrcjheubyn.supabase.co/rest/v1/property_valuations?select=%2A&limit=5 "HTTP/2 404 Not Found"


Error querying Supabase: {'message': "Could not find the table 'public.property_valuations' in the schema cache", 'code': 'PGRST205', 'hint': None, 'details': None}


In [74]:
def analyze_results(results):
    """
    Print a breakdown of a batch run: failures per stage (most frequent
    first), the number of results carrying a validation error, and the
    overall success rate. Prints a notice and returns when ``results`` is
    empty.
    """
    if not results:
        print("No results to analyze")
        return

    rule = "=" * 60
    print("\n" + rule)
    print("DETAILED ANALYSIS")
    print(rule + "\n")

    # Count unsuccessful results by the stage they stopped in.
    failures_per_stage = {}
    for entry in results:
        if entry['success'] or not entry['stage']:
            continue
        stage_name = entry['stage']
        failures_per_stage[stage_name] = failures_per_stage.get(stage_name, 0) + 1

    if failures_per_stage:
        print("Failures by stage:")
        ranked = sorted(failures_per_stage.items(), key=lambda pair: pair[1], reverse=True)
        for stage_name, failure_count in ranked:
            print(f"  {stage_name}: {failure_count}")

    # Results whose error text mentions 'Validation', successful or not.
    validation_count = sum(
        1 for entry in results if entry['error'] and 'Validation' in entry['error']
    )
    print(f"\nValidation issues: {validation_count}")

    succeeded = sum(1 for entry in results if entry['success'])
    success_rate = (succeeded / len(results)) * 100
    print(f"\nSuccess rate: {success_rate:.1f}%")

    print("\n" + rule)

if 'results' in locals() and results:
    analyze_results(results)

In [75]:
def export_to_csv():
    """
    Write one summary row per extracted JSON file in OUTPUT_DIR to
    ``valuations_summary.csv`` in the same directory.

    ``processing_summary.json`` is skipped. Fields that are missing, or whose
    parent level is not a JSON object (for example null or a plain string
    produced by the LLM), are written as empty cells instead of raising.
    Prints a notice and returns when there are no JSON files to export.
    """
    import csv

    def _nested(record, *keys):
        """Follow ``keys`` through nested dicts; return '' if any level is absent or not a dict."""
        current = record
        for key in keys:
            if not isinstance(current, dict):
                return ''
            current = current.get(key, '')
        return current

    # Sorted so the row order is reproducible between runs.
    json_files = sorted(Path(OUTPUT_DIR).glob("*.json"))
    json_files = [f for f in json_files if f.name != "processing_summary.json"]

    if not json_files:
        print("No JSON files to export")
        return

    csv_path = os.path.join(OUTPUT_DIR, "valuations_summary.csv")

    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['property_id', 'client', 'location', 'valuation_amount',
                     'currency', 'report_date', 'source_file']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()

        for json_file in json_files:
            # The pipeline saves these files as UTF-8 with ensure_ascii=False,
            # so read them back as UTF-8 rather than the locale encoding.
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            row = {
                'property_id': _nested(data, 'property_id'),
                'client': _nested(data, 'valuation_report', 'client'),
                'location': _nested(data, 'property_details', 'location', 'area'),
                'valuation_amount': _nested(data, 'valuations', 'current_market_value', 'amount'),
                'currency': _nested(data, 'valuations', 'current_market_value', 'currency'),
                'report_date': _nested(data, 'valuation_report', 'report_date'),
                'source_file': json_file.name
            }

            writer.writerow(row)

    print(f"✓ CSV exported to: {csv_path}")

# Uncomment to export to CSV:
# export_to_csv()