In [None]:
import pandas as pd
import time
import requests
import logging
import random

# --- Nominatim endpoint and request identification ---
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
# The User-Agent must be descriptive and carry a real contact address.
HEADERS = {"User-Agent": "street-verification/1.0 (contact: said.paul@gmail.com)"}

# --- Throttling / retry policy ---
MAX_RETRIES = 3
BASE_DELAY = 1.2  # seconds between requests (policy-safe)

# --- File locations ---
INPUT_FILE = "../../../data/processed/dim_street.csv"
OUTPUT_FILE = "dim_street_enriched.csv"
LOG_FILE = "nominatim_street_enrichment.log"

# --------------------
# Logging
# --------------------
# force=True: if the root logger already has handlers (e.g. this cell is
# re-run in the same kernel), basicConfig() would otherwise do nothing and
# the log file would never be (re)created.
# encoding="utf-8": queried street/town names are written to the log and may
# contain non-ASCII letters that the platform default encoding cannot encode.
logging.basicConfig(
    filename=LOG_FILE,
    filemode="w",  # start a fresh log on every run
    encoding="utf-8",
    force=True,
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

logging.info("Starting Nominatim street enrichment")

# Load the street dimension table that is going to be enriched.
df = pd.read_csv(INPUT_FILE)

# Make sure every output column exists; columns already present are kept.
enrichment_columns = ("chatgpt_lat", "chatgpt_lon", "chatgpt_street_type")
for col in enrichment_columns:
    if col in df.columns:
        continue
    df[col] = ""

def polite_sleep():
    """Pause for BASE_DELAY seconds plus a random 0.2-0.6 s jitter."""
    jitter = random.uniform(0.2, 0.6)
    time.sleep(BASE_DELAY + jitter)

def query_nominatim(street, town):
    """Search Nominatim for ``"<street>, <town>, Malta"``.

    Makes up to MAX_RETRIES attempts. HTTP 429/503 responses and request
    errors are logged and retried with a delay that grows with the attempt
    number.

    Args:
        street: Street name to look up.
        town: Town the street belongs to.

    Returns:
        The decoded JSON result list (at most 3 entries), or an empty list
        when every attempt failed or the payload was not a list.
    """
    params = {
        "q": f"{street}, {town}, Malta",
        "format": "json",
        "addressdetails": 1,
        "limit": 3
    }

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            r = requests.get(
                NOMINATIM_URL,
                params=params,
                headers=HEADERS,
                timeout=30
            )

            if r.status_code in (429, 503):
                # Back off even on the last attempt so the caller's next
                # query does not immediately hit a throttling server.
                wait = BASE_DELAY * attempt * 2
                logging.warning(
                    f"Rate-limited (HTTP {r.status_code}), "
                    f"retry {attempt}/{MAX_RETRIES}, sleeping {wait:.1f}s"
                )
                time.sleep(wait)
                continue

            r.raise_for_status()
            payload = r.json()

        # ValueError: depending on the requests version, an undecodable JSON
        # body is not reported as a RequestException; without this it would
        # escape and abort the whole run.
        except (requests.RequestException, ValueError) as e:
            logging.error(
                f"Request error on attempt {attempt}/{MAX_RETRIES}: {e}"
            )
            # Nothing follows the final attempt, so waiting would be wasted.
            if attempt < MAX_RETRIES:
                time.sleep(BASE_DELAY * attempt)
            continue

        # The caller indexes results[0]; anything but a list is unusable.
        if isinstance(payload, list):
            return payload
        logging.error(
            f"Unexpected response payload of type {type(payload).__name__}"
        )
        return []

    return []

# Geocode every canonical row. The CSV is written in `finally` so that the
# results gathered so far survive an interruption or an unexpected error.
try:
    for idx, row in df.iterrows():

        # Only an explicit TRUE counts as canonical; a missing flag (NaN) is
        # truthy in Python and must not slip through.
        if str(row.get("is_canonical", "")).strip().upper() != "TRUE":
            continue

        # Empty CSV cells load as NaN, and str(NaN) is the non-empty string
        # "nan" -- map missing values to "" so the check below catches them.
        raw_street = row.get("street", "")
        raw_town = row.get("town_name", "")
        street = "" if pd.isna(raw_street) else str(raw_street).strip()
        town = "" if pd.isna(raw_town) else str(raw_town).strip()

        if not street or not town:
            logging.warning(f"Row {idx}: missing street/town")
            continue

        logging.info(f"Row {idx}: querying '{street}, {town}'")

        results = query_nominatim(street, town)
        logging.info(f"Row {idx}: {len(results)} result(s) returned")

        if results:
            # If we got at least one result, accept the first one
            res = results[0]
            res_type = res.get("type")

            df.at[idx, "chatgpt_lat"] = res.get("lat", "")
            df.at[idx, "chatgpt_lon"] = res.get("lon", "")
            df.at[idx, "chatgpt_street_type"] = res_type

            logging.info(
                f"Row {idx}: ACCEPTED "
                f"lat={res.get('lat')} lon={res.get('lon')} type={res_type}"
            )
        else:
            logging.info(f"Row {idx}: NOT_FOUND")

        # Throttle between consecutive queries.
        polite_sleep()
finally:
    df.to_csv(OUTPUT_FILE, index=False)

# Only reached when the loop ran to completion.
logging.info("Completed enrichment safely")
logging.info(f"Output: {OUTPUT_FILE}")
logging.info(f"Log: {LOG_FILE}")

print("Done safely.")

# Configuration: Nominatim contact

Set the environment variable `NOMINATIM_CONTACT` to a valid email address or URL, as required by the Nominatim usage policy, so that requests are accepted. Set it before starting the notebook so that the kernel inherits it.

Examples (Windows PowerShell):

- `$env:NOMINATIM_CONTACT = "name@domain.tld"`
- `$env:NOMINATIM_CONTACT = "https://yourdomain.tld/contact"`

This notebook uses that value to build the `User-Agent` request header.

In [None]:
import os
import random
import time

import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# -----------------------
# CONFIG
# -----------------------
INPUT_CSV = "../../../data/processed/dim_street.csv"
OUTPUT_CSV = "../../../data/processed/dim_street_with_chatgpt_cols.csv"
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
# NOTE(review): this flag is not referenced by any code visible in this
# notebook -- confirm whether the reverse fallback still exists.
ENABLE_REVERSE_FALLBACK = False  # set True to try reverse lookup when not class=highway

# Contact (email or URL) identifying the operator of this script, read from
# the NOMINATIM_CONTACT environment variable and embedded in the User-Agent.
_contact = os.environ.get("NOMINATIM_CONTACT", "").strip()
if _contact:
    CONTACT_HEADER = f"street-verification/1.0 (contact: {_contact})"
else:
    # Keep the cell runnable, but make the missing contact impossible to miss.
    CONTACT_HEADER = "street-verification/1.0"
    print(
        "WARNING: NOMINATIM_CONTACT is not set; requests carry no contact "
        "information and may be rejected by Nominatim."
    )

HEADERS = {
    "User-Agent": CONTACT_HEADER,
    "Accept-Language": "en"
}

# Base pause between consecutive requests, in seconds (jitter is added later).
SLEEP_SECONDS = 1

# -----------------------
# LOAD DATA
# -----------------------
df = pd.read_csv(INPUT_CSV)

# Create any output column that the input file does not already provide.
output_columns = (
    "chatgpt_lat",
    "chatgpt_lon",
    "chatgpt_street_type",
    "chatgpt_road_class",
)
for col in output_columns:
    if col in df.columns:
        continue
    df[col] = None

# A row is canonical when its flag, rendered as text, upper-cases to "TRUE".
canonical_flags = df["is_canonical"].astype(str).str.upper()
canonical_mask = canonical_flags == "TRUE"

total_rows = int(canonical_mask.sum())
if total_rows != 0:
    print(f"Starting geocoding for {total_rows} canonical rows...")
else:
    print("No canonical rows to process.")

# -----------------------
# Helpers
# -----------------------
# Recognised highway values. Every entry currently maps to itself, so the
# table is generated from the list of names.
_KNOWN_ROAD_CLASSES = (
    "primary", "primary_link",
    "secondary", "secondary_link",
    "tertiary", "tertiary_link",
    "residential",
    "unclassified",
    "service",
    "living_street",
    "pedestrian",
    "track",
    "path",
    "cycleway",
    "trunk", "trunk_link",
    "motorway", "motorway_link",
)
ROAD_CLASS_NORMALIZATION = {name: name for name in _KNOWN_ROAD_CLASSES}

def extract_highway_type(result):
    """Return the highway type of one search result, or None.

    Args:
        result: A single decoded result object (dict) from the search API.

    Returns:
        ``result["type"]`` when the result's class is ``"highway"`` and the
        type is non-empty; otherwise the ``"highway"`` entry of its
        ``extratags`` (None when absent).
    """
    cls = result.get("class")
    typ = result.get("type")
    if cls == "highway" and typ:
        return typ
    # `extratags` may be present but null; .get("extratags", {}) would then
    # hand back None and the lookup below would raise AttributeError.
    extratags = result.get("extratags") or {}
    return extratags.get("highway")

def normalize_road_class(hw: str | None) -> str:
    if not hw:
        return "unknown"
    hw = str(hw).strip().lower()
    return ROAD_CLASS_NORMALIZATION.get(hw, hw)

# -----------------------
# HTTP session with retries
# -----------------------
session = requests.Session()
retry = Retry(
    total=3,
    backoff_factor=1,
    # NOTE(review): 403 is retried here as well; confirm that retrying a
    # "forbidden" answer is actually useful for this service.
    status_forcelist=[403, 429, 500, 502, 503, 504],
    allowed_methods=["GET"],
    # Return the last response instead of raising RetryError once the retries
    # for a listed status are exhausted. The geocoding loop calls
    # raise_for_status() itself and reads response.status_code / response.text
    # in its error handler, which only works if it receives the response.
    raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("https://", adapter)
session.mount("http://", adapter)

# -----------------------
# Counters
# -----------------------
# Per-outcome tallies for the geocoding run, all starting at zero.
successes = no_matches = skips = errors = 0

# -----------------------
# GEOCODING LOOP
# -----------------------
# The output CSV is written in `finally` so that everything geocoded so far
# is kept if the run is interrupted or hits an unexpected error.
try:
    for i, (idx, row) in enumerate(df[canonical_mask].iterrows(), start=1):
        # Empty CSV cells load as NaN, and str(NaN) is the non-empty string
        # "nan" -- map missing values to "" so the skip check catches them.
        raw_street = row["street"]
        raw_town = row["town_name"]
        street = "" if pd.isna(raw_street) else str(raw_street).strip()
        town = "" if pd.isna(raw_town) else str(raw_town).strip()

        percent_complete = (i / total_rows) * 100 if total_rows else 0
        print(f"Progress: {i}/{total_rows} ({percent_complete:.1f}%)")
        print(f"Query: street='{street}', town='{town}'")

        if not street or not town:
            print("Result: skipped (missing street or town)")
            skips += 1
            # No request was sent, so there is nothing to throttle.
            continue

        params = {
            "q": f"{street}, {town}, Malta",
            "format": "jsonv2",
            "limit": 1,
            "addressdetails": 1,
            "extratags": 1
        }

        # Stays None when the request itself fails before a response arrives.
        response = None
        try:
            response = session.get(
                NOMINATIM_URL,
                params=params,
                headers=HEADERS,
                timeout=30
            )
            response.raise_for_status()
            results = response.json()

            if results:
                result = results[0]

                lat = float(result.get("lat"))
                lon = float(result.get("lon"))
                df.at[idx, "chatgpt_lat"] = lat
                df.at[idx, "chatgpt_lon"] = lon

                result_class = result.get("class")
                result_type = result.get("type")
                highway_type = extract_highway_type(result)
                road_class = normalize_road_class(highway_type)
                df.at[idx, "chatgpt_street_type"] = highway_type
                df.at[idx, "chatgpt_road_class"] = road_class

                display_name = result.get("display_name", "")
                print(
                    f"Result: lat={lat}, lon={lon}, class='{result_class}', type='{result_type}', highway='{highway_type}', road_class='{road_class}', display_name='{display_name[:120]}'"
                )
                print(
                    "Output applied: lat={}, lon={}, road_class='{}'".format(lat, lon, road_class)
                )
                successes += 1
            else:
                print("Result: none (no matches)")
                no_matches += 1

        # Per-row boundary: one bad row (network error, malformed payload)
        # is reported and counted without stopping the run.
        except Exception as e:
            status = getattr(response, "status_code", None)
            msg = f"Result: error ({e})"
            if status is not None:
                msg += f", status={status}"
            print(msg)
            if response is not None:
                try:
                    # Built outside the f-string: a backslash inside an
                    # f-string expression is a SyntaxError before Python 3.12.
                    snippet = (response.text or "")[:200].replace("\n", " ")
                    if snippet:
                        print(f"Response snippet: {snippet}")
                except Exception:
                    # Best-effort diagnostics only; never abort the run here.
                    pass
            if status in (403, 429):
                # Extra pause when the server refuses or throttles us.
                time.sleep(2 + random.uniform(0, 1))
            errors += 1

        time.sleep(SLEEP_SECONDS + random.uniform(0.2, 0.7))
finally:
    # -----------------------
    # WRITE OUTPUT
    # -----------------------
    df.to_csv(OUTPUT_CSV, index=False)

# -----------------------
# SUMMARY
# -----------------------
print(f"Done. Output written to {OUTPUT_CSV}")
print(f"Summary: successes={successes}, no_matches={no_matches}, skips={skips}, errors={errors}")
