In [0]:
-- Every statement below resolves unqualified schema names inside the `main` catalog.
USE CATALOG main;

-- Schemas used by this notebook (each is created only when it is missing):
--   geo_ref  : world cities reference schema
--   src_geo  : raw source locations waiting to be normalized
--   gold_geo : resolved-location output tables
CREATE SCHEMA IF NOT EXISTS geo_ref;
CREATE SCHEMA IF NOT EXISTS src_geo;
CREATE SCHEMA IF NOT EXISTS gold_geo;

## Reference Table Data

In [0]:
-- Reference table of known cities that raw locations are resolved against.
CREATE OR REPLACE TABLE geo_ref.world_cities (
  id                  BIGINT,  -- reference key; used as the vector index primary key
  city                STRING,  -- city name, may contain diacritics
  city_ascii          STRING,  -- ASCII spelling of the city name
  admin_name          STRING,  -- state/province (admin1)
  country             STRING,
  lat                 DOUBLE,
  lng                 DOUBLE,
  population          BIGINT,
  timezone            STRING,
  location_text_ascii STRING   -- "city, admin_name, country" text; embedding source column
)
USING DELTA;

In [0]:
-- Seed the reference table. The column list is explicit so the statement keeps
-- working if columns are added or reordered; location_text_ascii is omitted here
-- (left NULL) and derived by the UPDATE below.
INSERT OVERWRITE geo_ref.world_cities
  (id, city, city_ascii, admin_name, country, lat, lng, population, timezone)
VALUES
  (1,  'San Jose',     'San Jose',   'California',         'United States', 37.3382, -121.8863,  1000000, 'America/Los_Angeles'),
  (2,  'San José',     'San Jose',   'San José Province',  'Costa Rica',     9.9281,  -84.0907,   340000, 'America/Costa_Rica'),
  (3,  'Paris',        'Paris',      'Île-de-France',      'France',        48.8566,    2.3522,  2148327, 'Europe/Paris'),
  (4,  'Paris',        'Paris',      'Texas',              'United States', 33.6609,  -95.5555,    25000, 'America/Chicago'),
  (5,  'Bengaluru',    'Bengaluru',  'Karnataka',          'India',         12.9716,   77.5946,  8443675, 'Asia/Kolkata'),
  (6,  'Bangalore',    'Bangalore',  'Karnataka',          'India',         12.9716,   77.5946,  8443675, 'Asia/Kolkata'),
  (7,  'München',      'Munchen',    'Bavaria',            'Germany',       48.1374,   11.5755,  1488202, 'Europe/Berlin'),
  (8,  'Munich',       'Munich',     'Bavaria',            'Germany',       48.1374,   11.5755,  1488202, 'Europe/Berlin'),
  (9,  'Springfield',  'Springfield','Illinois',           'United States', 39.7817,  -89.6501,   115000, 'America/Chicago'),
  (10, 'Springfield',  'Springfield','Massachusetts',      'United States', 42.1015,  -72.5898,   155000, 'America/New_York');

-- Intentionally unfiltered: every row gets its "city, admin_name, country" text.
-- CONCAT_WS skips NULL parts, so a city with a missing admin_name or country still
-- gets usable text instead of a NULL value (plain CONCAT would return NULL).
UPDATE geo_ref.world_cities
SET location_text_ascii = CONCAT_WS(
  ', ',
  COALESCE(city_ascii, city),
  admin_name,
  country
);

In [0]:
-- Inspect the seeded reference table.
SELECT
  id,
  city,
  city_ascii,
  admin_name,
  country,
  lat,
  lng,
  population,
  timezone,
  location_text_ascii
FROM geo_ref.world_cities

### Raw locations to be normalized

In [0]:
-- Raw input locations. Only rowid/city/state/country are loaded by the fixture
-- insert; the remaining columns mirror the resolution output columns.
CREATE OR REPLACE TABLE src_geo.raw_locations (
  rowid            BIGINT,
  city             STRING,
  state            STRING,
  country          STRING,
  lat              DOUBLE,
  lng              DOUBLE,
  population       BIGINT,
  timezone         STRING,
  reference_id     BIGINT,
  confidence_score INT,
  stage            STRING,
  resolved_at      TIMESTAMP
)
USING DELTA;

In [0]:
-- Test fixture: raw locations grouped by the resolution stage each row is meant to exercise.
-- Only rowid, city, state and country are supplied; the remaining columns are left unset.
INSERT OVERWRITE src_geo.raw_locations (rowid, city, state, country) VALUES

  -- EXACT MATCH: All values match reference (city = city_ascii, state = admin_name, country).
  (101, 'San Jose',      'California',         'United States'),    -- exact ASCII spelling, matches ref city_ascii
  (102, 'San Jose',      'San José Province',  'Costa Rica'),       -- exact ASCII spelling, matches ref city_ascii
  (103, 'Paris',         'Île-de-France',      'France'),           -- exact ASCII spelling, matches ref city_ascii
  (104, 'Paris',         'Texas',              'United States'),    -- exact ASCII spelling, matches ref city_ascii
  (105, 'Bengaluru',     'Karnataka',          'India'),            -- exact, matches city_ascii for Bengaluru
  (106, 'Bangalore',     'Karnataka',          'India'),            -- exact, matches city_ascii for Bangalore
  (107, 'Munchen',       'Bavaria',            'Germany'),          -- exact, matches ref city_ascii 'Munchen' (ASCII form of 'München')
  (108, 'Munich',        'Bavaria',            'Germany'),          -- exact, matches city_ascii in ref
  (109, 'Springfield',   'Illinois',           'United States'),    -- exact, matches city_ascii in ref
  (110, 'Springfield',   'Massachusetts',      'United States'),    -- exact, matches city_ascii in ref

  -- FUZZY MATCH: Minor typo, diacritic, or transliteration in city (state & country are exact).
  (201, 'San Josee',     'California',         'United States'),    -- extra 'e', Levenshtein 1 from 'San Jose'
  (202, 'San Hose',      'California',         'United States'),    -- substitution 'J'→'H', Levenshtein 1 from 'San Jose'
  (203, 'San José',      'San José Province',  'Costa Rica'),       -- accented 'é'; city_ascii in reference is 'San Jose'
  (204, 'Pariss',        'Île-de-France',      'France'),           -- double 's', Levenshtein 1 from 'Paris' (1 of 6 chars ≈ 83%, below the fuzzy stage's 85 cutoff)
  (205, 'Bangaluru',     'Karnataka',          'India'),            -- vowel swapped, Levenshtein-1 from 'Bengaluru'
  (206, 'Bengalur',      'Karnataka',          'India'),            -- missing trailing 'u', Levenshtein-1 from 'Bengaluru'
  (207, 'München',       'Bavaria',            'Germany'),          -- umlaut, city_ascii in ref is 'Munchen'
  (208, 'Muenchen',      'Bavaria',            'Germany'),          -- alternative transliteration (ue for ü), not an exact match to any city_ascii
  (209, 'Springfeld',    'Illinois',           'United States'),    -- missing 'i', Levenshtein-1 from 'Springfield'
  (210, 'Sprinfgield',   'Massachusetts',      'United States'),    -- adjacent letters transposed ('gf'→'fg'); plain Levenshtein 2 from 'Springfield' (≈ 81%, below the 85 cutoff)

  -- VECTOR/SEMANTIC MATCH: Abbreviations or semantic equivalents, but admin_name/country correct.
  (301, 'SJ',                  'California',        'United States'),     -- abbreviation/initialism for 'San Jose'
  (302, 'CR San Jose',         'San José Province', 'Costa Rica'),        -- informal/compound, 'CR' for 'Costa Rica'
  (303, 'Paris TX',            'Texas',             'United States'),     -- state as part of city, common colloquial format
  (304, 'BLR',                 'Karnataka',         'India'),             -- airport/tech abbreviation for 'Bangalore/Bengaluru'
  (305, 'Bangalore City',      'Karnataka',         'India'),             -- unnecessary "City" label
  (306, 'MUC',                 'Bavaria',           'Germany'),           -- airport code for 'Munich'
  (307, 'Springfield IL',      'Illinois',          'United States'),     -- state abbreviation in city field
  (308, 'BGLR',                'Karnataka',         'India'),             -- common local abbreviation for Bengaluru/Bangalore
  (309, 'Munch',               'Bavaria',           'Germany'),           -- truncated city name for 'Munich/Munchen'
  (310, 'Paris France',        'Île-de-France',     'France'),            -- country in city field, semantic match for 'Paris'

  -- LLM: Ambiguous, nickname, or highly context-dependent city names. State/country is plausible.
  (401, 'Silicon Valley',      'California',        'United States'),     -- region nickname, commonly associated with 'San Jose'
  (402, 'City of Light',       'Île-de-France',     'France'),            -- globally known nickname for Paris
  (403, 'Bang',                'Karnataka',         'India'),             -- truncation/slang for Bangalore/Bengaluru
  (404, 'Capital Bavaria', 'Bavaria', 'Germany'),                         -- descriptive phrase instead of a name; presumably targets Munich/Munchen
  (405, 'Springfield', 'Unknown', 'United States'),                       -- state unknown: both reference Springfields (Illinois, Massachusetts) remain plausible
  -- Only city (misspelled) provided
  (406, 'Parees', '', ''),            -- ["Paris"]
  (407, 'Bangloore', '', ''),         -- ["Bangalore/Bengaluru"]
  (408, 'Sprngfeld', '', ''),         -- ["Springfield"]
  (409, 'Muniche', '', ''),           -- ["Munich"]
  -- Misspelled city with missing state or country
  (410, 'Munch', '', 'Germanny'),     -- ["Munich" in "Germany"]
  (411, 'Sao Jsoe', 'San José Province', ''),          -- ["San Jose"]
  (412, 'San J', 'Cali', 'United States');         --["San Jose, California"]

In [0]:
-- Inspect the raw fixture rows.
SELECT
  rowid,
  city,
  state,
  country,
  lat,
  lng,
  population,
  timezone,
  reference_id,
  confidence_score,
  stage,
  resolved_at
FROM src_geo.raw_locations

### Exact Match

In [0]:
-- Stage 1 (Exact): a raw row matches a reference city when city, state and country
-- are all equal (case-sensitive) to city_ascii (falling back to city), admin_name
-- and country. Every exact hit is given confidence 100.
CREATE OR REPLACE TEMP VIEW exact_match AS
SELECT
  raw.rowid,
  raw.city,
  raw.state,
  raw.country,
  ref.id AS reference_id,
  ref.lat,
  ref.lng,
  ref.population,
  ref.timezone,
  100 AS confidence_score
FROM src_geo.raw_locations AS raw
INNER JOIN geo_ref.world_cities AS ref
  ON  raw.city    = COALESCE(ref.city_ascii, ref.city)
  AND raw.state   = ref.admin_name
  AND raw.country = ref.country;

-- Keep only raw rows that hit exactly one reference row; rows with several exact
-- hits are dropped here and fall through to the later stages.
CREATE OR REPLACE TEMP VIEW exact_unique AS
WITH counted AS (
  SELECT
    em.*,
    COUNT(*) OVER (PARTITION BY em.rowid) AS match_count
  FROM exact_match AS em
)
SELECT *
FROM counted
WHERE match_count = 1;

-- Build the output table: one row per raw location. Rows with a unique exact match
-- carry the reference attributes and stage 'Exact'; all others keep NULLs so the
-- following stages can fill them in.
CREATE OR REPLACE TABLE gold_geo.location_resolved AS
SELECT
  raw.rowid,
  raw.city,
  raw.state,
  raw.country,
  eu.lat,
  eu.lng,
  eu.population,
  eu.timezone,
  eu.reference_id,
  eu.confidence_score,
  CASE WHEN eu.rowid IS NOT NULL THEN 'Exact' END AS stage,
  current_timestamp() AS resolved_at
FROM src_geo.raw_locations AS raw
LEFT JOIN exact_unique AS eu
  ON raw.rowid = eu.rowid;


In [0]:
-- Inspect the resolved table after the exact stage.
SELECT
  rowid,
  city,
  state,
  country,
  lat,
  lng,
  population,
  timezone,
  reference_id,
  confidence_score,
  stage,
  resolved_at
FROM gold_geo.location_resolved
ORDER BY rowid ASC

In [0]:
-- Snapshot of the rows the exact stage left unresolved (stage IS NULL);
-- this table is the input of the fuzzy stage.
CREATE OR REPLACE TABLE gold_geo.to_fuzzy_state AS
SELECT
  resolved.rowid,
  resolved.city,
  resolved.state,
  resolved.country,
  resolved.lat,
  resolved.lng,
  resolved.population,
  resolved.timezone,
  resolved.reference_id,
  resolved.confidence_score,
  resolved.stage,
  resolved.resolved_at
FROM gold_geo.location_resolved AS resolved
WHERE resolved.stage IS NULL;

In [0]:
-- Inspect the rows handed to the fuzzy stage.
SELECT
  rowid,
  city,
  state,
  country,
  lat,
  lng,
  population,
  timezone,
  reference_id,
  confidence_score,
  stage,
  resolved_at
FROM gold_geo.to_fuzzy_state

## Fuzzy Match

In [0]:
-- Stage 2 (Fuzzy) preview: for every unresolved row, find the closest reference city
-- name among cities whose state and country match case-insensitively.
WITH candidates AS (
  -- One row per (unresolved row, reference city in the same state and country).
  SELECT
    r.rowid,
    r.city,
    r.state,
    r.country,
    c.id AS reference_id,
    COALESCE(c.city_ascii, c.city) AS ref_city_ascii,
    c.lat,
    c.lng,
    c.population,
    c.timezone,
    LEVENSHTEIN(UPPER(r.city), UPPER(COALESCE(c.city_ascii, c.city))) AS edit_dist,
    GREATEST(LENGTH(r.city), LENGTH(COALESCE(c.city_ascii, c.city)))  AS max_len
  FROM gold_geo.to_fuzzy_state AS r
  INNER JOIN geo_ref.world_cities AS c
    ON UPPER(r.state)   = UPPER(c.admin_name)
   AND UPPER(r.country) = UPPER(c.country)
),
scored AS (
  -- Similarity as a percentage: 100 * (1 - edit distance / longer name length).
  SELECT
    *,
    CASE WHEN max_len = 0 THEN 0.0
         ELSE (1.0 - (edit_dist / max_len)) * 100.0
    END AS confidence_score
  FROM candidates
),
ranked AS (
  -- Rank candidates per raw row. The extra sort keys make the winner deterministic
  -- when several reference rows share the same edit distance: prefer the larger
  -- population, then the lower reference id.
  SELECT
    rowid,
    reference_id,
    lat,
    lng,
    population,
    timezone,
    CAST(confidence_score AS INT) AS confidence_score,  -- truncates toward zero
    ROW_NUMBER() OVER (
      PARTITION BY rowid
      ORDER BY edit_dist ASC, population DESC NULLS LAST, reference_id ASC
    ) AS rk
  FROM scored
),
top1 AS (
  -- Keep the best candidate only when it reaches the 85% cutoff.
  SELECT *
  FROM ranked
  WHERE rk = 1 AND confidence_score >= 85
)
SELECT * FROM top1;

## Create fuzzy_top1 temp view

In [0]:
-- Stage 2 (Fuzzy): best reference city per unresolved row, matched on
-- case-insensitive state + country and scored by Levenshtein similarity of the
-- city name. Exposes at most one row per rowid, and only when similarity >= 85.
CREATE OR REPLACE TEMP VIEW fuzzy_top1 AS
WITH candidates AS (
  -- One row per (unresolved row, reference city in the same state and country).
  SELECT
    r.rowid,
    r.city,
    r.state,
    r.country,
    c.id AS reference_id,
    COALESCE(c.city_ascii, c.city) AS ref_city_ascii,
    c.lat,
    c.lng,
    c.population,
    c.timezone,
    LEVENSHTEIN(UPPER(r.city), UPPER(COALESCE(c.city_ascii, c.city))) AS edit_dist,
    GREATEST(LENGTH(r.city), LENGTH(COALESCE(c.city_ascii, c.city)))  AS max_len
  FROM gold_geo.to_fuzzy_state AS r
  INNER JOIN geo_ref.world_cities AS c
    ON UPPER(r.state)   = UPPER(c.admin_name)
   AND UPPER(r.country) = UPPER(c.country)
),
scored AS (
  -- Similarity as a percentage: 100 * (1 - edit distance / longer name length).
  SELECT
    *,
    CASE WHEN max_len = 0 THEN 0.0
         ELSE (1.0 - (edit_dist / max_len)) * 100.0
    END AS confidence_score
  FROM candidates
),
ranked AS (
  -- Rank candidates per raw row. The extra sort keys make the winner deterministic
  -- when several reference rows share the same edit distance: prefer the larger
  -- population, then the lower reference id.
  SELECT
    rowid,
    reference_id,
    lat,
    lng,
    population,
    timezone,
    CAST(confidence_score AS INT) AS confidence_score,  -- truncates toward zero
    ROW_NUMBER() OVER (
      PARTITION BY rowid
      ORDER BY edit_dist ASC, population DESC NULLS LAST, reference_id ASC
    ) AS rk
  FROM scored
)
SELECT
  rowid,
  reference_id,
  lat,
  lng,
  population,
  timezone,
  confidence_score
FROM ranked
WHERE rk = 1 AND confidence_score >= 85;

## MERGE fuzzy matches into location_resolved

In [0]:
-- Write the fuzzy winners into the resolved table. Rows already resolved by the
-- exact stage are never overwritten (a NULL stage counts as "not Exact").
MERGE INTO gold_geo.location_resolved AS tgt
USING fuzzy_top1 AS src
ON tgt.rowid = src.rowid
 AND tgt.stage IS DISTINCT FROM 'Exact'
WHEN MATCHED THEN UPDATE SET
  tgt.reference_id     = src.reference_id,
  tgt.lat              = src.lat,
  tgt.lng              = src.lng,
  tgt.population       = src.population,
  tgt.timezone         = src.timezone,
  tgt.confidence_score = src.confidence_score,
  tgt.stage            = 'FuzzyCity',
  tgt.resolved_at      = current_timestamp();

In [0]:
-- Inspect the resolved table after the fuzzy stage.
SELECT
  rowid,
  city,
  state,
  country,
  lat,
  lng,
  population,
  timezone,
  reference_id,
  confidence_score,
  stage,
  resolved_at
FROM gold_geo.location_resolved
ORDER BY rowid ASC

## Vector Search

In [0]:
%python
%pip install databricks-vectorsearch
# Restart the Python process after the install above.
# NOTE(review): presumably required so the freshly installed package becomes importable — confirm.
dbutils.library.restartPython()

In [0]:
-- Turn on the Delta change data feed for the reference table that the Delta Sync
-- vector index below uses as its source.
ALTER TABLE main.geo_ref.world_cities
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

## Create Delta Sync Vector Index from world_cities table

In [0]:
%python
from databricks.vector_search.client import VectorSearchClient

# Endpoint and index names are defined once so index creation and the fallback
# lookup cannot drift apart (the fallback previously pointed at a different endpoint).
VS_ENDPOINT_NAME = "one-env-shared-endpoint-11"
VS_INDEX_NAME = "main.geo_ref.world_cities_index"

# Credentials are automatically detected from notebook context
vsc = VectorSearchClient()

try:
  # Create a triggered Delta Sync index over world_cities and wait for it;
  # embeddings are computed from the location_text_ascii column.
  index = vsc.create_delta_sync_index_and_wait(
    index_name=VS_INDEX_NAME,
    endpoint_name=VS_ENDPOINT_NAME,
    primary_key="id",
    source_table_name="main.geo_ref.world_cities",
    pipeline_type="TRIGGERED",
    embedding_model_endpoint_name="databricks-bge-large-en", # This model is used for ingestion, and is also used for querying unless model_endpoint_name_for_query is specified.
    embedding_source_column="location_text_ascii",
  )
except Exception as e:
  # Best-effort fallback: creation fails when the index already exists, so reuse it.
  # The error is printed rather than hidden so that other causes stay visible.
  print(f"Index creation did not complete ({e!r}); falling back to the existing index.")
  index = vsc.get_index(
    endpoint_name=VS_ENDPOINT_NAME,
    index_name=VS_INDEX_NAME,
  )

## Define search_with_reranking Pandas UDF

In [0]:
%python
import pandas as pd
from databricks.vector_search.client import VectorSearchClient
from databricks.vector_search.reranker import DatabricksReranker
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, ArrayType, LongType, DoubleType, StringType, FloatType, Optional

# One vector-search hit: the world_cities attributes plus the search score.
# Field order matters: the UDF below emits tuples in exactly this order.
result_schema = (
    StructType()
    .add("id", LongType())
    .add("city_ascii", StringType())
    .add("admin_name", StringType())
    .add("country", StringType())
    .add("lat", DoubleType())
    .add("lng", DoubleType())
    .add("population", LongType())
    .add("timezone", StringType())
    .add("score", FloatType(), True)
)

# UDF return type: a struct with a single `results` field holding the list of hits.
output_schema = StructType().add("results", ArrayType(result_schema))

# Handle to the existing index; the UDF below queries it.
vsc = VectorSearchClient()
index = vsc.get_index(
    index_name="main.geo_ref.world_cities_index",
    endpoint_name="one-env-shared-endpoint-11",
)

import logging

# Columns requested for every hit. The response is expected to carry an additional
# `score` column, which is read when the rows are converted below.
_VS_RESULT_COLUMNS = ["id", "city_ascii", "admin_name", "country", "lat", "lng", "population", "timezone"]
# Columns handed to the reranker.
_VS_RERANK_COLUMNS = ["city_ascii", "admin_name", "location_text_ascii"]


def _text_or_empty(value):
    """Return `value` unchanged, or '' when it is null (None/NaN)."""
    return "" if pd.isna(value) else value


def _candidate_rows(response):
    """Convert a similarity_search response into tuples ordered like result_schema.

    Returns an empty list when the response reports no rows.
    """
    names = [c['name'] for c in response['manifest']['columns']]
    if response['result']['row_count'] <= 0:
        return []

    rows = []
    for raw in response['result']['data_array']:
        rec = dict(zip(names, raw))
        # Cast explicitly so the values fit the Spark types declared in result_schema.
        rows.append((
            int(rec['id']) if rec['id'] is not None else None,
            rec.get('city_ascii'),
            rec.get('admin_name'),
            rec.get('country'),
            float(rec['lat']) if rec['lat'] is not None else None,
            float(rec['lng']) if rec['lng'] is not None else None,
            int(rec['population']) if rec['population'] is not None else None,
            rec.get('timezone'),
            float(rec['score']) if rec['score'] is not None else None
        ))
    return rows


@pandas_udf(output_schema)
def search_with_reranking_udf(rowid: pd.Series, city: pd.Series, state: pd.Series, country: pd.Series, score_threshold: pd.Series) -> pd.DataFrame:
    """Look up reference-city candidates for each input location via vector search.

    For every row a "city, state, country" query is sent as a reranked HYBRID search
    (3 results, filtered by country when one is given, limited by the row's score
    threshold). When that returns nothing, the search is retried without the filter,
    threshold and query type, asking for 5 results.

    Args:
        rowid: row identifiers; only used to determine the batch length.
        city, state, country: location fields; nulls are treated as empty strings.
        score_threshold: per-row minimum score for the first search.

    Returns:
        DataFrame with one `results` column holding, per input row, a list of hit
        tuples ordered like result_schema. The list is empty when nothing was found
        or the search failed (failures are logged, never raised).
    """
    all_results = []
    for i in range(len(rowid)):
        # Positional access: do not rely on the batch index being 0..n-1.
        city_i = _text_or_empty(city.iloc[i])
        state_i = _text_or_empty(state.iloc[i])
        country_i = _text_or_empty(country.iloc[i])
        threshold_i = score_threshold.iloc[i]
        threshold_i = None if pd.isna(threshold_i) else float(threshold_i)

        query_text = f"{city_i}, {state_i}, {country_i}"
        filters = {"country": country_i} if country_i else None
        row_results = []

        try:
            row_results = _candidate_rows(index.similarity_search(
                query_text=query_text,
                columns=_VS_RESULT_COLUMNS,
                num_results=3,
                filters=filters,
                query_type="HYBRID",
                reranker=DatabricksReranker(columns_to_rerank=_VS_RERANK_COLUMNS),
                score_threshold=threshold_i
            ))
            if not row_results:
                # Retry without filters and score_threshold
                row_results = _candidate_rows(index.similarity_search(
                    query_text=query_text,
                    columns=_VS_RESULT_COLUMNS,
                    num_results=5,
                    reranker=DatabricksReranker(columns_to_rerank=_VS_RERANK_COLUMNS),
                ))
        except Exception as exc:
            # Best-effort: one failing lookup must not fail the whole batch, but it
            # should leave a trace instead of silently looking like "no match".
            logging.getLogger("search_with_reranking_udf").warning(
                "Vector search failed for query %r: %r", query_text, exc
            )
            row_results = []

        all_results.append(row_results)

    return pd.DataFrame({"results": all_results})

## Apply search_with_reranking_udf to unresolved locations

In [0]:
%python
import pyspark.sql.functions as F

# Locations that neither the exact nor the fuzzy stage resolved (stage is still NULL).
unresolved_df = spark.sql("SELECT rowid, city, state, country FROM gold_geo.location_resolved WHERE stage IS NULL")

# Vector-search hits per row; the literal 0.90 is the score threshold of the first search.
_search_hits = search_with_reranking_udf(
    F.col("rowid"),
    F.col("city"),
    F.col("state"),
    F.col("country"),
    F.lit(0.90),
)

# Slimmed-down view of the hits (id, city, state/province, country) for the LLM prompt.
_candidate_summary = F.expr("transform(vector_search_match_results.results, x -> named_struct('reference_id', x.id, 'city_ascii', x.city_ascii, 'state/province', x.admin_name, 'country', x.country))")

vector_search_df = (
    unresolved_df
    .withColumn("vector_search_match_results", _search_hits)
    .withColumn('candidates', _candidate_summary)
)

## LLM Validation for re-ranked records

In [0]:
%python
import json
from pyspark.sql.functions import col, from_json
import pyspark.sql.functions as F

LLM_ENDPOINT_NAME = 'databricks-gpt-5-mini'

# Structured-output contract for the LLM. "strict" belongs to the json_schema
# wrapper (next to "name" and "schema"), not inside the schema body itself.
response_format = json.dumps({
    "type": "json_schema",
    "json_schema": {
        "name": "candidate_selection",
        "schema": {
            "type": "object",
            "properties": {
                "reference_id": {"type": "integer", "description": "reference_id of the selected candidate"},
                "reasoning": {"type": "string", "description": "Reasoning for the selected id"}
            },
        },
        "strict": True,
    },
})

# NOTE: this text is pasted into a single-quoted SQL string literal below, which is
# why apostrophes are written as '' and no other single quotes may be added.
PROMPT = f"""
    You are a location validation assistant. Your task is to select the best matching reference location from a list of candidates returned by a vector search operation.

    INPUT:
    - Location Query Text: Contains city, state, and country fields (some may be empty or missing)
    - Candidates: Array of potential matching locations
    
    INSTRUCTIONS:
    1. Compare the location query text fields to each candidate''s corresponding fields.
    2. Select the candidate with the best overall match considering all available fields. If there is a single clear best match based on the available query text fields, output that candidate''s reference_id
    3. OUTPUT -1 in the following situations:
        a. AMBIGUITY: Multiple candidates are equally likely matches because the query text is missing critical context (e.g., missing state/country means you cannot distinguish between "Springfield, Illinois" vs "Springfield, Massachusetts")
        b. INSUFFICIENT CONTEXT: The query text lacks enough information to confidently select one candidate over others
        c. NO CLEAR MATCH: None of the candidates are a reasonable match for the query text
        d. CONFLICTING INFORMATION: The query text contains contradictory information that doesn''t align with any candidate
    4. DO NOT guess or make assumptions when context is missing

    OUTPUT FORMAT:
    {response_format}

    """


# Decide per row how the candidate is chosen:
#   * no candidates   -> answer -1 directly, no LLM call needed
#   * one candidate   -> accept it without asking the LLM
#   * several         -> let the LLM pick (or answer -1)
ai_query_expr = f"""
  CASE
    WHEN COALESCE(SIZE(vector_search_match_results.results), 0) <= 0
    THEN to_json(named_struct('reference_id', -1, 'reasoning', 'Vector search returned no candidates'))
    WHEN SIZE(vector_search_match_results.results) = 1 
    THEN to_json(named_struct('reference_id', CAST(vector_search_match_results.results[0].id AS INTEGER), 'reasoning', 'Vector search returned a single result'))
    ELSE ai_query(
    endpoint => '{LLM_ENDPOINT_NAME}',
    request => '{PROMPT}' || CONCAT(
        '\nLocation Query Text: city= ', COALESCE(city, ''), 
        ', state= ', COALESCE(state, ''), 
        ', country= ', COALESCE(country, ''), 
        '\nCandidates: ', CAST(candidates AS STRING)
    ),
    responseFormat => '{response_format}'
    ) 
    END AS response
  """


# the json schema of the LLM response string which we want to unpack
json_schema = "STRUCT<reference_id INTEGER, reasoning STRING>"
parsed_response = from_json(col("response"), json_schema)

# run the batch query and unpack the response
vector_search_llm_validated_df = (
    vector_search_df.selectExpr("*", ai_query_expr)
    # Raw id chosen by the LLM: may be -1, NULL, or an id that is not a candidate.
    .withColumn("llm_reference_id", parsed_response.reference_id)
    .withColumn("reasoning", parsed_response.reasoning)
    # Look the chosen id up among the candidates. try_element_at returns NULL when
    # nothing matches, which covers -1, NULL and ids the LLM made up.
    .withColumn(
        "matched_result",
        F.expr("try_element_at(filter(vector_search_match_results.results, x -> x.id = llm_reference_id), 1)")
    )
    # Only expose a reference_id that is backed by a real candidate, so a resolved
    # row can never carry an id without its attributes.
    .withColumn(
        "reference_id",
        F.when(F.col("matched_result").isNotNull(), F.col("llm_reference_id"))
    )
)

# The candidate's country is aliased so the view does not end up with two columns
# that are both named `country`.
vector_search_llm_validated_df.selectExpr(
    "rowid", "city", "state", "country",
    "reference_id", "reasoning",
    "matched_result.city_ascii",
    "matched_result.admin_name",
    "matched_result.country AS matched_country",
    "matched_result.lat",
    "matched_result.lng",
    "matched_result.population",
    "matched_result.timezone",
    "matched_result.score",
).createOrReplaceTempView("vector_search_llm_validated")

In [0]:
-- Inspect the LLM-validated vector-search output; reference_id is NULL for rows
-- where no candidate was selected.
SELECT *
FROM vector_search_llm_validated

## Merge to location resolved dataset

In [0]:
-- Write the vector-search / LLM-validated matches into the resolved table.
-- Rows already resolved by the Exact or FuzzyCity stage are never overwritten.
MERGE INTO gold_geo.location_resolved AS t
USING (
  -- Only the columns the update needs, and only rows with a selected candidate.
  SELECT
    rowid,
    reference_id,
    score,
    lat,
    lng,
    population,
    timezone
  FROM vector_search_llm_validated
  WHERE reference_id IS NOT NULL
) AS m
ON t.rowid = m.rowid
 AND (t.stage IS NULL OR t.stage NOT IN ('Exact', 'FuzzyCity'))
WHEN MATCHED THEN UPDATE SET
  t.stage = 'VectorSearch',
  -- confidence_score is an INT on a 0-100 scale (Exact = 100, FuzzyCity = percent).
  -- The search score is a float thresholded at 0.90, so assigning it directly would
  -- truncate it to 0 or 1. NOTE(review): assumes score lies in 0..1 — confirm.
  t.confidence_score = CAST(ROUND(m.score * 100) AS INT),
  t.resolved_at = current_timestamp(),
  t.reference_id = m.reference_id,
  t.lat = m.lat,
  t.lng = m.lng,
  t.population = m.population,
  t.timezone = m.timezone;

In [0]:
-- Final state of the resolved table, ordered by rowid like the earlier
-- inspection queries so the stages can be compared row by row.
SELECT
  rowid,
  city,
  state,
  country,
  lat,
  lng,
  population,
  timezone,
  reference_id,
  confidence_score,
  stage,
  resolved_at
FROM gold_geo.location_resolved
ORDER BY rowid ASC