In [None]:
# Interactive Colab login. Outside Colab the import fails; skip it there, because
# make_bq_client() can still use ADC or a service account instead.
try:
    from google.colab import auth
except ImportError:
    auth = None

if auth is not None:
    auth.authenticate_user()   # then create the BigQuery client

In [None]:
import os
import sys
import time
import json
import requests
import unicodedata
import re
import pandas as pd
import numpy as np
from typing import Optional, Dict, Any, List, Tuple

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.auth.exceptions import DefaultCredentialsError, RefreshError
from google.oauth2 import service_account
import concurrent.futures as _fut

In [None]:
PROJECT_ID   = os.getenv("PROJECT_ID",   "compact-garage-473209-u4")
DATASET_ID   = os.getenv("DATASET_ID",   "RDW_final")
LOCATION     = os.getenv("LOCATION",     "EU")


def _env_opt_int(name: str) -> Optional[int]:
    """Read an optional integer environment variable; unset or empty -> None."""
    raw = os.getenv(name)
    return int(raw) if raw else None


# Year window (inclusive on both ends)
YEAR_START   = int(os.getenv("YEAR_START", "2023"))
YEAR_END     = int(os.getenv("YEAR_END",   "2025"))
if YEAR_START > YEAR_END:
    raise ValueError(f"YEAR_START ({YEAR_START}) must be <= YEAR_END ({YEAR_END})")

# Final table name (only table we write)
TBL_AGG      = os.getenv("TBL_AGG", "rdw_brand_model_peryear")

# RDW open-data SODA API config
SODA_BASE     = "https://opendata.rdw.nl/resource"
RESOURCE_VEH  = "m9d7-ebf2"   # Gekentekende voertuigen (registered vehicles)
RESOURCE_FUEL = "8ys7-d773"   # Gekentekende voertuigen brandstof (registered vehicles: fuel)
RDW_APP_TOKEN = os.getenv("RDW_APP_TOKEN")  # optional; sent as the X-App-Token header

# Fetch sizing
PAGE_LIMIT = int(os.getenv("PAGE_LIMIT", "50000"))
MAX_ROWS   = _env_opt_int("MAX_ROWS")  # None = no row cap

# Google Programmable Search (Custom Search JSON API).
# The API key is a secret: supply it through the environment only, never as a
# literal in the notebook. When it is empty, image enrichment is skipped with a warning.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
GOOGLE_CSE_ID  = os.getenv("GOOGLE_CSE_ID",  "26a5701f7452744ed")
IMG_VALIDATE   = os.getenv("IMG_VALIDATE", "true").lower() == "true"  # HEAD/GET quick check
IMG_FALLBACK_OV= os.getenv("IMG_FALLBACK_OPENVERSE", "true").lower() == "true"
IMG_SLEEP_S    = float(os.getenv("IMG_SLEEP_S", "0.2"))  # only for sequential version
IMG_TOP_N      = _env_opt_int("IMG_TOP_N")  # limit enrichment to top-N rows (by latest-year count) to save quota

SA_PATH = os.getenv("GOOGLE_APPLICATION_CREDENTIALS", "")
SA_JSON = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON", "")

In [None]:
def make_bq_client(project_id: str) -> bigquery.Client:
    """
    Create a BigQuery client, trying credential sources in this order:

    1. Application Default Credentials (ADC).
    2. Service-account key file at ``SA_PATH`` (GOOGLE_APPLICATION_CREDENTIALS).
    3. Service-account JSON string in ``SA_JSON`` (GOOGLE_SERVICE_ACCOUNT_JSON).
    4. Interactive Colab OAuth, then ADC again.

    Each failing source is logged and the next one is tried.

    Args:
        project_id: GCP project that the client bills and queries against.

    Raises:
        RuntimeError: if none of the sources yields a client.
    """
    try:
        return bigquery.Client(project=project_id)
    except (DefaultCredentialsError, RefreshError) as e_default:
        print("[AUTH] Default credentials not usable:", repr(e_default))

    if SA_PATH and os.path.exists(SA_PATH):
        try:
            print(f"[AUTH] Using service account file at {SA_PATH}")
            creds = service_account.Credentials.from_service_account_file(SA_PATH)
            return bigquery.Client(project=project_id, credentials=creds)
        except Exception as e_file:
            # Same best-effort policy as the JSON branch: log and try the next source.
            print("[AUTH] Failed to load service account file:", repr(e_file))

    if SA_JSON:
        try:
            print("[AUTH] Using service account JSON from GOOGLE_SERVICE_ACCOUNT_JSON")
            info = json.loads(SA_JSON)
            creds = service_account.Credentials.from_service_account_info(info)
            return bigquery.Client(project=project_id, credentials=creds)
        except Exception as e_json:
            print("[AUTH] Failed to parse GOOGLE_SERVICE_ACCOUNT_JSON:", repr(e_json))

    try:
        from google.colab import auth as colab_auth  # type: ignore
        print("[AUTH] Starting Colab OAuth flow...")
        colab_auth.authenticate_user()
        print("[AUTH] Colab user authenticated; using ADC")
        return bigquery.Client(project=project_id)
    except Exception as e_colab:
        print("[AUTH] Colab OAuth not available / failed:", repr(e_colab))
        raise RuntimeError(
            "No valid Google Cloud credentials found. "
            "Provide a service account or run in Colab and authenticate."
        ) from e_colab

### Helper functions

In [None]:
def soda_headers() -> Dict[str, str]:
    """
    HTTP headers sent with every RDW SODA request.

    ``X-App-Token`` is added only when ``RDW_APP_TOKEN`` is set (truthy).
    """
    token_header = {"X-App-Token": RDW_APP_TOKEN} if RDW_APP_TOKEN else {}
    return {
        "Accept": "application/json",
        "User-Agent": "rdw-bq-loader/3.1",
        **token_header,
    }

In [None]:
def fetch_soda(
    resource: str,
    select: str,
    where: Optional[str] = None,
    order: Optional[str] = None,
    limit: int = 50000,
    max_rows: Optional[int] = None,
    timeout: int = 120,
    retries: int = 3,
) -> pd.DataFrame:
    """
    Page through an RDW SODA (Socrata) resource and return all rows as a DataFrame.

    Args:
        resource: Socrata dataset id (e.g. RESOURCE_VEH).
        select: SoQL ``$select`` clause.
        where: optional SoQL ``$where`` clause.
        order: optional SoQL ``$order`` clause. When None the system row id
            ``:id`` is used, because ``$limit``/``$offset`` paging only returns
            a consistent, gap-free sequence with an explicit ordering.
        limit: page size per request.
        max_rows: stop after this many rows (None or <= 0 means no cap). The last
            page is shrunk so that no more rows than needed are fetched.
        timeout: per-request timeout in seconds.
        retries: extra attempts (exponential backoff) for HTTP 429/5xx responses
            and connection/timeouts.

    Returns:
        DataFrame built from the JSON records (columns as returned by SODA).

    Raises:
        requests.HTTPError: for a non-retryable error response, or a retryable one
            after ``retries`` extra attempts.
        requests.ConnectionError / requests.Timeout: when retries are exhausted.
    """
    url = f"{SODA_BASE}/{resource}.json"
    headers = soda_headers()
    cap = max_rows if max_rows and max_rows > 0 else None

    rows: List[Dict[str, Any]] = []
    offset = 0
    while True:
        page_size = limit if cap is None else min(limit, cap - len(rows))
        params: Dict[str, Any] = {
            "$select": select,
            "$limit": page_size,
            "$offset": offset,
            "$order": order or ":id",
        }
        if where:
            params["$where"] = where

        attempt = 0
        while True:
            try:
                r = requests.get(url, params=params, headers=headers, timeout=timeout)
                transient = r.status_code == 429 or r.status_code >= 500
            except (requests.ConnectionError, requests.Timeout):
                if attempt >= retries:
                    raise
                transient = True
            if not transient or attempt >= retries:
                break
            attempt += 1
            time.sleep(min(2 ** attempt, 30))  # back off on throttling / server errors

        if not r.ok:
            print("\n[SODA] ERROR URL:", r.url, file=sys.stderr)
            print("[SODA] ERROR BODY:", r.text[:1000], file=sys.stderr)
            r.raise_for_status()

        batch = r.json()
        if not batch:
            break
        rows.extend(batch)

        if cap is not None and len(rows) >= cap:
            rows = rows[:cap]
            break

        # A short page means the server has no more matching rows.
        if len(batch) < page_size:
            break

        offset += len(batch)
        time.sleep(0.05)  # be kind to API

    return pd.DataFrame(rows)

In [None]:
def ensure_dataset(client: bigquery.Client, dataset_id: str, location: str = "EU"):
    """
    Make sure ``<client.project>.<dataset_id>`` exists.

    If ``get_dataset`` raises ``NotFound``, the dataset is created in
    ``location`` (``exists_ok=True`` tolerates a concurrent creation).
    Progress is printed either way.
    """
    full_id = f"{client.project}.{dataset_id}"
    try:
        client.get_dataset(full_id)
    except NotFound:
        print(f"[BQ] Creating dataset: {full_id} in {location} ...")
        new_dataset = bigquery.Dataset(full_id)
        new_dataset.location = location
        client.create_dataset(new_dataset, exists_ok=True)
        print(f"[BQ] Dataset created: {full_id}")
    else:
        print(f"[BQ] Dataset exists: {full_id}")

In [None]:
def upload_to_bigquery(
    client: bigquery.Client,
    rows: List[Dict[str, Any]],
    table_id: str,
    schema: List[bigquery.SchemaField],
    write_disposition: str = "WRITE_TRUNCATE",
    batch_size: int = 50000,
):
    """
    Load row dicts into a BigQuery table with load jobs, ``batch_size`` rows per job.

    Rows that cannot be serialized with ``json.dumps`` are dropped (the first five
    are logged to stderr).

    ``write_disposition`` applies to the FIRST batch only. Later batches use
    ``WRITE_APPEND``; otherwise a ``WRITE_TRUNCATE`` load would wipe every earlier
    batch and leave only the last one in the table.

    Args:
        client: BigQuery client.
        rows: JSON-serializable dicts, one per row.
        table_id: fully-qualified destination table id.
        schema: destination schema.
        write_disposition: disposition for the first load job.
        batch_size: rows per load job; values <= 0 mean a single job.
    """
    if not rows:
        print(f"[BQ] No data to load for {table_id}")
        return

    safe_rows: List[Dict[str, Any]] = []
    bad = 0
    for i, r in enumerate(rows):
        try:
            json.dumps(r, ensure_ascii=False)  # serializability probe
            safe_rows.append(r)
        except Exception as e:
            bad += 1
            if bad <= 5:
                print(f"[SANITY] Dropping row {i} due to JSON error: {e}. Snippet: {str(r)[:200]}", file=sys.stderr)
    if bad:
        print(f"[SANITY] Dropped {bad} malformed rows before BigQuery load.", file=sys.stderr)
    if not safe_rows:
        print(f"[BQ] No valid rows to load for {table_id}")
        return

    if batch_size <= 0:
        batch_size = len(safe_rows)

    for start in range(0, len(safe_rows), batch_size):
        chunk = safe_rows[start:start + batch_size]
        disposition = write_disposition if start == 0 else "WRITE_APPEND"
        job_config = bigquery.LoadJobConfig(schema=schema, write_disposition=disposition)
        job = client.load_table_from_json(chunk, table_id, job_config=job_config)
        job.result()  # block until the load job finishes (raises on job failure)
        print(f"[BQ] Loaded {len(chunk)} rows into {table_id} (batch {start // batch_size + 1})")

### RDW fetchers (multi-year)

In [None]:
def _norm_plate(s):
    if pd.isna(s):
        return s
    return str(s).upper().replace(" ", "").replace("-", "")


In [None]:
def fetch_vehicles_between(year_start: int, year_end: int) -> pd.DataFrame:
    """
    Fetch RDW registered vehicles first registered in NL in ``[year_start, year_end]``.

    The server-side filter also requires ``catalogusprijs``, ``merk`` and
    ``handelsbenaming`` to be non-null. Four query variants are tried in order
    until one returns rows:

    1. ``*_dt`` timestamp columns, including ``zuinigheidsclassificatie``
    2. ``*_dt`` timestamp columns, without it
    3. ``YYYYMMDD`` text date columns, including it
    4. ``YYYYMMDD`` text date columns, without it

    Variants 1-3 swallow ``requests.HTTPError`` so that a rejected column or filter
    falls through to the next variant. Variant 4 lets HTTP errors propagate.

    Returns:
        One row per vehicle with a normalized ``kenteken``, parsed dates
        (``first_registration_nl_date``, ``current_registration_date``,
        ``first_adm_date``), numeric attributes, ``power_to_weight``
        (vermogen_massarijklaar / massa_rijklaar, inf -> NaN) and
        ``datum_eerste_toelating_year`` (nullable Int64). The result is empty if
        nothing matched.
    """
    # Attributes cast to numbers server-side, in SELECT order.
    numeric_fields = [
        "aantal_zitplaatsen", "massa_ledig_voertuig",
        "lengte", "breedte", "wielbasis",
        "vermogen_massarijklaar", "massa_rijklaar",
    ]
    keep = [
        "kenteken","merk","handelsbenaming","catalogusprijs",
        "first_registration_nl_date","current_registration_date","zuinigheidsclassificatie",
        "inrichting",
        "aantal_zitplaatsen","massa_ledig_voertuig",
        "lengte","breedte","wielbasis",
        "vermogen_massarijklaar","massa_rijklaar",
        "first_adm_date",
    ]

    def _date_col(d: pd.DataFrame, name: str, fmt: Optional[str]) -> pd.Series:
        # SODA leaves null fields out of the JSON, so a whole column can be absent.
        if name in d.columns:
            raw = d[name]
        else:
            raw = pd.Series(pd.NA, index=d.index, dtype="object")
        return pd.to_datetime(raw, format=fmt, errors="coerce")

    def _postprocess(d: pd.DataFrame, dt_variant: bool) -> pd.DataFrame:
        # Parse dates, project to `keep`, coerce numerics and derive extra columns.
        if d.empty:
            return d
        suffix = "_dt" if dt_variant else ""
        fmt = None if dt_variant else "%Y%m%d"
        d["first_registration_nl_date"] = _date_col(d, f"datum_eerste_tenaamstelling_in_nederland{suffix}", fmt)
        d["current_registration_date"] = _date_col(d, f"datum_tenaamstelling{suffix}", fmt)
        d["first_adm_date"] = _date_col(d, f"datum_eerste_toelating{suffix}", fmt)

        for col in keep:
            if col not in d.columns:
                d[col] = pd.NA
        out = d[keep].copy()

        for c in ["catalogusprijs", *numeric_fields]:
            out[c] = pd.to_numeric(out[c], errors="coerce")

        with np.errstate(divide="ignore", invalid="ignore"):
            ptw = out["vermogen_massarijklaar"] / out["massa_rijklaar"]
        out["power_to_weight"] = ptw.replace([np.inf, -np.inf], np.nan)

        out["datum_eerste_toelating_year"] = (
            pd.to_datetime(out["first_adm_date"], errors="coerce").dt.year.astype("Int64")
        )

        out["kenteken"] = out["kenteken"].map(_norm_plate)
        return out

    def _fetch(dt_variant: bool, include_class: bool) -> pd.DataFrame:
        # Build and run one query variant (timestamp vs text dates, with/without class).
        suffix = "_dt" if dt_variant else ""
        reg_col = f"datum_eerste_tenaamstelling_in_nederland{suffix}"
        fields = [
            "kenteken", "merk", "handelsbenaming",
            "(catalogusprijs::number) as catalogusprijs",
            reg_col,
            f"datum_tenaamstelling{suffix}",
        ]
        if include_class:
            fields.append("zuinigheidsclassificatie")
        fields.append("inrichting")
        fields.extend(f"({c}::number) as {c}" for c in numeric_fields)
        fields.append(f"datum_eerste_toelating{suffix}")

        if dt_variant:
            date_filter = (
                f"{reg_col} >= '{year_start}-01-01T00:00:00' AND "
                f"{reg_col} <  '{year_end+1}-01-01T00:00:00'"
            )
        else:
            date_filter = (
                f"{reg_col} >= '{year_start}0101' AND "
                f"{reg_col} <= '{year_end}1231'"
            )
        where = (
            date_filter
            + " AND catalogusprijs IS NOT NULL AND merk IS NOT NULL AND handelsbenaming IS NOT NULL"
        )
        d = fetch_soda(RESOURCE_VEH, ",".join(fields), where, limit=PAGE_LIMIT, max_rows=MAX_ROWS)
        return _postprocess(d, dt_variant)

    d = pd.DataFrame()
    for dt_variant, include_class in ((True, True), (True, False), (False, True)):
        try:
            d = _fetch(dt_variant, include_class)
        except requests.HTTPError:
            d = pd.DataFrame()
        if not d.empty:
            break
    if d.empty:
        # Last resort: let HTTP errors surface to the caller.
        d = _fetch(dt_variant=False, include_class=False)

    if d.empty:
        return d

    mask = (
        d["first_registration_nl_date"].notna() &
        (d["first_registration_nl_date"].dt.year >= year_start) &
        (d["first_registration_nl_date"].dt.year <= year_end)
    )
    df = d[mask].copy()

    if not df.empty:
        fr_min = df["first_registration_nl_date"].min()
        fr_max = df["first_registration_nl_date"].max()
        cr_min = df["current_registration_date"].min()
        cr_max = df["current_registration_date"].max()
        print("[DEBUG] NL first-registration range:", fr_min.date(), "→", fr_max.date())
        print("[DEBUG] Current registration range: ", cr_min.date(), "→", cr_max.date())

    return df

In [None]:
def _chunk_list(lst: List[str], n: int):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

In [None]:
def fetch_fuel_primary_for_plates(kentekens: List[str], batch_size: int = 400) -> pd.DataFrame:
    """
    Look up the primary fuel (``brandstof_volgnummer = '1'``) for the given plates.

    Args:
        kentekens: plates to look up. Any iterable is accepted (list, numpy array,
            pandas Series). Null/blank entries and duplicates are skipped, and
            first-seen order is kept.
        batch_size: number of plates per SoQL ``IN (...)`` request.

    Returns:
        DataFrame with columns ``kenteken`` (normalized via ``_norm_plate``) and
        ``brandstof_omschrijving``: one row per plate that has a primary fuel. It is
        empty (with those columns) when there is nothing to look up or nothing matched.
    """
    empty = pd.DataFrame(columns=["kenteken", "brandstof_omschrijving"])
    if kentekens is None:
        return empty
    # Iterate instead of testing truthiness, which is ambiguous for arrays/Series.
    plates = list(dict.fromkeys(
        str(k).strip() for k in kentekens if pd.notna(k) and str(k).strip()
    ))
    if not plates:
        return empty

    frames = []
    for chunk in _chunk_list(plates, batch_size):
        # SoQL string literals escape an embedded single quote by doubling it.
        in_list = ",".join("'" + k.replace("'", "''") + "'" for k in chunk)
        where = f"kenteken in ({in_list}) AND brandstof_volgnummer = '1' AND brandstof_omschrijving IS NOT NULL"
        frames.append(fetch_soda(
            RESOURCE_FUEL,
            select="kenteken,brandstof_omschrijving,brandstof_volgnummer",
            where=where,
            limit=5000,
            max_rows=None,
        ))

    df_all = pd.concat(frames, ignore_index=True)
    if df_all.empty:
        return empty
    df_all["kenteken"] = df_all["kenteken"].map(_norm_plate)
    df_all = (df_all.sort_values(["kenteken", "brandstof_volgnummer"])
                    .drop_duplicates(subset=["kenteken"], keep="first"))
    return df_all[["kenteken", "brandstof_omschrijving"]]


### Feature engineering & aggregation

In [None]:
def compute_occasion_flag(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add an int64 ``occasion_flag`` column to ``df`` in place and return ``df``.

    The flag is 1 when both ``first_registration_nl_date`` and
    ``current_registration_date`` parse as dates and differ. Otherwise it is 0
    (presumably marking re-registered, i.e. used/"occasion", vehicles).
    A missing date column yields 0 for every row instead of raising.
    """
    def _dates(col: str) -> pd.Series:
        if col not in df.columns:
            return pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns]")
        return pd.to_datetime(df[col], errors="coerce")

    first = _dates("first_registration_nl_date")
    current = _dates("current_registration_date")
    df["occasion_flag"] = (first.notna() & current.notna() & (first != current)).astype("int64")
    return df

In [None]:
def unique_non_null(values):
    """
    Join the distinct non-null, non-blank values (stripped strings) into a
    sorted ", "-separated string. Returns None when no such value exists.
    """
    distinct = {str(v).strip() for v in values if pd.notnull(v)}
    distinct.discard("")
    if not distinct:
        return None
    return ", ".join(sorted(distinct))

def mode_non_null(values):
    """
    Most frequent non-null, non-blank value (as a stripped string).

    Ties resolve to the smallest value, because ``Series.mode`` returns sorted
    modes. Returns None when no such value exists.
    """
    texts = []
    for v in values:
        if not pd.notnull(v):
            continue
        text = str(v).strip()
        if text:
            texts.append(text)
    if not texts:
        return None
    return pd.Series(texts).mode().iloc[0]

def median_non_null(values):
    """
    Median of the values that parse as numbers, returned as a Python float.
    Returns ``np.nan`` when none of them parse.
    """
    numeric = pd.to_numeric(pd.Series(values), errors="coerce")
    if not numeric.notna().any():
        return np.nan
    return float(numeric.median())


In [None]:
def build_brand_model_per_year(df_join: pd.DataFrame,
                               year_start: int, year_end: int) -> pd.DataFrame:
    """
    Brand–model aggregation (per-year) + extras (incl. computed power_to_weight).

    Rows are dropped when ``catalogusprijs`` is missing or not positive, or when
    the first NL registration year is outside ``[year_start, year_end]``. The
    remaining rows are grouped by (merk, handelsbenaming) into:

    - ``count_<year>``: distinct ``kenteken`` registered that year (0 if none)
    - ``avg_<year>``: mean ``catalogusprijs`` that year, rounded to 2 decimals
      (NaN when the count is 0)
    - ``fuel_types_primary``: sorted, comma-joined distinct ``brandstof_omschrijving``
    - ``economy_rate`` / ``inrichting_std``: most frequent value
    - ``resold_flag``: 1 if any row has ``occasion_flag == 1``
    - ``*_median``: medians of the size/mass attributes and of ``power_to_weight``
    - ``datum_eerste_toelating_year``: most frequent value (nullable Int64)

    ``merk``/``handelsbenaming`` are renamed to ``brand``/``model``. Rows are sorted
    by the latest year's count, then the previous year's (both descending), then
    brand and model. Optional attribute columns missing from ``df_join`` aggregate
    to null instead of raising ``KeyError``. An empty frame with the output columns
    is returned when nothing survives filtering.
    """
    years = list(range(year_start, year_end + 1))
    keys = ["merk", "handelsbenaming"]
    out_cols = (["brand","model","fuel_types_primary","economy_rate","resold_flag",
                 "inrichting_std",
                 "seats_median","mass_empty_median","length_median","width_median","wheelbase_median",
                 "pw_ratio_median",
                 "datum_eerste_toelating_year"]
                + [f"count_{y}" for y in years]
                + [f"avg_{y}" for y in years])

    if df_join.empty:
        return pd.DataFrame(columns=out_cols)

    df = df_join.copy()
    df["catalogusprijs"] = pd.to_numeric(df["catalogusprijs"], errors="coerce")
    df = df[df["catalogusprijs"].notna() & (df["catalogusprijs"] > 0)].copy()

    df["year"] = pd.to_datetime(df["first_registration_nl_date"], errors="coerce").dt.year
    df = df[df["year"].notna() & (df["year"] >= year_start) & (df["year"] <= year_end)].copy()
    if df.empty:
        return pd.DataFrame(columns=out_cols)
    # .dt.year is float64 whenever NaT was present. Cast so the pivoted labels become
    # "count_2023" rather than "count_2023.0", which the per-year loop would not find.
    df["year"] = df["year"].astype("int64")

    df["merk"] = df["merk"].astype(str).str.strip()
    df["handelsbenaming"] = df["handelsbenaming"].astype(str).str.strip()

    for col in ("brandstof_omschrijving", "zuinigheidsclassificatie", "inrichting",
                "aantal_zitplaatsen", "massa_ledig_voertuig", "lengte", "breedte", "wielbasis",
                "vermogen_massarijklaar", "massa_rijklaar", "datum_eerste_toelating_year"):
        if col not in df.columns:
            df[col] = pd.NA

    if "occasion_flag" not in df.columns:
        df = compute_occasion_flag(df)

    if "power_to_weight" not in df.columns:
        with np.errstate(divide="ignore", invalid="ignore"):
            ptw = (pd.to_numeric(df["vermogen_massarijklaar"], errors="coerce")
                   / pd.to_numeric(df["massa_rijklaar"], errors="coerce"))
        df["power_to_weight"] = ptw.replace([np.inf, -np.inf], np.nan)

    by_year = df.groupby(keys + ["year"], as_index=False)
    counts = by_year.agg(vehicle_count=("kenteken", "nunique"))
    pivot_counts = (counts.pivot_table(index=keys, columns="year", values="vehicle_count",
                                       aggfunc="sum", fill_value=0)
                          .rename(columns=lambda y: f"count_{y}")
                          .reset_index())
    avgs = by_year.agg(avg_price=("catalogusprijs", "mean"))
    pivot_avgs = (avgs.pivot_table(index=keys, columns="year", values="avg_price")
                      .rename(columns=lambda y: f"avg_{y}")
                      .reset_index())

    per_year = pivot_counts.merge(pivot_avgs, on=keys, how="left")
    for y in years:
        ccol, acol = f"count_{y}", f"avg_{y}"
        if ccol not in per_year.columns:
            per_year[ccol] = 0
        if acol not in per_year.columns:
            per_year[acol] = np.nan
        per_year.loc[per_year[ccol] == 0, acol] = np.nan
        per_year[acol] = pd.to_numeric(per_year[acol], errors="coerce").round(2)

    def _any_resold(x):
        return int((pd.Series(x) == 1).any())

    def _mode_or_nan(v):
        s = pd.Series(v).dropna()
        return s.mode().iloc[0] if len(s) else np.nan

    attrs = (df.groupby(keys, as_index=False)
               .agg(fuel_types_primary=("brandstof_omschrijving", unique_non_null),
                    economy_rate=("zuinigheidsclassificatie", mode_non_null),
                    resold_flag=("occasion_flag", _any_resold),
                    inrichting_std=("inrichting", mode_non_null),
                    seats_median=("aantal_zitplaatsen", median_non_null),
                    mass_empty_median=("massa_ledig_voertuig", median_non_null),
                    length_median=("lengte", median_non_null),
                    width_median=("breedte", median_non_null),
                    wheelbase_median=("wielbasis", median_non_null),
                    pw_ratio_median=("power_to_weight", median_non_null),
                    datum_eerste_toelating_year=("datum_eerste_toelating_year", _mode_or_nan)))

    grouped = per_year.merge(attrs, on=keys, how="left")
    grouped["datum_eerste_toelating_year"] = (
        pd.to_numeric(grouped["datum_eerste_toelating_year"], errors="coerce").astype("Int64")
    )
    grouped = grouped.rename(columns={"merk": "brand", "handelsbenaming": "model"})

    return (grouped[out_cols]
            .sort_values([f"count_{year_end}", f"count_{max(year_start, year_end-1)}", "brand", "model"],
                         ascending=[False, False, True, True])
            .reset_index(drop=True))

In [None]:
_CTRL_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F]")  # keep \t \n \r, drop others

def _scrub_text(x: Any) -> Optional[str]:
    """
    Return a safe UTF-8 JSON-friendly string:
    - convert to str (if not None)
    - normalize (NFC)
    - remove control chars except TAB/CR/LF
    - collapse odd whitespace
    """
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return None
    s = str(x)
    s = unicodedata.normalize("NFC", s)
    s = _CTRL_RE.sub("", s)
    s = s.strip()
    s = re.sub(r"[ \t]{2,}", " ", s)
    return s if s else None

In [None]:
def _sanitize_for_bigquery(df: pd.DataFrame, year_start: int, year_end: int) -> pd.DataFrame:
    out = df.copy()

    int_cols = [f"count_{y}" for y in range(year_start, year_end + 1)]
    if "resold_flag" in out.columns:
        int_cols.append("resold_flag")
    if "datum_eerste_toelating_year" in out.columns:
        int_cols.append("datum_eerste_toelating_year")
    for c in int_cols:
        if c in out.columns:
            s = pd.to_numeric(out[c], errors="coerce").astype("Int64")
            out[c] = s.apply(lambda v: int(v) if pd.notna(v) else None)

    float_cols = [
        "seats_median","mass_empty_median","length_median","width_median","wheelbase_median",
        "pw_ratio_median",
        *[f"avg_{y}" for y in range(year_start, year_end + 1)],
    ]
    for c in float_cols:
        if c in out.columns:
            s = pd.to_numeric(out[c], errors="coerce").replace([np.inf, -np.inf], np.nan)
            out[c] = s.apply(lambda v: float(v) if pd.notna(v) else None)

    exp_str_cols = ["brand","model","fuel_types_primary","economy_rate","inrichting_std","image_url"]
    for c in exp_str_cols:
        if c in out.columns:
            out[c] = out[c].apply(_scrub_text)

    for c in out.columns:
        if c not in int_cols and c not in float_cols:
            if out[c].dtype == "object":
                out[c] = out[c].apply(_scrub_text)

    out = out.replace([np.inf, -np.inf], np.nan)
    out = out.where(pd.notnull(out), None)
    return out

### Google Custom Search (image URLs)

In [None]:
import concurrent.futures as _fut

# Domains: higher is better
_DOMAIN_WEIGHTS = {
    # Manufacturer / press
    "media.mercedes-benz.com": 10,
    "press.bmwgroup.com": 10,
    "bmw.com": 9,
    "mercedes-benz.com": 9,
    "volkswagen-newsroom.com": 9,
    "toyota.eu": 8,
    "press.toyota": 8,
    # Reputable automotive media
    "topgear.com": 7,
    "autocar.co.uk": 7,
    "caranddriver.com": 7,
    "motor1.com": 7,
    "ev-database.nl": 7,
    "autoweek.nl": 7,
    "autoweek.com": 6,
    "autoblog.nl": 6,
    "whatcar.com": 6,
    # Reference
    "wikipedia.org": 8,
    "wikimedia.org": 8,
    # Marketplaces/stock (downweight, still acceptable)
    "autotrader": 3,
    "cars.com": 3,
    "mobile.de": 3,
    "marktplaats.nl": 2,
}

_BAD_TOKENS = [
    "render", "concept", "spyshot", "spy-shot", "camouflage", "tuning", "widebody",
    "lego", "forza", "gran-turismo", "gta", "assetto", "mod", "wrap", "damage",
]

def _domain_weight(url: str) -> int:
    try:
        from urllib.parse import urlparse
        host = urlparse(url).netloc.lower()
    except Exception:
        return 0
    score = 0
    for dom, w in _DOMAIN_WEIGHTS.items():
        if dom in host:
            score = max(score, w)
    return score

def _tokenize(s: str) -> List[str]:
    s = unicodedata.normalize("NFC", s)
    s = re.sub(r"[_\-.,/]", " ", s)
    s = re.sub(r"\s{2,}", " ", s)
    return [t for t in s.lower().split() if t]

def _contains_bad_tokens(s: str) -> bool:
    low = s.lower()
    return any(bt in low for bt in _BAD_TOKENS)

def _google_search_images(brand: str, model: str, year_hint: Optional[int],
                          body_kw: List[str], fuel_kw: List[str],
                          api_key: str, cx: str,
                          timeout: int = 12, num: int = 10) -> List[Dict[str, Any]]:
    """
    Run several Google Custom Search JSON API image queries for one model.

    Query variants are "<brand> <model> <year>" and "... <year> photo" (only when
    ``year_hint`` is set), then "<brand> <model>" and "<brand> <model> car".
    Body and fuel keywords, if any, are appended to every variant.

    Returns:
        De-duplicated candidates ``{"link", "title", "context", "q"}`` in discovery
        order. Network errors, non-2xx responses, non-JSON bodies and malformed
        items are skipped, so the list may be empty.
    """
    base = f"{brand} {model}".strip()
    q_variants: List[str] = []
    if year_hint:
        q_variants += [f"{base} {year_hint}", f"{base} {year_hint} photo"]
    q_variants += [base, f"{base} car"]

    hints = [" ".join(kw) for kw in (body_kw, fuel_kw) if kw]
    if hints:
        suffix = " ".join(hints)
        q_variants = [f"{q} {suffix}" for q in q_variants]

    seen = set()
    items: List[Dict[str, Any]] = []
    for q in q_variants:
        payload = None
        try:
            r = requests.get(
                "https://www.googleapis.com/customsearch/v1",
                params={
                    "q": q,
                    "searchType": "image",
                    "num": max(1, min(num, 10)),
                    "safe": "active",
                    "key": api_key,
                    "cx": cx,
                    "gl": "nl",                  # country bias
                    "lr": "lang_nl|lang_en",     # language bias
                    "imgType": "photo",          # prefer real photos
                },
                timeout=timeout,
            )
            if r.ok:
                payload = r.json()
        except (requests.RequestException, ValueError):
            # Network failure, or a 2xx body that is not valid JSON.
            payload = None

        if isinstance(payload, dict):
            for it in payload.get("items") or []:
                if not isinstance(it, dict):
                    continue
                image_meta = it.get("image") or {}  # "image" may be absent or null
                link = it.get("link") or image_meta.get("thumbnailLink")
                if not link or link in seen:
                    continue
                items.append({
                    "link": link,
                    "title": it.get("title") or "",
                    "context": image_meta.get("contextLink") or "",
                    "q": q,
                })
                seen.add(link)
        # Small delay between variants to be nice to API; increase if you see 429s
        time.sleep(0.05)
    return items

def _score_candidate(link: str, title: str, context: str,
                     brand: str, model_canon: str,
                     body_kw: List[str], year_hint: Optional[int]) -> float:
    """
    Heuristic relevance score of one image-search hit (higher is better).

    Terms, summed in this order over the text ``link + title + context``:
      +3.0 per brand/model token present in the text's tokens
      +1.5 x the link's domain weight
      +2.0 if any body keyword is one of the text's tokens
      +1.5 if ``year_hint`` appears in the text (a soft bonus, not a filter)
      -5.0 if the text contains a bad-image marker
      +0.8 if the title contains ``model_canon`` (case-insensitive)
    """
    haystack = " ".join([link, title, context])
    wanted = set(_tokenize(f"{brand} {model_canon}"))
    found = set(_tokenize(haystack))

    terms = [
        3.0 * len(wanted & found),
        1.5 * _domain_weight(link),
        2.0 if body_kw and any(kw in found for kw in body_kw) else 0.0,
        1.5 if year_hint and str(year_hint) in haystack else 0.0,
        -5.0 if _contains_bad_tokens(haystack) else 0.0,
        0.8 if model_canon.lower() in title.lower() else 0.0,
    ]
    return sum(terms)

def _pick_best_image_smart(brand_raw: str, model_raw: str,
                           year_hint: Optional[int],
                           inrichting: Optional[str],
                           fuel_text: Optional[str],
                           api_key: str, cx: str,
                           validate_urls: bool = False,
                           fallback_openverse: bool = True) -> Optional[str]:
    brand = _canon_brand(brand_raw)
    model_canon = _canon_model_tokens(model_raw)

    body_kw = _inrichting_to_body_keywords(inrichting)
    fuel_kw = []
    if fuel_text:
        ft = fuel_text.lower()
        if "elektr" in ft or "electric" in ft or "bev" in ft:
            fuel_kw.append("electric")
        elif "hybr" in ft or "plug" in ft or "phev" in ft:
            fuel_kw.append("hybrid")
        elif "diesel" in ft or "d" == ft.strip():
            fuel_kw.append("diesel")
        elif "benzine" in ft or "petrol" in ft or "gasoline" in ft:
            fuel_kw.append("petrol")

    items = _google_search_images(brand, model_canon, year_hint, body_kw, fuel_kw,
                                  api_key, cx, timeout=12, num=10)
    if not items and fallback_openverse:
        return _openverse_first(brand, model_canon)

    # score and choose best
    best_url = None
    best_score = -1e9
    for it in items:
        url = it["link"]
        title = it["title"] or ""
        ctx = it["context"] or ""
        sc = _score_candidate(url, title, ctx, brand, model_canon, body_kw, year_hint)
        if validate_urls and sc > best_score:
            if not _head_ok(url, timeout=8):
                continue
        if sc > best_score:
            best_score = sc
            best_url = url

    if best_url is None and fallback_openverse:
        return _openverse_first(brand, model_canon)
    return best_url

In [None]:
def _head_ok(url: str, timeout: int = 6) -> bool:
    """
    Cheap reachability check for an image URL.

    Tries HEAD first, following redirects. If that status is not in [200, 400),
    falls back to a streamed GET, because some servers reject HEAD. Responses are
    closed via ``with`` so streamed connections return to the pool instead of
    leaking. Network errors yield False.
    """
    try:
        with requests.head(url, allow_redirects=True, timeout=timeout) as h:
            if 200 <= h.status_code < 400:
                return True
        # stream=True: only headers are read; the body is never downloaded.
        with requests.get(url, stream=True, allow_redirects=True, timeout=timeout) as g:
            return 200 <= g.status_code < 400
    except requests.RequestException:
        return False

def _openverse_first(brand: str, model: str) -> Optional[str]:
    """
    Fallback image source: the first Openverse image result for "<brand> <model> car".

    Returns that result's ``thumbnail`` (or ``url``). Returns None when the request
    fails, the response is not OK or not JSON, or there are no results.
    """
    try:
        resp = requests.get(
            # api.openverse.org replaces the deprecated api.openverse.engineering host.
            "https://api.openverse.org/v1/images/",
            params={"q": f"{brand} {model} car", "page_size": 1},
            timeout=8,
        )
        if not resp.ok:
            return None
        payload = resp.json()
    except (requests.RequestException, ValueError):
        return None

    results = payload.get("results") if isinstance(payload, dict) else None
    if not results or not isinstance(results[0], dict):
        return None
    first = results[0]
    return first.get("thumbnail") or first.get("url")

In [None]:
# Lookup key per row: (brand, model, first-admission year, inrichting_std, fuel_types_primary).
_ImageKey = Tuple[str, str, Optional[int], Optional[str], Optional[str]]


def _latest_count_column(columns) -> Optional[str]:
    """Return the ``count_<YYYY>`` column with the highest year among ``columns``, or None."""
    years = [
        int(match.group(1))
        for match in (re.fullmatch(r"count_(\d{4})", str(col)) for col in columns)
        if match
    ]
    return f"count_{max(years)}" if years else None


def _na_to_none(value: Any) -> Any:
    """Collapse pandas missing markers (NaN, NaT, pd.NA) to None; other values pass through.

    Distinct NaN objects compare unequal, so without this, rows that differ only by a
    missing body/fuel value would not deduplicate and would each cost an API call.
    """
    if value is None:
        return None
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def _attach_top_n_images(
    df_agg: pd.DataFrame,
    work: pd.DataFrame,
    missing_sentinel: Optional[str],
    fill_missing: bool,
) -> pd.DataFrame:
    """Spread the URLs found for the top-N rows to every ``df_agg`` row with the same brand/model.

    ``work`` may hold several rows per (brand, model) that differ by year, body or fuel.
    It is reduced to one row per pair before the left merge; otherwise the merge would
    duplicate ``df_agg`` rows. Rows with a real URL win, then ``work`` order (highest
    count first).
    """
    per_model = work[["brand", "model", "image_url"]]
    no_url = per_model["image_url"].isna() | (per_model["image_url"] == missing_sentinel)
    per_model = (
        per_model.assign(_no_url=no_url)
        .sort_values("_no_url", kind="stable")  # False (has URL) first; stable keeps count order
        .drop_duplicates(subset=["brand", "model"], keep="first")
        .drop(columns="_no_url")
    )
    full = df_agg.drop(columns=["image_url"], errors="ignore").merge(
        per_model, on=["brand", "model"], how="left"
    )
    if fill_missing:
        full["image_url"] = full["image_url"].fillna(missing_sentinel)
    return full


def add_image_column_google_parallel_smart(
    df_agg: pd.DataFrame,
    api_key: str,
    cx: str,
    *,
    max_workers: int = 10,           # increase if you have quota
    per_request_delay: float = 0.0,  # leave 0 when parallel
    validate_urls: bool = False,     # HEAD check; slower
    fallback_openverse: bool = True,
    top_n: Optional[int] = None,
    use_empty_list_if_missing: bool = True,
) -> pd.DataFrame:
    """Return a copy of ``df_agg`` with an ``image_url`` column filled via Google image search.

    Each distinct (brand, model, datum_eerste_toelating_year, inrichting_std,
    fuel_types_primary) combination is looked up once with ``_pick_best_image_smart``,
    in parallel on a thread pool. ``df_agg`` itself is not modified.

    Args:
        df_agg: Aggregated frame. Must have ``brand`` and ``model``. The year, body and fuel
            columns, and ``count_<YYYY>`` columns, are used when present.
        api_key, cx: Google Custom Search credentials. If either is empty, no lookups run
            and every row gets the missing value.
        max_workers: Thread-pool size.
        per_request_delay: Seconds each task sleeps before its lookup.
        validate_urls, fallback_openverse: Forwarded to ``_pick_best_image_smart``.
        top_n: If truthy, only the top-N rows are searched. Rows are ranked by the latest
            ``count_<YYYY>`` column (descending), then brand and model. The URLs found are
            then propagated to all rows of the same brand/model; the result has exactly the
            rows of ``df_agg``, in its order, with a fresh RangeIndex.
        use_empty_list_if_missing: Use the string ``"[]"`` instead of None for rows without
            an image.
    """
    work = df_agg.copy()
    if top_n:
        latest_count = _latest_count_column(work.columns)
        sort_keys = [k for k in (latest_count, "brand", "model") if k is not None and k in work.columns]
        if sort_keys:
            # Keep `ascending` aligned with whichever keys actually exist.
            ascending = [key != latest_count for key in sort_keys]
            work = work.sort_values(sort_keys, ascending=ascending)
        work = work.head(top_n).copy()

    missing_sentinel = "[]" if use_empty_list_if_missing else None

    if not api_key or not cx:
        print("[WARN] GOOGLE_API_KEY or GOOGLE_CSE_ID missing. Skipping image enrichment.")
        work["image_url"] = missing_sentinel
        if top_n:
            return _attach_top_n_images(df_agg, work, missing_sentinel, use_empty_list_if_missing)
        return work

    keys: List[_ImageKey] = [
        (
            str(row["brand"]),
            str(row["model"]),
            int(row["datum_eerste_toelating_year"]) if pd.notna(row.get("datum_eerste_toelating_year")) else None,
            _na_to_none(row.get("inrichting_std")),
            _na_to_none(row.get("fuel_types_primary")),
        )
        for _, row in work.iterrows()
    ]
    # Deduplicate (brand, model, year, body, fuel) to reduce API calls; order is preserved.
    uniq_keys = list(dict.fromkeys(keys))

    def _task(key: _ImageKey) -> Tuple[_ImageKey, Optional[str]]:
        brand, model, year_hint, inr, fuel = key
        if per_request_delay > 0:
            time.sleep(per_request_delay)
        url = _pick_best_image_smart(
            brand, model, year_hint, inr, fuel,
            api_key=api_key, cx=cx,
            validate_urls=validate_urls,
            fallback_openverse=fallback_openverse,
        )
        return key, url

    cache: Dict[_ImageKey, Optional[str]] = {}
    if uniq_keys:
        # chunksize is ignored by ThreadPoolExecutor.map, so it is not passed.
        with _fut.ThreadPoolExecutor(max_workers=max_workers) as ex:
            cache = dict(ex.map(_task, uniq_keys))

    urls = [cache.get(key) for key in keys]
    if use_empty_list_if_missing:
        urls = [missing_sentinel if url is None else url for url in urls]

    work = work.assign(image_url=urls)
    if top_n:
        return _attach_top_n_images(df_agg, work, missing_sentinel, use_empty_list_if_missing)
    return work

In [None]:
def _canon_model_tokens(model: str) -> str:
    s = str(model).upper().strip()
    # common RDW-to-consumer fixes
    replacements = {
        r"\bA KLASSE\b": "A-Class",
        r"\bB KLASSE\b": "B-Class",
        r"\bC KLASSE\b": "C-Class",
        r"\bE KLASSE\b": "E-Class",
        r"\bS KLASSE\b": "S-Class",
        r"\b3 SERIE\b": "3 Series",
        r"\b5 SERIE\b": "5 Series",
        r"\b7 SERIE\b": "7 Series",
        r"\bID3\b": "ID.3",
        r"\bID4\b": "ID.4",
        r"\bID5\b": "ID.5",
        r"\bC HR\b": "C-HR",
        r"\bCX 5\b": "CX-5",
        r"\bCX 3\b": "CX-3",
        r"\bQASHQAI\b": "Qashqai",
        r"\bKUGA PLUG IN\b": "Kuga Plug-in",
        r"\bPLUG[- ]?IN\b": "Plug-in",
        r"\bHYBRID\b": "Hybrid",
    }
    for pat, rep in replacements.items():
        s = re.sub(pat, rep, s)
    # kill common engine codes / junk tokens
    s = re.sub(r"\b(\d\.\d|TSI|TFSI|TDI|CDI|dCi|DCI|HDI|BlueHDi|BlueTec|MHEV|PHEV|HEV|BEV|AWD|4MATIC)\b", "", s, flags=re.I)
    s = re.sub(r"\s{2,}", " ", s).strip()
    return s

def _canon_brand(brand: str) -> str:
    # Keep brand case nicely
    known = {
        "VW": "Volkswagen",
        "MB": "Mercedes-Benz",
        "MERCEDES": "Mercedes-Benz",
        "BMW": "BMW",
        "TOYOTA": "Toyota",
        "VOLVO": "Volvo",
        "PEUGEOT": "Peugeot",
        "CITROEN": "Citroën",
        "CITROËN": "Citroën",
        "OPEL": "Opel",
        "RENAULT": "Renault",
        "ŠKODA": "Škoda",
        "SKODA": "Škoda",
    }
    s = str(brand).strip()
    up = s.upper()
    return known.get(up, s)

def _inrichting_to_body_keywords(inrichting: Optional[str]) -> List[str]:
    if not inrichting or not str(inrichting).strip():
        return []
    s = str(inrichting).lower()
    # very rough mapping
    pairs = [
        ("hatchback", ["hatch", "hb"]),
        ("sedan", ["sedan", "saloon"]),
        ("wagon", ["station", "estate", "kombi", "touring"]),
        ("suv", ["suv", "mpv", "crossover"]),
        ("coupe", ["coupe", "coupé"]),
        ("convertible", ["cabrio", "convertible", "roadster"]),
        ("van", ["bestel", "van"]),
    ]
    out = []
    for key, toks in pairs:
        if any(t in s for t in toks + [key]):
            out.append(key)
    return out

### Main

In [None]:
# Date columns the cached vehicle CSVs may contain; re-parsed as datetimes on reload.
_CACHED_DATE_COLS = ("first_registration_nl_date", "current_registration_date", "first_adm_date")


def _load_cached_frame(path: str) -> pd.DataFrame:
    """Read a cached pipeline CSV, parsing whichever of ``_CACHED_DATE_COLS`` its header has."""
    header = pd.read_csv(path, nrows=0).columns
    date_cols = [c for c in _CACHED_DATE_COLS if c in header]
    return pd.read_csv(path, parse_dates=date_cols, low_memory=False)


def _norm_plate_column(df: pd.DataFrame) -> pd.DataFrame:
    """Apply ``_norm_plate`` to the ``kenteken`` column in place (if present) and return ``df``."""
    if "kenteken" in df.columns:
        df["kenteken"] = df["kenteken"].map(_norm_plate)
    return df


def _cast_agg_numeric_columns(df_agg: pd.DataFrame, year_start: int, year_end: int) -> pd.DataFrame:
    """Cast the aggregate's numeric columns in place and return ``df_agg``.

    ``count_<y>`` and ``resold_flag`` become int64, with unparseable/missing values set to 0.
    ``avg_<y>`` becomes numeric, rounded to 2 decimals.
    """
    for y in range(year_start, year_end + 1):
        count_col, avg_col = f"count_{y}", f"avg_{y}"
        if count_col in df_agg.columns:
            df_agg[count_col] = pd.to_numeric(df_agg[count_col], errors="coerce").fillna(0).astype("int64")
        if avg_col in df_agg.columns:
            df_agg[avg_col] = pd.to_numeric(df_agg[avg_col], errors="coerce").round(2)
    if "resold_flag" in df_agg.columns:
        df_agg["resold_flag"] = pd.to_numeric(df_agg["resold_flag"], errors="coerce").fillna(0).astype("int64")
    return df_agg


def _agg_table_schema(year_start: int, year_end: int) -> list:
    """BigQuery schema for the brand/model-per-year table (per-year count/avg columns included)."""
    years = range(year_start, year_end + 1)
    return [
        bigquery.SchemaField("brand", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("model", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("fuel_types_primary", "STRING"),
        bigquery.SchemaField("economy_rate", "STRING"),
        bigquery.SchemaField("resold_flag", "INT64"),
        bigquery.SchemaField("inrichting_std", "STRING"),
        bigquery.SchemaField("seats_median", "FLOAT"),
        bigquery.SchemaField("mass_empty_median", "FLOAT"),
        bigquery.SchemaField("length_median", "FLOAT"),
        bigquery.SchemaField("width_median", "FLOAT"),
        bigquery.SchemaField("wheelbase_median", "FLOAT"),
        bigquery.SchemaField("pw_ratio_median", "FLOAT"),
        bigquery.SchemaField("datum_eerste_toelating_year", "INT64"),
        *[bigquery.SchemaField(f"count_{y}", "INT64") for y in years],
        *[bigquery.SchemaField(f"avg_{y}", "FLOAT") for y in years],
        bigquery.SchemaField("image_url", "STRING"),
    ]


def main():
    """Run the pipeline for YEAR_START..YEAR_END: RDW fetch, fuel join, aggregate, images, BigQuery.

    Intermediate results are cached as CSVs in the working directory. When a cached CSV
    exists it is loaded instead of re-fetching from the RDW API. Only
    ``{PROJECT_ID}.{DATASET_ID}.{TBL_AGG}`` is written to BigQuery (WRITE_TRUNCATE).
    Returns early, writing nothing, if no vehicles are fetched or the aggregate is empty.
    """
    if PROJECT_ID == "YOUR_GCP_PROJECT_ID":
        print("Please set PROJECT_ID env var or edit the script with your GCP project id.", file=sys.stderr)
        return

    client = make_bq_client(PROJECT_ID)  # your existing function
    ensure_dataset(client, DATASET_ID, LOCATION)
    dataset_ref = f"{PROJECT_ID}.{DATASET_ID}"

    veh_csv     = f"vehicles_{YEAR_START}_{YEAR_END}.csv"
    join_csv    = f"vehicles_with_fuel_{YEAR_START}_{YEAR_END}.csv"
    agg_csv     = f"brand_model_peryear_{YEAR_START}_{YEAR_END}.csv"
    agg_img_csv = f"brand_model_peryear_with_images_{YEAR_START}_{YEAR_END}.csv"

    # Vehicles: cached CSV or RDW fetch
    if os.path.exists(veh_csv):
        print(f"[LOAD] Found {veh_csv}; loading it instead of re-fetching.")
        df_veh = _load_cached_frame(veh_csv)
        print(f"[LOAD] vehicles rows: {len(df_veh)}")
    else:
        print(f"[RDW] Fetching vehicles between {YEAR_START} and {YEAR_END} ...")
        df_veh = fetch_vehicles_between(YEAR_START, YEAR_END)
        print(f"[RDW] vehicles rows: {len(df_veh)}")
        if df_veh.empty:
            print("[ETL] No vehicles found; nothing to write.")
            return
        df_veh.to_csv(veh_csv, index=False)
        print(f"[SAVE] Wrote {veh_csv}")

    df_veh = compute_occasion_flag(_norm_plate_column(df_veh))

    # Join with fuel: cached CSV or fetch primary fuel per plate
    if os.path.exists(join_csv):
        print(f"[LOAD] Found {join_csv}; loading it instead of re-fetching fuel.")
        df_join = _load_cached_frame(join_csv)
        print(f"[LOAD] vehicles_with_fuel rows: {len(df_join)}")
        df_join = _norm_plate_column(df_join)
    else:
        plates = df_veh["kenteken"].dropna().unique().tolist() if "kenteken" in df_veh.columns else []
        df_fuel = fetch_fuel_primary_for_plates(plates, batch_size=400)
        print(f"[RDW] fuel rows: {len(df_fuel)}")
        # Keep the merge valid (and the columns present) even when no fuel rows came back.
        right = df_fuel if not df_fuel.empty else pd.DataFrame(columns=["kenteken","brandstof_omschrijving"])
        df_join = df_veh.merge(right, on="kenteken", how="left")
        df_join.to_csv(join_csv, index=False)
        print(f"[SAVE] Wrote {join_csv}")
        print(f"[INFO] vehicles_with_fuel rows: {len(df_join)}")

    if "brandstof_omschrijving" in df_join.columns:
        non_null_fuel = df_join["brandstof_omschrijving"].notna().sum()
        print(f"[CHECK] rows with primary fuel present: {non_null_fuel} / {len(df_join)}")

    # Aggregate
    df_agg = build_brand_model_per_year(df_join, YEAR_START, YEAR_END)
    print(f"[AGG] Aggregated rows: {len(df_agg)}")
    if df_agg.empty:
        print("[ETL] Aggregation empty; nothing to write.")
        return

    df_agg.to_csv(agg_csv, index=False)
    print(f"[SAVE] Wrote {agg_csv}")

    # Smart, parallel image enrichment (better quality; slightly slower)
    df_agg = add_image_column_google_parallel_smart(
        df_agg,
        api_key=GOOGLE_API_KEY,
        cx=GOOGLE_CSE_ID,
        max_workers=10,
        per_request_delay=0.0,
        validate_urls=IMG_VALIDATE,
        fallback_openverse=IMG_FALLBACK_OV,
        top_n=IMG_TOP_N,
        use_empty_list_if_missing=True,
    )

    df_agg.to_csv(agg_img_csv, index=False)
    print(f"[SAVE] Wrote {agg_img_csv}")

    # Light numeric cast, then strict sanitizing for BigQuery
    df_agg = _cast_agg_numeric_columns(df_agg, YEAR_START, YEAR_END)
    df_bq = _sanitize_for_bigquery(df_agg, YEAR_START, YEAR_END)
    rows = df_bq.to_dict(orient="records")

    table_id = f"{dataset_ref}.{TBL_AGG}"
    upload_to_bigquery(
        client,
        rows,
        table_id,
        schema=_agg_table_schema(YEAR_START, YEAR_END),
        write_disposition="WRITE_TRUNCATE",
    )

    print(f"[DONE] BigQuery table written: {table_id}")
    print(f"       Window: {YEAR_START}..{YEAR_END}")
    print(f"       CSVs: {veh_csv}, {join_csv}, {agg_csv}, {agg_img_csv}")

In [None]:
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        print(f"\nUnexpected error: {e}", file=sys.stderr)
        sys.exit(1)

[BQ] Dataset exists: compact-garage-473209-u4.RDW_final
[LOAD] Found vehicles_2023_2025.csv; loading it instead of re-fetching.
[LOAD] vehicles rows: 2108141
[LOAD] Found vehicles_with_fuel_2023_2025.csv; loading it instead of re-fetching fuel.
[LOAD] vehicles_with_fuel rows: 2108141
[CHECK] rows with primary fuel present: 2108141 / 2108141
[AGG] Aggregated rows: 13326
[SAVE] Wrote brand_model_peryear_2023_2025.csv
[SAVE] Wrote brand_model_peryear_with_images_2023_2025.csv



Unexpected error: 400 Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 2; errors: 1. Please look into the errors[] collection for more details.; reason: invalid, message: Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 2; errors: 1. Please look into the errors[] collection for more details.; reason: invalid, message: Error while reading data, error message: JSON processing encountered too many errors, giving up. Rows: 2; errors: 1; max bad: 0; error percent: 0; reason: invalid, message: Error while reading data, error message: JSON parsing error in row starting at position 589: Parser terminated before end of string
ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/tmp/ipython-input-230879549.py", line 3, in <cell line: 0>
    main()
  File "/tmp/ipython-input-2840071918.py", line 123, in main
    upload_to_bigquery(
  File "/tmp/ipython-input-2380194694.py", line 30, in upload_to_bigquery
    job.result()
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/job/base.py", line 1048, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 2; errors: 1. Please look into the errors[] collection for more details.; reason: invalid, message: Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 

TypeError: object of type 'NoneType' has no len()