In [1]:
import os
import time
import json
import requests
from typing import Dict, Any, List, Optional

In [2]:
# Read the LocationIQ API token from the environment rather than committing it
# to the notebook. Set LOCATIONIQ_TOKEN before starting the kernel.
# NOTE(review): any token that was previously committed in this notebook should
# be rotated, since it remains visible in version-control history.
locationiq_token = os.environ.get("LOCATIONIQ_TOKEN", "")
if not locationiq_token:
    print("LOCATIONIQ_TOKEN is not set; LocationIQ requests will be rejected.")

In [None]:
def load_jsonl(file_path):
    """Read a JSONL file and return its records as a list.

    Every non-blank line is parsed with ``json.loads``; lines that contain only
    whitespace are skipped. Records are returned in file order.
    """
    with open(file_path, "r", encoding="utf-8") as handle:
        return [json.loads(raw) for raw in handle if raw.strip()]

In [4]:
def save_jsonl(data, file_path):
    """Write every item of ``data`` to ``file_path``, one JSON document per line.

    The target file is overwritten. Non-ASCII characters are written verbatim
    (UTF-8) instead of being escaped.
    """
    with open(file_path, "w", encoding="utf-8") as out:
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in data
        )


In [5]:
def build_name_query(rec: Dict[str, Any]) -> str:
    """
    Compose the primary geocoding query for a record.

    The query is the record's ``market_name`` (or ``apmc_name`` when the former
    is missing/empty), followed by ``district_name`` and ``state_name``; empty
    pieces are skipped. Unless the text already contains "india"
    (case-insensitive), " India" is appended to bias the search. An empty
    string is returned when none of the fields has a value.
    """
    primary = rec.get("market_name") or rec.get("apmc_name") or ""
    candidates = (primary, rec.get("district_name", ""), rec.get("state_name", ""))
    query = " ".join(filter(None, candidates)).strip()

    if not query:
        return query
    # Leave the query alone when the country is already mentioned.
    if "india" in query.lower():
        return query
    return f"{query} India"

In [6]:
def build_district_query(rec: Dict[str, Any]) -> Optional[str]:
    """
    Compose the fallback query from the record's district and state.

    Both fields are stripped of surrounding whitespace. Returns None when both
    are empty; otherwise returns the non-empty parts followed by "India",
    separated by single spaces.
    """
    district, state = (
        (rec.get(field) or "").strip() for field in ("district_name", "state_name")
    )
    if not (district or state):
        return None
    tokens = [tok for tok in (district, state, "India") if tok]
    return " ".join(tokens) or None

In [7]:
def fetch_locationiq(q, api_key, retries=3, sleep_s=0.6):
    """Query the LocationIQ search endpoint and return the list of matches.

    Args:
        q: Free-text query. A missing or blank query returns ``[]`` without
            contacting the service.
        api_key: LocationIQ access token.
        retries: Maximum number of attempts for transient failures.
        sleep_s: Base delay in seconds; the wait grows linearly with the
            attempt number (``sleep_s * attempt``).

    Returns:
        The decoded JSON list of results, or ``[]`` when the query is blank,
        the request is rejected, every attempt fails, or the payload is not a
        list.
    """
    # A blank query can never match anything; do not spend a request on it.
    if not q or not str(q).strip():
        return []

    url = "https://us1.locationiq.com/v1/search"
    params = {
        "key": api_key,
        "q": q,
        "format": "json",
        "normalizecity": 1,
        "addressdetails": 1,
        "limit": 5,
        "countrycodes": "in",
        "accept-language": "en"
    }
    for attempt in range(retries):
        try:
            r = requests.get(url, params=params, timeout=20)
            status = r.status_code
            # Client errors other than 429 (rate limit) will not succeed on a
            # retry (bad key, malformed query, nothing found), so give up now
            # instead of sleeping and repeating the same request.
            if 400 <= status < 500 and status != 429:
                return []
            r.raise_for_status()
            data = r.json()
            # Callers iterate over result dicts; anything else (for example an
            # error object) is treated as "no results".
            return data if isinstance(data, list) else []
        except (requests.RequestException, ValueError):
            # ValueError covers JSON decoding failures on requests versions
            # whose decode error is not a RequestException subclass.
            if attempt == retries - 1:
                return []
            time.sleep(sleep_s * (attempt + 1))
    return []

In [8]:
def _coerce_float(value):
    """Return ``value`` as a float, or None when it is empty or not numeric."""
    if not value:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def select_best(results):
    """Pick the preferred geocoding hit and flatten it into a summary dict.

    Hits whose address country code is "in" are preferred; among those, the
    one with the highest ``importance`` wins (the earliest hit wins ties).

    Args:
        results: Iterable of result dicts as returned by the geocoder. Entries
            that are not dicts are ignored.

    Returns:
        None when there is no usable hit, otherwise a dict with the keys
        ``resolved_name``, ``latitude``, ``longitude``, ``country_code``,
        ``place_class``, ``place_type``, ``importance`` and ``source``.
        ``latitude``/``longitude`` are None when the hit carries no parseable
        coordinate.
    """
    candidates = [item for item in (results or []) if isinstance(item, dict)]
    if not candidates:
        return None

    def address_of(item):
        # "address" may be absent or explicitly null; treat both as empty.
        address = item.get("address")
        return address if isinstance(address, dict) else {}

    def rank(item):
        in_india = (address_of(item).get("country_code") or "").lower() == "in"
        return (in_india, item.get("importance", 0) or 0)

    # max() returns the first maximal element, matching a stable descending sort.
    best = max(candidates, key=rank)
    return {
        "resolved_name": best.get("display_name"),
        "latitude": _coerce_float(best.get("lat")),
        "longitude": _coerce_float(best.get("lon")),
        "country_code": (address_of(best).get("country_code") or "").upper(),
        "place_class": best.get("class"),
        "place_type": best.get("type"),
        "importance": best.get("importance"),
        "source": "locationiq"
    }

In [9]:
def resolve_by_name(rec: Dict[str, Any], api_key: str) -> Optional[Dict[str, Any]]:
    """Geocode a record using its name-based query.

    Builds the query with ``build_name_query``, fetches matches from
    LocationIQ and returns the outcome of ``select_best`` (None when there
    are no matches).
    """
    return select_best(fetch_locationiq(build_name_query(rec), api_key=api_key))

In [10]:
def resolve_by_district(rec: Dict[str, Any], api_key: str, cache: Dict[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """
    Geocode a record at district/state level, memoising by query string.

    Returns None when no district query can be built. Otherwise the result is
    looked up in ``cache`` first; on a miss it is fetched, stored in ``cache``
    (including a None outcome) and returned.
    """
    query = build_district_query(rec)
    if not query:
        return None
    if query not in cache:
        # First time this district/state is seen: resolve it and remember the outcome.
        cache[query] = select_best(fetch_locationiq(query, api_key=api_key))
    return cache[query]

In [None]:
# Enrich every mandi record with coordinates: try the market name first and
# fall back to the district/state when the name lookup yields no coordinates.
input_file = "mandies.jsonl"
output_file = "mandies_20250907.jsonl"
locations = load_jsonl(input_file)
# Many markets share a district, so district lookups are memoised for this run.
# Failed lookups are stored as None, hence the Optional value type.
district_cache: Dict[str, Optional[Dict[str, Any]]] = {}
api_key = locationiq_token
for loc in locations:
    print(f"Processing {loc['market_name']} {loc['state_name']}")
    best = resolve_by_name(loc, api_key=api_key)
    strategy = "name"
    # Compare against None so a legitimate 0.0 coordinate is not treated as missing.
    if not best or best.get("latitude") is None or best.get("longitude") is None:
        district_best = resolve_by_district(loc, api_key=api_key, cache=district_cache)
        best = district_best
        strategy = "district"
    if best:
        loc.update(best)
        loc["resolution_strategy"] = strategy
    else:
        # Reached when both the name and the district lookups came back empty.
        print(f"Unable to resolve >>>>>>>>>> {loc}")
save_jsonl(locations, output_file)
print(f"Enriched file saved to {output_file}")

In [None]:
# Enrich every APMC record with coordinates: try the APMC name first and fall
# back to the district/state when the name lookup yields no coordinates.
input_file = "apmcs.jsonl"
output_file = "apmcs_20250907.jsonl"
locations = load_jsonl(input_file)
# Many APMCs share a district, so district lookups are memoised for this run.
# Failed lookups are stored as None, hence the Optional value type.
district_cache: Dict[str, Optional[Dict[str, Any]]] = {}
api_key = locationiq_token
for loc in locations:
    print(f"Processing {loc['apmc_name']} {loc['state_name']}")
    best = resolve_by_name(loc, api_key=api_key)
    strategy = "name"
    # Compare against None so a legitimate 0.0 coordinate is not treated as missing.
    if not best or best.get("latitude") is None or best.get("longitude") is None:
        district_best = resolve_by_district(loc, api_key=api_key, cache=district_cache)
        best = district_best
        strategy = "district"
    if best:
        loc.update(best)
        loc["resolution_strategy"] = strategy
    else:
        # Reached when both the name and the district lookups came back empty.
        print(f"Unable to resolve >>>>>>>>>> {loc}")
save_jsonl(locations, output_file)
print(f"Enriched file saved to {output_file}")