In [None]:
import os
import sys
from pathlib import Path
import time
import json
import random
import hashlib
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Find the project root: the nearest directory, starting at the launch directory and
# walking upward, that contains config.py. This keeps the notebook independent of the
# directory Jupyter was started from.
_launch_dir = Path.cwd().resolve()
for PROJECT_ROOT in (_launch_dir, *_launch_dir.parents):
    if (PROJECT_ROOT / "config.py").exists():
        break
else:
    # Every ancestor up to the filesystem root was checked without success.
    raise FileNotFoundError("Unable to locate config.py from this notebook. Please run from within the repo.")

# Put the root at the front of sys.path (once) so that `import config` resolves.
_root_entry = str(PROJECT_ROOT)
if _root_entry not in sys.path:
    sys.path.insert(0, _root_entry)

from config import (
    ALL_LISTING_URLS_FILE,
    LISTINGS_BRONZE,
    LOGS_DIR,
    SCRAPING_CONFIG,
)
from elferspot_listings.utils.helpers import ensure_dir

In [None]:
# Centralized paths for URL registry, bronze output, and logging.
# The values imported from config are re-bound as Path objects because later cells
# call .exists(), .parent, .stem and .name on them.
ALL_LISTING_URLS_FILE = Path(ALL_LISTING_URLS_FILE)
LISTINGS_BRONZE = Path(LISTINGS_BRONZE)
LOGS_DIR = Path(LOGS_DIR)
CHANGELOG_DIR = PROJECT_ROOT / "changelogs"

# ensure_dir comes from the project helpers; presumably it creates the directory
# when it is missing -- confirm against elferspot_listings.utils.helpers.
for _required_dir in (
    ALL_LISTING_URLS_FILE.parent,
    LISTINGS_BRONZE.parent,
    LOGS_DIR,
    CHANGELOG_DIR,
):
    ensure_dir(_required_dir)

# Raw HTML of the most recently fetched listing is dumped here for debugging.
DEBUG_OUTPUT_PATH = LOGS_DIR / "scraper_debug_output.html"

print(f"URL registry → {ALL_LISTING_URLS_FILE}")
print(f"Bronze output → {LISTINGS_BRONZE}")
print(f"Changelogs → {CHANGELOG_DIR}")

In [None]:
# Collect listing URLs by driving a real browser through the infinite-scroll search page.

# Set up the WebDriver (Selenium). CHROMEDRIVER_PATH overrides the default driver location.
driver_path = os.getenv("CHROMEDRIVER_PATH", r'Z:\Python\chromedriver-win64\chromedriver.exe')
service = Service(driver_path)
driver = webdriver.Chrome(service=service)

search_url = "https://www.elferspot.com/en/search/?series%5B%5D=911-f-model&series%5B%5D=912&series%5B%5D=911-g-model&series%5B%5D=930&series%5B%5D=964&series%5B%5D=993&series%5B%5D=996&series%5B%5D=997&series%5B%5D=991&series%5B%5D=992&series%5B%5D=911-backdate-modified&series%5B%5D=981&series%5B%5D=982&series%5B%5D=718"

PAGE_SETTLE_SECONDS = 1.5    # pause after the initial page load
SCROLL_ROUNDS = 100          # number of scroll-to-bottom passes used to load more results
SCROLL_PAUSE_SECONDS = 1.5   # pause after each pass so newly requested results can render

listing_urls = []
try:
    driver.get(search_url)
    time.sleep(PAGE_SETTLE_SECONDS)

    # Step 1: Handle the cookie popup (best effort -- the page is still usable without it).
    # `except Exception` keeps Ctrl-C / kernel interrupts working, unlike a bare `except`.
    try:
        cookie_accept_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.brlbs-btn-accept-all'))
        )
        cookie_accept_button.click()
        print("Cookie popup accepted.")
    except Exception:
        print("No cookie popup found or it was not visible in time.")

    # Step 2: Scroll to the bottom of the page once
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(1)

    # Step 3: Handle the second popup (if it appears)
    try:
        dont_show_again_link = WebDriverWait(driver, 8).until(
            EC.element_to_be_clickable((By.ID, 'popover_close'))
        )
        dont_show_again_link.click()
        print("Search agent popup closed.")
    except Exception:
        print("No 'Save this search' popup found or it was not visible in time.")

    # Step 4: Continue scrolling and waiting for results to load (full-page scrolls)
    for _ in tqdm(range(SCROLL_ROUNDS), desc="Scrolling and loading more listings"):
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(SCROLL_PAUSE_SECONDS)

    # Step 5: Extract all the listing URLs
    try:
        for listing in driver.find_elements(By.CSS_SELECTOR, 'a.content-teaser'):
            href = listing.get_attribute("href")
            if href:
                listing_urls.append(href)
        print(f"Found {len(listing_urls)} listing URLs:")
    except Exception as e:
        # A half-read result list must not replace the previous registry.
        listing_urls = []
        print(f"Error extracting listing URLs: {str(e)}")
finally:
    # Always release the browser, even when a step above raised.
    driver.quit()

# Informational only: how many of the collected URLs were not in the previous registry.
prev_path = ALL_LISTING_URLS_FILE
if listing_urls:
    if prev_path.exists():
        try:
            prev_urls = set(pd.read_csv(prev_path)["Listing_URL"].tolist())
            new_urls = set(listing_urls) - prev_urls
            print(f"New URLs not in previous file: {len(new_urls)}")
        except Exception as e:
            print(f"Could not compare against the previous URL file: {str(e)}")
    else:
        print("No previous URL file found. All URLs are new.")

# Step 6: Create a pandas DataFrame and save to CSV.
# Skipped when nothing was collected so a failed run cannot wipe the existing registry.
if listing_urls:
    df = pd.DataFrame(listing_urls, columns=['Listing_URL'])
    df.to_csv(ALL_LISTING_URLS_FILE, index=False)
    print(f"Listing URLs have been saved to '{ALL_LISTING_URLS_FILE}'.")
else:
    print(f"No listing URLs were collected; '{ALL_LISTING_URLS_FILE}' was left unchanged.")

In [None]:
# One shared HTTP session; parse_listing issues every listing request through it.
session = requests.Session()

# Request tuning read from SCRAPING_CONFIG, with fallbacks used when a key is absent.
REQUEST_TIMEOUT = SCRAPING_CONFIG.get('request_timeout', 10)  # passed as `timeout=` to session.get
MAX_RETRIES = SCRAPING_CONFIG.get('max_retries', 3)  # attempts per URL in parse_listing
REQUEST_DELAY = SCRAPING_CONFIG.get('delay_between_requests', 1.0)  # base sleep between attempts
# Default thread-pool size: the CPU count capped at 16 (8 is assumed if the count is unknown).
DEFAULT_MAX_WORKERS = min(16, (os.cpu_count() or 8))

# Headers sent with every listing request.
HEADERS = {
    "User-Agent": SCRAPING_CONFIG.get(
        'user_agent',
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "en-US,en;q=0.9",
}

# Cookies sent with every listing request.
# "borlabs-cookie" is a URL-encoded JSON consent record (it starts with %7B%22consents%22).
# NOTE(review): its embedded "expires" value is Wed, 05 Nov 2025 -- confirm the site still accepts it.
# NOTE(review): "atom-no-inquiry-popover" presumably suppresses an on-page popover -- confirm.
cookies = {
    "borlabs-cookie": "%7B%22consents%22%3A%7B%22essential%22%3A%5B%22borlabs-cookie%22%2C%22vg-wort%22%2C%22woocommerce%22%2C%22wordfence%22%2C%22wpml%22%5D%2C%22statistics%22%3A%5B%22google-analytics-four%22%2C%22woocommerce-google-analytics%22%5D%2C%22marketing%22%3A%5B%22google-ads%22%2C%22meta-pixel-for-woocommerce%22%2C%22taboola%22%2C%22wc-order-attribution%22%5D%2C%22external-media%22%3A%5B%22pinterest%22%2C%22vimeo%22%2C%22youtube%22%5D%7D%2C%22domainPath%22%3A%22www.elferspot.com%2Fen%2F%22%2C%22expires%22%3A%22Wed%2C%2005%20Nov%202025%2016%3A12%3A58%20GMT%22%2C%22uid%22%3A%22m8drsv73-w32f3qy0-h1xdf0ij-1393u9z0%22%2C%22v3%22%3Atrue%2C%22version%22%3A2%7D",
    "atom-no-inquiry-popover": "true",
    "wp-wpml_current_admin_language_d41d8cd98f00b204e9800998ecf8427e": "en",
    "wp-wpml_current_language": "en"
}

# Fields whose values feed compute_hash / get_field_changes for change detection.
FIELDS_TO_HASH = ["Title", "Price", "Mileage", "Description", "Secondary_Description"]

def compute_hash(row, fields):
    """Return an MD5 hex digest fingerprinting the selected fields of ``row``.

    Args:
        row: Any object with a dict-style ``.get(key, default)`` method.
        fields: Names of the fields to include; a field absent from ``row``
            contributes an empty string.

    Returns:
        The 32-character hexadecimal MD5 digest of the fields serialised as
        JSON with sorted keys.
    """
    selected = dict((name, row.get(name, "")) for name in fields)
    serialised = json.dumps(selected, sort_keys=True)
    digest = hashlib.md5()
    digest.update(serialised.encode("utf-8"))
    return digest.hexdigest()

def get_field_changes(old_row, new_row, fields):
    """Describe which of ``fields`` differ between two listing rows.

    Args:
        old_row: Previous values; any object with ``.get(key, default)``.
        new_row: Freshly scraped values; same interface as ``old_row``.
        fields: Field names to compare.

    Returns:
        A ``"; "``-joined string of ``"<field>: '<old>' → '<new>'"`` entries,
        or ``None`` when no field changed.
    """
    def _is_blank(value):
        # None, "", and NaN/NaT all mean "no value". NaN needs special care
        # because NaN != NaN, which would otherwise report a phantom change.
        if value is None:
            return True
        if isinstance(value, str):
            return value == ""
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Non-scalar values (e.g. lists) are never treated as blank.
            return False

    changes = []
    for field in fields:
        old = old_row.get(field, "")
        new = new_row.get(field, "")
        if _is_blank(old) and _is_blank(new):
            continue
        if old != new:
            changes.append(f"{field}: '{old}' → '{new}'")
    return "; ".join(changes) if changes else None

def _fetch_listing_html(url):
    """Download one listing page, retrying up to MAX_RETRIES times.

    Returns:
        ``(html, None)`` on an HTTP 200 response, otherwise ``(None, message)``
        where ``message`` describes the last failure.
    """
    last_error = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = session.get(
                url, headers=HEADERS, cookies=cookies, timeout=REQUEST_TIMEOUT
            )
        except requests.RequestException as exc:
            last_error = str(exc)
        else:
            if response.status_code == 200:
                return response.text, None
            last_error = f"Status {response.status_code}"
        # Back off (with jitter) only when another attempt will actually follow.
        if attempt < MAX_RETRIES:
            time.sleep(REQUEST_DELAY + random.uniform(0.1, 0.5))
    return None, f"Failed after {MAX_RETRIES} attempts: {last_error}"


def _is_small_print(tag):
    """True when the tag carries the CSS class ``small``."""
    return "small" in (tag.get("class") or [])


def _extract_main_description(soup):
    """Return the text of the overview section's main column, or None."""
    overview_section = soup.find("section", id="overview_anchor")
    if overview_section is None:
        return None
    main_col = overview_section.find("div", class_="col-xs-12 col-md-8")
    if main_col is None:
        return None
    content_div = main_col.find("div", class_="content")
    if content_div is None:
        return None
    paragraphs = [
        p.get_text(strip=True)
        for p in content_div.find_all("p")
        if not _is_small_print(p)
    ]
    return "\n".join(paragraphs) or None


def _extract_secondary_description(soup):
    """Return the first non-empty text block outside the ``col-md-8`` column, or None."""
    maincontent_section = soup.find("section", class_="maincontent")
    if maincontent_section is None:
        return None
    for row in maincontent_section.find_all("div", class_="row"):
        for col in row.find_all("div", class_="col-xs-12"):
            if "col-md-8" in (col.get("class") or []):
                continue
            content_div = col.find("div", class_="content")
            if content_div is None:
                continue
            parts = []
            # Only direct children: nested paragraphs/lists are not part of the block.
            for elem in content_div.find_all(["p", "ul"], recursive=False):
                if elem.name == "p":
                    if not _is_small_print(elem):
                        parts.append(elem.get_text(strip=True))
                elif elem.name == "ul":
                    items = [li.get_text(strip=True) for li in elem.find_all("li")]
                    if items:
                        parts.append("\n".join(items))
            if parts:
                text = "\n".join(parts)
                if text:
                    return text
                # Only empty strings were found: stop scanning this row and
                # carry on with the next one.
                break
    return None


def parse_listing(url):
    """Fetch one listing page and extract its fields into a flat dict.

    Args:
        url: Absolute URL of the listing page.

    Returns:
        On success, a dict with ``URL`` and ``Scraped_At`` plus whichever of
        ``Title``, ``Description``, ``Secondary_Description``, ``Price`` and the
        label/value pairs of the ``table.fahrzeugdaten`` tables were found.
        When the page cannot be fetched, a dict with ``URL``, ``Error`` and
        ``Scraped_At`` instead. Request failures never raise.
    """
    html, error = _fetch_listing_html(url)
    if error is not None:
        return {"URL": url, "Error": error, "Scraped_At": datetime.today().date()}

    # Debugging aid only. Several worker threads write to the same file, so a
    # file-system error here must not cost us the listing.
    try:
        DEBUG_OUTPUT_PATH.write_text(html, encoding="utf-8")
    except OSError:
        pass

    soup = BeautifulSoup(html, 'html.parser')
    data = {"URL": url, "Scraped_At": datetime.today().date()}

    title_tag = soup.find("h1")
    if title_tag:
        data["Title"] = title_tag.get_text(strip=True)

    main_desc = _extract_main_description(soup)
    if main_desc:
        data["Description"] = main_desc

    secondary_desc = _extract_secondary_description(soup)
    if secondary_desc:
        data["Secondary_Description"] = secondary_desc

    # Spec tables: each row's label cell (trailing colon stripped) becomes a key.
    for table in soup.select("table.fahrzeugdaten"):
        for row in table.select("tr"):
            label = row.select_one("td.label")
            content = row.select_one("td.content")
            if label and content:
                key = label.get_text(strip=True).rstrip(":")
                data[key] = content.get_text(strip=True)

    price_tag = soup.select_one("div.sidebar-section .price span.p")
    if price_tag:
        data["Price"] = price_tag.get_text(strip=True)

    return data

def parse_all_listings(listing_urls, max_workers=DEFAULT_MAX_WORKERS):
    """Scrape many listings concurrently and collect the results in a DataFrame.

    Args:
        listing_urls: Iterable of listing URLs.
        max_workers: Size of the thread pool.

    Returns:
        A DataFrame with one row per URL, in completion order. A listing whose
        parser raised unexpectedly is recorded as a row with ``URL``, ``Error``
        and ``Scraped_At`` (the same shape parse_listing uses for failed
        requests) instead of aborting the whole batch.
    """
    # Materialise once: the URLs are iterated here and counted for the progress bar.
    listing_urls = list(listing_urls)
    listings_data = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(parse_listing, url): url for url in listing_urls}
        for future in tqdm(as_completed(future_to_url), total=len(future_to_url), desc="Parsing listings"):
            try:
                listing_data = future.result()
            except Exception as exc:
                listing_data = {
                    "URL": future_to_url[future],
                    "Error": f"Unhandled parser error: {exc!r}",
                    "Scraped_At": datetime.today().date(),
                }
            listings_data.append(listing_data)
    return pd.DataFrame(listings_data)

In [None]:
# Load the URL registry and the bronze dataset, then scrape listings not yet in bronze.
bronze_path = LISTINGS_BRONZE
if not ALL_LISTING_URLS_FILE.exists():
    raise FileNotFoundError(f"URL registry missing at {ALL_LISTING_URLS_FILE}. Run the scrolling cell first.")

df_urls = pd.read_csv(ALL_LISTING_URLS_FILE)
all_urls = set(df_urls['Listing_URL'].dropna())

if bronze_path.exists():
    df_existing = pd.read_excel(bronze_path)
else:
    df_existing = pd.DataFrame(columns=["URL"])

if 'URL' not in df_existing.columns:
    df_existing['URL'] = pd.Series(dtype=str)

# URL is kept both as the index (for lookups) and as a regular column (so it is
# still present once the index is dropped before saving).
df_existing = df_existing.set_index("URL", drop=False)
existing_urls = set(df_existing.index)
new_urls = sorted(all_urls - existing_urls)

if new_urls:
    print(f"🔍 Found {len(new_urls)} new listings (not in {bronze_path.name}). Parsing...")
    df_new = parse_all_listings(new_urls, max_workers=DEFAULT_MAX_WORKERS)
    df_new['Data_Hash'] = df_new.apply(lambda row: compute_hash(row, FIELDS_TO_HASH), axis=1)
    # drop=False: with the default the URL column would vanish from the new rows
    # and show up as NaN after the concat below.
    df_new = df_new.set_index("URL", drop=False)
    df_existing = pd.concat([df_existing, df_new], axis=0)
    print(f"✅ Appended {len(df_new)} new listings.")
else:
    print("No new listings detected.")

In [None]:
# Re-scrape listings already in bronze and apply any detected changes.

def _is_blank_value(value):
    """True for None, "", and NaN/NaT -- every way a field can be 'missing' here."""
    if value is None:
        return True
    if isinstance(value, str):
        return value == ""
    try:
        return bool(pd.isna(value))
    except (TypeError, ValueError):
        return False


def _values_match(old, new):
    """Equality that treats two missing values as equal (plain NaN != NaN)."""
    return (_is_blank_value(old) and _is_blank_value(new)) or old == new


update_urls = sorted(existing_urls & all_urls)
change_log = []
if update_urls:
    print(f"🔄 Checking {len(update_urls)} existing listings for updates...")
    df_updates = parse_all_listings(update_urls, max_workers=DEFAULT_MAX_WORKERS)
    # drop=False keeps URL inside each row, so writing the row back below does
    # not blank the URL column of df_existing.
    df_updates = df_updates.set_index("URL", drop=False)
    for url in update_urls:
        if url not in df_existing.index or url not in df_updates.index:
            continue
        old_row = df_existing.loc[url].to_dict()
        new_row = df_updates.loc[url].to_dict()
        # When any listing in the batch failed, the "Error" column exists and is
        # NaN for the successful rows; NaN is truthy, so test for a real message.
        error = new_row.get("Error")
        if not _is_blank_value(error):
            print(f"⚠️ Skipping update for {url} due to error: {error}")
            continue
        old_hash = old_row.get("Data_Hash", "")
        new_hash = compute_hash(new_row, FIELDS_TO_HASH)
        if old_hash == new_hash:
            continue
        changes = get_field_changes(old_row, new_row, FIELDS_TO_HASH)
        price_changed = not _values_match(old_row.get("Price", ""), new_row.get("Price", ""))
        only_price_changed = price_changed and all(
            _values_match(old_row.get(f, ""), new_row.get(f, ""))
            for f in FIELDS_TO_HASH if f != "Price"
        )
        raw_price = new_row.get("Price", "")
        new_price = "" if _is_blank_value(raw_price) else str(raw_price).strip().lower()
        # Keep the last real price when the only change is the price turning
        # into a placeholder (reserved / empty / n/a).
        if only_price_changed and new_price in {"reserved", "", "n/a"}:
            continue
        # Coerce new values to the dtype of the existing column where possible;
        # values that cannot be converted are left as scraped.
        for col in df_existing.columns:
            if col in new_row:
                try:
                    dtype = df_existing[col].dtype
                    value = new_row[col]
                    if pd.isna(value):
                        continue
                    if "datetime" in str(dtype):
                        new_row[col] = pd.to_datetime(value, errors="coerce")
                    elif "float" in str(dtype):
                        new_row[col] = float(value)
                    elif "int" in str(dtype):
                        new_row[col] = int(float(value))
                    else:
                        new_row[col] = str(value)
                except Exception:
                    continue
        new_row['Data_Hash'] = new_hash
        df_existing.loc[url] = pd.Series(new_row)
        change_log.append({
            "URL": url,
            "Changes": changes or "hash-only update",
            "Date": datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
        })
    if change_log:
        log_file = CHANGELOG_DIR / f"changelog_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}.xlsx"
        pd.DataFrame(change_log).to_excel(log_file, index=False)
        print(f"📝 Saved changelog with {len(change_log)} updates to {log_file}")
else:
    print("No existing listings required updates.")

In [None]:
# De-duplicate (last occurrence wins), order by URL and persist bronze plus a timestamped snapshot.
df_existing = df_existing[~df_existing.index.duplicated(keep='last')].sort_index()

# The index is dropped below. For rows whose URL column is empty the index holds
# the only copy of the URL, so copy it across first. The name check makes this a
# no-op when the cell is re-run after the index has already been reset.
if df_existing.index.name == "URL" and "URL" in df_existing.columns:
    df_existing["URL"] = df_existing["URL"].fillna(df_existing.index.to_series())

df_existing = df_existing.reset_index(drop=True)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot_path = LISTINGS_BRONZE.parent / f"{LISTINGS_BRONZE.stem}_{timestamp}.xlsx"

df_existing.to_excel(LISTINGS_BRONZE, index=False)
df_existing.to_excel(snapshot_path, index=False)
print(
    f"✅ Updated listing data saved to '{LISTINGS_BRONZE}' and snapshot '{snapshot_path.name}'. Total listings: {len(df_existing)}"
)

In [None]:
def update_descriptions_parallel(df, max_workers=DEFAULT_MAX_WORKERS):
    """Re-scrape every listing in ``df`` and refresh its description columns.

    Args:
        df: DataFrame with a ``URL`` column. It is modified in place.
        max_workers: Size of the thread pool used for scraping.

    Returns:
        The same ``df`` object. ``Description`` / ``Secondary_Description`` are
        overwritten only for listings where the scrape produced a value.
    """
    targets = df.loc[df["URL"].notna(), "URL"].tolist()
    if len(targets) == 0:
        return df

    def _scrape_descriptions(listing_url):
        # Never raises: a failed listing reports (url, None, None) and is left untouched.
        try:
            page = parse_listing(listing_url)
        except Exception as exc:
            print(f"Failed to update {listing_url}: {exc}")
            return listing_url, None, None
        return listing_url, page.get("Description"), page.get("Secondary_Description")

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(_scrape_descriptions, target) for target in targets]
        progress = tqdm(as_completed(pending), total=len(pending), desc="Updating descriptions")
        for finished in progress:
            listing_url, primary, secondary = finished.result()
            matching_rows = df["URL"] == listing_url
            for column, text in (("Description", primary), ("Secondary_Description", secondary)):
                if text is not None:
                    df.loc[matching_rows, column] = text
    return df

# Refresh the descriptions of every stored listing and persist the result.
# Falls back to the bronze file when this cell runs without the earlier scraping cells.
if 'df_existing' not in globals():
    if LISTINGS_BRONZE.exists():
        df_existing = pd.read_excel(LISTINGS_BRONZE)
    else:
        raise FileNotFoundError("Bronze dataset not found. Run scraping cells first.")

# NOTE(review): Data_Hash is not recomputed here, so a row whose description was
# refreshed keeps its old hash until the next update pass -- confirm this is intended.
df_existing = update_descriptions_parallel(df_existing)
refresh_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
refresh_snapshot = LISTINGS_BRONZE.parent / f"{LISTINGS_BRONZE.stem}_descriptions_{refresh_timestamp}.xlsx"
df_existing.to_excel(LISTINGS_BRONZE, index=False)
df_existing.to_excel(refresh_snapshot, index=False)
print(
    f"✅ Descriptions refreshed at '{LISTINGS_BRONZE}' and snapshot '{refresh_snapshot.name}'."
)