In [None]:
import csv
import requests
import time
import os
from urllib.parse import quote

In [None]:
# --- File locations ---------------------------------------------------------
# Addresses to geocode (input) and the file that accumulates results (output).
INPUT_CSV = "scraped_addresses/sa_addresses_full.csv"
OUTPUT_CSV = "scraped_addresses/geocoded_results.csv"

# --- Geocoding service ------------------------------------------------------
# GeocodeServer `findAddressCandidates` endpoint queried for every address.
API_URL_BASE = "https://lsa1.geohub.sa.gov.au/server/rest/services/Locators/SAGAF_Valuation/GeocodeServer/findAddressCandidates"
# Bounding box (JSON text) sent with each request as `searchExtent`.
SEARCH_EXTENT = '{"xmin":128.9,"ymin":-39.5,"xmax":141.0005,"ymax":-25.9}'

# --- Output schema ----------------------------------------------------------
# Column names, in order, of the output CSV.
OUTPUT_FIELDS = [
    # what was asked and what matched
    "input_address",
    "candidate_address",
    "score",
    # candidate location
    "x",
    "y",
    # address components
    "HouseNumber",
    "StreetName",
    "StreetType",
    "UnitNumber",
    "Locality",
    "State",
    "Postcode",
    # remaining candidate attributes
    "Valuation",
    "SystemValuation",
    "Ref_ID",
    "Addr_type",
]

In [None]:
# Create the output CSV file and write headers if it doesn't exist (or is empty)
def create_output_csv_if_needed():
    """Make sure OUTPUT_CSV exists and begins with the OUTPUT_FIELDS header row.

    The header is written when the file is missing or zero bytes long. A file
    that already has content is left untouched. The parent directory of
    OUTPUT_CSV is created first when it does not exist.
    """
    parent_dir = os.path.dirname(OUTPUT_CSV)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # A zero-byte file has no header row; rows appended to it would later be
    # read back with the first data row mistaken for the header.
    if os.path.exists(OUTPUT_CSV) and os.path.getsize(OUTPUT_CSV) > 0:
        return

    with open(OUTPUT_CSV, mode="w", newline="", encoding="utf-8") as f:
        # Use the shared column list so the header always matches appended rows.
        csv.DictWriter(f, fieldnames=OUTPUT_FIELDS).writeheader()

In [None]:
# Collect the (input, candidate) address pairs already stored in the output CSV
def load_seen_records():
    """Return the set of (input_address, candidate_address) pairs in OUTPUT_CSV.

    An empty set is returned when OUTPUT_CSV does not exist.
    """
    if not os.path.exists(OUTPUT_CSV):
        return set()

    with open(OUTPUT_CSV, mode="r", newline="", encoding="utf-8") as existing:
        return {
            (record["input_address"], record["candidate_address"])
            for record in csv.DictReader(existing)
        }

In [None]:
# Append new candidate results to the output CSV if they are not duplicates
def append_candidates_to_csv(input_address, candidates, seen_records):
    """Append the not-yet-recorded candidates for one address to OUTPUT_CSV.

    Args:
        input_address: Address string the candidates were returned for.
        candidates: Iterable of candidate dicts. Each is read for "score",
            "location" (keys "x"/"y") and "attributes"; missing pieces are
            written as empty cells. ``None`` is treated as no candidates.
        seen_records: Set of (input_address, candidate_address) pairs that
            are already in the file. Updated in place for every row written.

    Prints a one-line summary of what happened; returns nothing.
    """
    candidates = list(candidates or [])
    if not candidates:
        # Nothing came back at all, so reporting "all duplicates" would be
        # misleading; there is also nothing to write.
        print("– No candidates returned.")
        return

    # Output columns copied straight from the candidate's "attributes" dict.
    attribute_columns = (
        "HouseNumber", "StreetName", "StreetType", "UnitNumber",
        "Locality", "State", "Postcode", "Valuation", "SystemValuation",
        "Ref_ID", "Addr_type",
    )

    new_count = 0
    with open(OUTPUT_CSV, mode="a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=OUTPUT_FIELDS)
        for c in candidates:
            # Tolerate candidates that lack (or null out) these sub-objects.
            attr = c.get("attributes") or {}
            location = c.get("location") or {}

            candidate_address = attr.get("Match_addr", "")
            key = (input_address, candidate_address)
            if key in seen_records:
                continue  # Skip duplicates

            row = {
                "input_address": input_address,
                "candidate_address": candidate_address,
                "score": c.get("score", ""),
                "x": location.get("x", ""),
                "y": location.get("y", ""),
            }
            for column in attribute_columns:
                row[column] = attr.get(column, "")
            writer.writerow(row)

            seen_records.add(key)
            new_count += 1

    if new_count > 0:
        print(f"✓ Added {new_count} new candidates.")
    else:
        print("– No new candidates to add (all duplicates).")

In [None]:
# Query the SAGAF Valuation API
def query_geocode_service(full_address, max_locations=10, timeout=10):
    """Ask the geocoding endpoint (API_URL_BASE) for matches to one address.

    Args:
        full_address: Single-line address text to look up.
        max_locations: Value sent as the ``maxLocations`` request parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        The list found under the response's "candidates" key. An empty list
        is returned (after printing a message) when the request fails, the
        body is not a JSON object, or the body carries an "error" entry.
    """
    # NOTE(review): the documented ArcGIS spelling is `outFields`; confirm the
    # service honours the lowercase key used here before changing it.
    params = {
        "Single Line Input": full_address,
        "f": "json",
        "maxLocations": str(max_locations),
        "outfields": "*",
        "matchOutOfRange": "true",
        "searchExtent": SEARCH_EXTENT
    }

    try:
        response = requests.get(API_URL_BASE, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException: transport/HTTP failures; ValueError: body is not JSON.
        print(f"Error querying address '{full_address}': {e}")
        return []

    if not isinstance(data, dict):
        print(f"Error querying address '{full_address}': unexpected response {data!r}")
        return []

    if "error" in data:
        # ArcGIS REST services can report a failure as an "error" object in an
        # otherwise successful HTTP response, so raise_for_status() misses it.
        print(f"Error querying address '{full_address}': {data['error']}")
        return []

    # `or []` also covers an explicit null under "candidates".
    return data.get("candidates") or []

In [None]:
# Main loop
def main():
    """Geocode every address in INPUT_CSV and append the matches to OUTPUT_CSV.

    Prepares the output file, loads the pairs already recorded, then for each
    non-blank value in the "full address" column queries the geocoder and
    appends any new candidates. Pauses 0.5 s after each query.
    """
    create_output_csv_if_needed()
    seen_records = load_seen_records()

    # newline="" hands line-ending handling to the csv module, so quoted
    # fields that contain line breaks are read back unchanged.
    with open(INPUT_CSV, mode="r", newline="", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            full_address = row["full address"]
            # Empty/whitespace-only cells (or short rows, read as None) carry
            # nothing to geocode; skip them instead of calling the API.
            if not full_address or not full_address.strip():
                print(f"Skipping blank address (input line {reader.line_num})")
                continue
            print(f"Processing: {full_address}")
            candidates = query_geocode_service(full_address)
            append_candidates_to_csv(full_address, candidates, seen_records)
            time.sleep(0.5)  # Avoid hammering the server

In [None]:
# Entry point: start the geocoding job only when this code runs as the
# top-level program, so importing it elsewhere does not trigger main().
if __name__ == "__main__":
    main()