# HouseHunter Pro - Workflow

**Workflow:**
1. Detect scrape dates
2. Load search results and compute URL diff
3. Show new URLs to scrape
4. Parse all listings
5. Create final dataframe with: `is_sold`, `prices` (history), `days_live`

## Setup

In [1]:
import pandas as pd
from pathlib import Path

from scraping import (
    process_listings_directory,
    process_search_results_directory,
)
from scraping.utils import deduplicate_listings, fix_list_columns
from scraping.derive_fields import derive_fields_for_dataset, serialize_derived_fields

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

SCRAPED_DIR = Path('data/scraped')
DATA_DIR = Path('.')

print("Ready")

Ready


## Step 1: Detect Available Scrape Dates

Auto-detect all scraped dates from the data directory.

In [2]:
# Find all date directories (format: YYYY_MM_DD)
import re

date_dirs = []
for d in SCRAPED_DIR.iterdir():
    if d.is_dir() and re.match(r'\d{4}_\d{2}_\d{2}', d.name):
        date_dirs.append(d.name)

# Sort chronologically
date_dirs = sorted(date_dirs)

print(f"Found {len(date_dirs)} scrape dates:")
for i, d in enumerate(date_dirs):
    label = "(current)" if i == len(date_dirs) - 1 else ""
    print(f"  {d} {label}")

# Current date is the most recent
CURRENT_DATE = date_dirs[-1] if date_dirs else None
ALL_DATES = date_dirs

Found 2 scrape dates:
  2026_01_18 
  2026_01_24 (current)


---

## Step 2: Load Search Results for All Dates

Load search results from all scraped dates to find:
- New URLs to scrape (appeared in current but not previous)
- Sold URLs (disappeared between scrapes)

In [3]:
# Load search results for all dates
search_results_by_date = {}

for date_str in ALL_DATES:
    search_dir = SCRAPED_DIR / date_str / 'search_results'
    if search_dir.exists():
        df = process_search_results_directory(search_dir, date_str)
        if not df.empty:
            # Deduplicate - keep first occurrence (lowest page number)
            df = df.drop_duplicates(subset=['listing_id'], keep='first')
            search_results_by_date[date_str] = df
            print(f"{date_str}: {len(df)} unique listings")

print(f"\nLoaded search results for {len(search_results_by_date)} dates")

2026_01_18: 157 unique listings
2026_01_24: 157 unique listings

Loaded search results for 2 dates


In [4]:
# Compute URL diffs and build price history
sorted_dates = sorted(search_results_by_date.keys())

# Track price history for each listing: listing_id -> [(date, price), ...]
price_history = {}

# Track when each listing was last seen (to detect sold)
last_seen_date = {}

# Build history from all dates
for date_str in sorted_dates:
    df = search_results_by_date[date_str]
    for _, row in df.iterrows():
        lid = row['listing_id']
        price = row.get('price')
        
        if lid not in price_history:
            price_history[lid] = []
        price_history[lid].append((date_str, price))
        last_seen_date[lid] = date_str

# Compute new/removed for latest comparison
if len(sorted_dates) >= 2:
    prev_date = sorted_dates[-2]
    curr_date = sorted_dates[-1]
    
    df_prev = search_results_by_date[prev_date]
    df_curr = search_results_by_date[curr_date]
    
    prev_ids = set(df_prev['listing_id'].unique())
    curr_ids = set(df_curr['listing_id'].unique())
    
    new_ids = curr_ids - prev_ids
    removed_ids = prev_ids - curr_ids
    
    df_new_urls = df_curr[df_curr['listing_id'].isin(new_ids)].copy()
    
    print(f"Comparison: {prev_date} -> {curr_date}")
    print(f"  New: {len(new_ids)}")
    print(f"  Removed (sold): {len(removed_ids)}")
    print(f"  Still active: {len(curr_ids & prev_ids)}")
else:
    df_new_urls = pd.DataFrame()
    removed_ids = set()
    print("Only one scrape date - no comparison possible")

Comparison: 2026_01_18 -> 2026_01_24
  New: 13
  Removed (sold): 13
  Still active: 144


## New URLs to Scrape

URLs that appeared in the current scrape but weren't in the previous one.

In [5]:
# Show new URLs to scrape
if not df_new_urls.empty:
    print(f"New URLs to scrape: {len(df_new_urls)}\n")
    display(df_new_urls[['portal', 'listing_id', 'url', 'price']].reset_index(drop=True))
else:
    print("No new URLs to scrape")

New URLs to scrape: 13



Unnamed: 0,portal,listing_id,url,price
0,idealista,ideal_34602221,https://www.idealista.it/immobile/34602221/,470000.0
1,idealista,ideal_34638590,https://www.idealista.it/immobile/34638590/,610000.0
2,idealista,ideal_34635714,https://www.idealista.it/immobile/34635714/,550000.0
3,immobiliare,immo_126177541,https://www.immobiliare.it/annunci/126177541/,
4,immobiliare,immo_126018123,https://www.immobiliare.it/annunci/126018123/,
5,immobiliare,immo_126170915,https://www.immobiliare.it/annunci/126170915/,
6,immobiliare,immo_126171269,https://www.immobiliare.it/annunci/126171269/,
7,immobiliare,immo_126108861,https://www.immobiliare.it/annunci/126108861/,
8,immobiliare,immo_124590193,https://www.immobiliare.it/annunci/124590193/,
9,immobiliare,immo_126011745,https://www.immobiliare.it/annunci/126011745/,


---

## Step 3: Parse Listings

In [6]:
# Parse listings from all scraped dates
all_listings = []

for date_str in ALL_DATES:
    listings_dir = SCRAPED_DIR / date_str
    df = process_listings_directory(listings_dir, date_str)
    
    if not df.empty:
        all_listings.append(df)
        print(f"{date_str}: {len(df)} listings parsed")
    else:
        print(f"{date_str}: No listing HTML files found")

# Combine all listings
if all_listings:
    df_listings = pd.concat(all_listings, ignore_index=True)
    print(f"\nTotal: {len(df_listings)} listings parsed")
else:
    df_listings = pd.DataFrame()
    print("\nNo listings parsed yet")

2026_01_18: 152 listings parsed
2026_01_24: 13 listings parsed

Total: 165 listings parsed


---

## Step 4: Final DataFrame

Add `is_sold`, `prices` (history from search results), and `days_live`.

In [7]:
# Build final dataframe
if not df_listings.empty:
    df = df_listings.copy()
    
    # Get the most recent scrape date (current)
    current_date = sorted_dates[-1] if sorted_dates else None
    
    # is_sold: True if listing was seen before but not in the latest search results
    def check_sold(lid):
        last_seen = last_seen_date.get(lid)
        if last_seen and current_date:
            return last_seen != current_date
        return False
    
    df['is_sold'] = df['listing_id'].apply(check_sold)
    
    # prices: list of prices, only adding when price changed
    def get_prices(lid):
        history = price_history.get(lid, [])
        prices = []
        for (d, p) in history:
            if p is not None:
                # Only add if different from last price
                if not prices or p != prices[-1]:
                    prices.append(p)
        return prices if prices else None
    
    df['prices'] = df['listing_id'].apply(get_prices)
    
    # days_live: for sold listings, calculate days from created_at to removal
    def calc_days_live(row):
        if not row['is_sold']:
            return None
        
        # Get listing date from created_at (Unix timestamp for Immobiliare)
        listing_date = None
        if 'created_at' in row.index and pd.notna(row.get('created_at')):
            val = row['created_at']
            # Check if it's a Unix timestamp (large number)
            if isinstance(val, (int, float)) and val > 1e9:
                listing_date = pd.to_datetime(val, unit='s')
            else:
                listing_date = pd.to_datetime(val, errors='coerce')
        
        if listing_date is None or pd.isna(listing_date):
            return None
        
        # Get removal date (the date after last_seen)
        last_seen = last_seen_date.get(row['listing_id'])
        if last_seen:
            try:
                idx = sorted_dates.index(last_seen)
                if idx + 1 < len(sorted_dates):
                    removal_date = pd.to_datetime(sorted_dates[idx + 1], format='%Y_%m_%d')
                    return (removal_date - listing_date).days
            except:
                pass
        return None
    
    df['days_live'] = df.apply(calc_days_live, axis=1)
    
    print(f"Final dataframe: {len(df)} listings")
    print(f"  Sold: {df['is_sold'].sum()}")
    print(f"  Active: {(~df['is_sold']).sum()}")
    
    # Show days_live stats for sold
    sold_days = df[df['is_sold']]['days_live'].dropna()
    if len(sold_days) > 0:
        print(f"  Days live (sold): min={sold_days.min():.0f}, max={sold_days.max():.0f}, avg={sold_days.mean():.0f}")
else:
    df = pd.DataFrame()
    print("No listings to process")

Final dataframe: 165 listings
  Sold: 12
  Active: 153
  Days live (sold): min=11, max=119, avg=80


---

## Quick Reference

**Final DataFrame (`df`) columns:**
- `listing_id`, `portal`, `url`, `title`, `price`, etc. (from parsed HTML)
- `is_sold` - True if listing disappeared from search results
- `prices` - List of prices (only when changed)
- `days_live` - Days from `created_at` to removal date (for sold listings)
- `created_at` - Listing date (Unix timestamp, both portals)

**Other variables:**
- `df_new_urls` - New URLs to scrape (from latest comparison)

## Step 5: Deduplicate Listings

Remove duplicate listings that appear on both Immobiliare and Idealista.

**Matching criteria:**
- Exact same price
- Exact same surface area
- Coordinates within 100m

Keeps the Immobiliare listing and merges any additional data from the Idealista duplicate.

In [8]:
# Deduplicate listings across portals
# Keeps Immobiliare listings and merges data from duplicate Idealista listings

if not df.empty:
    print(f"Before deduplication: {len(df)} listings")
    print(f"  Immobiliare: {len(df[df.portal == 'immobiliare'])}")
    print(f"  Idealista: {len(df[df.portal == 'idealista'])}")
    
    df = deduplicate_listings(df)
    df = df.reset_index(drop=True)
    
    # Fix list columns for parquet serialization
    df = fix_list_columns(df)
    
    print(f"\nAfter deduplication: {len(df)} listings")
    print(f"  Immobiliare: {len(df[df.portal == 'immobiliare'])}")
    print(f"  Idealista: {len(df[df.portal == 'idealista'])}")

Before deduplication: 165 listings
  Immobiliare: 97
  Idealista: 68
Found 22 duplicate listings
  Merging: ideal_24217142 -> immo_125271473
  Merging: ideal_24851423 -> immo_125233655
  Merging: ideal_25075109 -> immo_125522717
  Merging: ideal_31845035 -> immo_117757313
  Merging: ideal_32148437 -> immo_118939011
  Merging: ideal_32198336 -> immo_119038273
  Merging: ideal_32551850 -> immo_125402535
  Merging: ideal_32693488 -> immo_125699521
  Merging: ideal_33075416 -> immo_121549264
  Merging: ideal_33121115 -> immo_121674072
  Merging: ideal_33404330 -> immo_122526106
  Merging: ideal_33569998 -> immo_125930031
  Merging: ideal_33724493 -> immo_123422763
  Merging: ideal_33909723 -> immo_123937121
  Merging: ideal_33934638 -> immo_124014459
  Merging: ideal_33993703 -> immo_124174799
  Merging: ideal_34180958 -> immo_124722345
  Merging: ideal_34283360 -> immo_125070539
  Merging: ideal_34297868 -> immo_125106347
  Merging: ideal_34305642 -> immo_125131153
  Merging: ideal_343621

In [9]:
# Load existing derived_fields from JSON cache
# This is separate from the parquet and persists between runs

import json

derived_fields_path = Path('data/processed/derived_fields.json')

if derived_fields_path.exists():
    with open(derived_fields_path) as f:
        derived_cache = json.load(f)
    print(f"Loaded {len(derived_cache)} cached derived_fields")
else:
    derived_cache = {}
    print("No derived_fields cache found, starting fresh")

# Add derived_fields column to df from cache
if not df.empty:
    df['derived_fields'] = df['id'].apply(lambda x: derived_cache.get(x))
    cached = df['derived_fields'].notna().sum()
    print(f"Matched {cached}/{len(df)} listings from cache")

Loaded 143 cached derived_fields
Matched 143/143 listings from cache


In [10]:
# Derive missing fields using Gemini (runs in parallel with 10 workers)
# Only processes listings not in the cache

if not df.empty:
    missing = df['derived_fields'].isna().sum()
    print(f"Listings missing derived_fields: {missing}/{len(df)}")
    
    if missing > 0:
        df = derive_fields_for_dataset(df, DATA_DIR, max_workers=10)
        
        # Update cache with all successful derived_fields (new or retried)
        updated = 0
        for _, row in df.iterrows():
            if row['derived_fields']:  # If we have a valid result
                if row['id'] not in derived_cache:
                    derived_cache[row['id']] = row['derived_fields']
                    updated += 1
        
        # Save updated cache
        with open(derived_fields_path, 'w') as f:
            json.dump(derived_cache, f, indent=2)
        print(f"Added {updated} new derived_fields to cache (total: {len(derived_cache)})")
    
    # Reload cache to get all derived fields
    with open(derived_fields_path) as f:
        derived_cache = json.load(f)
    
    # Mapping from derived field names to dataframe column names
    # (derived_key, df_column, is_new_column)
    field_mapping = [
        ('summary', 'ai_summary', True),  # New column
        ('beauty_score', 'beauty_score', True),  # New column (1-5 rating)
        ('beauty_notes', 'beauty_notes', True),  # New column (explanation)
        ('bedrooms', 'bedrooms', False),  # Exists
        ('bathrooms', 'bathrooms', False),  # Exists
        ('kitchen_type', 'kitchen', False),  # Exists as 'kitchen'
        ('balconies', 'balconies', True),  # New column (count)
        ('terraces', 'terraces', True),  # New column (count)
        ('has_cantina', 'has_cellar', False),  # Exists as 'has_cellar'
        ('has_garage', 'has_garage', True),  # New column
        ('parking_spots', 'parking_spots', True),  # New column
        ('has_elevator', 'elevator', False),  # Exists as 'elevator'
        ('floor_number', 'floor', False),  # Exists as 'floor'
        ('total_floors', 'floors_building', False),  # Exists
        ('has_air_conditioning', 'has_air_conditioning', False),  # Exists
        ('heating_type', 'heating', False),  # Exists
        ('condition', 'condition', False),  # Exists
        ('exposure', 'exposure', True),  # New column
        ('has_garden', 'has_garden', False),  # Exists
        ('energy_class', 'energy_class', False),  # Exists
    ]
    
    # Add new columns if they don't exist
    for derived_key, df_col, is_new in field_mapping:
        if is_new and df_col not in df.columns:
            df[df_col] = None
    
    # Fill missing values from derived fields
    filled_count = 0
    for idx, row in df.iterrows():
        derived = derived_cache.get(row['id'], {})
        if not derived:
            continue
        
        for derived_key, df_col, _ in field_mapping:
            if derived_key not in derived:
                continue
            derived_val = derived[derived_key]
            
            # Skip invalid values
            if derived_val is None or derived_val == -1 or derived_val == '':
                continue
            
            # Check if current value is missing
            current_val = row.get(df_col)
            is_missing = current_val is None or (isinstance(current_val, float) and pd.isna(current_val)) or current_val == ''
            
            if is_missing:
                # Convert value to match column dtype if needed
                col_dtype = df[df_col].dtype
                if pd.api.types.is_string_dtype(col_dtype) and not isinstance(derived_val, str):
                    derived_val = str(derived_val)
                
                df.at[idx, df_col] = derived_val
                filled_count += 1
    
    print(f"Filled {filled_count} missing values from derived fields")
    
    # Drop the derived_fields column - we've merged into flat columns
    if 'derived_fields' in df.columns:
        df = df.drop(columns=['derived_fields'])
    
    print(f"Final columns: {len(df.columns)}")

Listings missing derived_fields: 0/143
Filled 1235 missing values from derived fields
Final columns: 73


In [11]:
# Save final dataframe
output_dir = Path('data/processed')
output_dir.mkdir(parents=True, exist_ok=True)

if not df.empty:
    df.to_parquet(output_dir / 'listings.parquet', index=False)
    print(f"Saved: {output_dir / 'listings.parquet'}")
    
# Export new URLs to scrape
if not df_new_urls.empty:
    urls_file = output_dir / 'new_urls_to_scrape.txt'
    with open(urls_file, 'w') as f:
        for url in df_new_urls['url']:
            f.write(url + '\n')
    print(f"Saved: {urls_file} ({len(df_new_urls)} URLs)")

Saved: data/processed/listings.parquet
Saved: data/processed/new_urls_to_scrape.txt (13 URLs)
