In [None]:
# POI Data Enrichment with Google Maps Places API (New)

This notebook enriches property data with Points of Interest (POI) using the Google Maps Places API v1 (New).

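- Reads the POI category from the shared table `realitky.cleaned.poi_category`, using its `category_code_google` (comma-separated Google place types), `max_results`, and `max_distance_m` (search radius in metres) columns.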
- Selects properties to enrich from `realitky.cleaned.property` similarly to the Geoapify flow.
- Calls `places.googleapis.com/v1/places:searchNearby` with an appropriate field mask and type mapping.
- Transforms responses into the schema expected by `realitky.cleaned.property_poi` and upserts via MERGE.

Notes:
- Google Places v1 requires an API key and a Field Mask; this notebook requests a minimal set of fields.
- Some Geoapify categories don't have a perfect Google type equivalent; a best-effort mapping is included.

In [None]:
# Notebook parameters (Databricks widgets).
# The "api_key" widget must hold a Google Maps Platform key: it is sent as the
# X-Goog-Api-Key header to the Places API by the download cell.
dbutils.widgets.text("api_key", "API_KEY", "Google Maps API Key")
dbutils.widgets.text("category_key", "1", "category_key")
dbutils.widgets.text("process_id", "manual", "Process ID")
dbutils.widgets.text("max_properties", "2", "Number of Records")
dbutils.widgets.dropdown("test_mode", "true", ["true", "false"], "Test Mode (limit to 5 records)")


# Get widget values
# Strip the key: a value pasted with surrounding whitespace/newline would
# otherwise be sent verbatim in the request header.
api_key = dbutils.widgets.get("api_key").strip()
category_key = int(dbutils.widgets.get("category_key"))
process_id = dbutils.widgets.get("process_id")
max_properties = int(dbutils.widgets.get("max_properties"))

# Never echo the full key: show only its last four characters, or NOT_SET
# when the value is too short to be a real key.
if len(api_key) > 4:
    masked_api_key = '*' * (len(api_key) - 4) + api_key[-4:]
else:
    masked_api_key = 'NOT_SET'

print("Configuration:")
print(f"- API Key: {masked_api_key}")
print(f"- Category Key: {category_key}")
print(f"- Process ID: {process_id}")
print(f"- Number of properties: {max_properties}")

In [None]:
# Select POI Category and properties to enrich
#
# Produces for the following cells:
#   category_code    - raw value of poi_category.category_code_google
#   max_results_cfg  - results per property (table value, default 10)
#   radius_m         - search radius in metres (table value, default 1000)
#   df_properties_to_be_enriched - property_id + coordinates to query
category_row = spark.sql(f"""
    SELECT 
      category_code_google,
      max_results,
      max_distance_m
    FROM realitky.cleaned.poi_category 
    WHERE 
      category_key = {category_key} 
      AND del_flag = FALSE
""").first()

if category_row is None:
    raise ValueError(f"No category found for category_key={category_key}")

category_code = category_row['category_code_google']
max_results_cfg = int(category_row['max_results']) if category_row['max_results'] is not None else 10
radius_m = int(category_row['max_distance_m']) if category_row['max_distance_m'] is not None else 1000

# Apply test_mode caps
_test_mode = dbutils.widgets.get("test_mode").lower() == "true"
if _test_mode:
    max_properties = min(max_properties, 5)
    max_results_cfg = min(max_results_cfg, 5)

print(f"Category (google types str): {category_code}")
print(f"Max results (per property): {max_results_cfg}")
print(f"Radius (m): {radius_m}")
print(f"Test mode: {_test_mode}")

# LEFT JOIN: the WHERE predicates on property.* reject NULLs, so rows that
# exist only in property_stats could never survive; a FULL OUTER JOIN would
# return exactly the same rows.
# property.property_id is the last sort key so that ORDER BY + LIMIT picks the
# same rows every time this lazy DataFrame is evaluated (it is evaluated again
# by later actions), instead of breaking ties arbitrarily.
# NOTE(review): properties whose stats row already has poi_places_check = TRUE
# are not excluded here, only used for ordering - confirm this is intended.
df_properties_to_be_enriched = spark.sql(f"""
    SELECT 
      property.property_id, 
      property.address_latitude, 
      property.address_longitude
    FROM realitky.cleaned.property AS property
    LEFT JOIN realitky.stats.property_stats
      ON property_stats.property_id = property.property_id 
     AND property_stats.src_web = property.src_web
     AND property_stats.poi_places_check = TRUE
     AND property_stats.del_flag = FALSE
    WHERE 
      property.property_type_id IN (1, 2, 7, 15)
      AND property.address_latitude > 0
      AND property.address_longitude > 0
      AND property.del_flag = FALSE
    ORDER BY
      property_stats.ins_dt DESC,
      property_stats.upd_dt,
      property.property_id
    LIMIT {max_properties}
""")
display(df_properties_to_be_enriched)

In [None]:
# Download POI data from Google Maps Places API v1 (Nearby Search)
#
# For every selected property one POST is sent to places:searchNearby and the
# JSON response is kept as a string in `all_pois` / `df_all_pois`
# (columns: property_id, category_key, poi_raw_response).
import requests
import time
import json
import math
from pyspark.sql.types import StructType
from datetime import datetime

# Google Places API v1 Nearby Search endpoint
BASE_URL = 'https://places.googleapis.com/v1/places:searchNearby'

# Request limits documented for Nearby Search (New); values outside these
# ranges are rejected by the API with HTTP 400.
MAX_RESULT_COUNT_LIMIT = 20
MAX_RADIUS_M = 50000.0

# Mean Earth radius in metres, used for the great-circle distance below.
EARTH_RADIUS_M = 6371008.8


def _haversine_m(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two lat/lon points (degrees)."""
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    # min() guards asin against rounding slightly above 1.0
    return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(a)))


# Parse google types: category_code may be a comma-separated string
included_types = [t.strip() for t in str(category_code or "").split(',') if t.strip()]

# Build the request body for Nearby Search v1
# radius in meters from table; max_results_cfg limits number of returned results.
requested_results = int(max_results_cfg) if max_results_cfg else 10
body_template = {
    "includedTypes": included_types,
    "maxResultCount": max(1, min(requested_results, MAX_RESULT_COUNT_LIMIT)),
    "rankPreference": "DISTANCE"
}
search_radius_m = min(float(radius_m), MAX_RADIUS_M)

headers = {
    'Content-Type': 'application/json',
    'X-Goog-Api-Key': api_key,
    # `places.distanceMeters` is intentionally not requested: the v1 Place
    # resource has no such field and an unknown path in the field mask makes
    # the API reject the request. The distance is computed locally instead.
    'X-Goog-FieldMask': ",".join([
        "places.id",
        "places.displayName",
        "places.location",
        "places.shortFormattedAddress",
        "places.types",
        "places.googleMapsUri"
    ])
}

all_pois = []

# Collect once: the query is lazy, so count() followed by collect() would run
# it twice.
properties = df_properties_to_be_enriched.collect()
prop_count = len(properties)
print(f"Getting POIs for {prop_count} properties; includedTypes: {included_types}")

for idx, row in enumerate(properties, 1):
    property_id = row['property_id']
    address_latitude = float(row['address_latitude'])
    address_longitude = float(row['address_longitude'])
    body = dict(body_template)
    body["locationRestriction"] = {
        "circle": {
            "center": {"latitude": address_latitude, "longitude": address_longitude},
            "radius": search_radius_m
        }
    }
    print(f"Requesting Google Places for property_id='{property_id}' at ({address_latitude}, {address_longitude}) with includedTypes={included_types}")
    try:
        response = requests.post(BASE_URL, headers=headers, data=json.dumps(body), timeout=10)
        response.raise_for_status()
        poi_data = response.json()

        # Add the property -> place distance to each place under the key
        # `distanceMeters`, so the stored response carries the value that the
        # transformation step reads as place.distanceMeters.
        for place in poi_data.get('places', []):
            location = place.get('location') or {}
            if 'latitude' in location and 'longitude' in location:
                place.setdefault(
                    'distanceMeters',
                    round(_haversine_m(address_latitude, address_longitude,
                                       float(location['latitude']), float(location['longitude'])), 1)
                )

        all_pois.append({
            "property_id": property_id,
            "category_key": category_key,
            "poi_raw_response": json.dumps(poi_data, ensure_ascii=False)
        })
        places_count = len(poi_data.get('places', []))
        print(f"Success for property_id={property_id}, found {places_count} POIs.")
        # Small pause between successful calls to stay gentle on the API quota
        time.sleep(0.1)
    except requests.exceptions.Timeout as e:
        print(f"TIMEOUT for property {property_id}: {e}\nBody: {body}")
    except requests.exceptions.HTTPError as e:
        print(f"HTTPError for property {property_id}: {e}\nStatus: {getattr(e.response, 'status_code', None)}")
        # e.response can be None; only print the body when there is one
        if e.response is not None:
            print(f"Response: {e.response.text[:500]}")
    except Exception as e:
        # Best effort: one failing property must not abort the whole batch
        print(f"Error for property {property_id}: {e}")

print(f"Finished POI download. Total successful: {len(all_pois)}")

if len(all_pois) > 0:
    df_all_pois = spark.createDataFrame(all_pois)
    display(df_all_pois)
else:
    # Downstream cells test `df_all_pois is not None` before doing any work
    df_all_pois = None
    print("No POIs found")

In [None]:
# POI Data Cleaning and Transformation
#
# Turns df_all_pois (one row per property, raw JSON response) into df_final
# (one row per property/POI) with the columns written to
# realitky.cleaned.property_poi by the MERGE cell.
import json
from pyspark.sql.functions import from_json, col, explode_outer, schema_of_json, element_at, current_timestamp, lit, concat, udf, trim, when, get_json_object, to_json
from pyspark.sql.types import StringType, DoubleType

# Schema used only when no downloaded response contains a "places" array, so
# that the columns read below still resolve and df_final comes out empty
# instead of the cell failing. It lists the place fields this cell reads.
FALLBACK_RESPONSE_SCHEMA = (
    "places ARRAY<STRUCT<"
    "displayName: STRUCT<languageCode: STRING, text: STRING>, "
    "distanceMeters: DOUBLE, "
    "googleMapsUri: STRING, "
    "id: STRING, "
    "location: STRUCT<latitude: DOUBLE, longitude: DOUBLE>, "
    "shortFormattedAddress: STRING, "
    "types: ARRAY<STRING>>>"
)


def _is_not_none_df(x):
    """Return True when the download step produced a DataFrame (i.e. x is not None)."""
    try:
        return x is not None
    except Exception:
        return False


if _is_not_none_df(df_all_pois):
    # Infer the schema from a response that actually contains places; a
    # response without results has no "places" key and would yield a schema
    # in which json.places cannot be resolved.
    sample_row = (
        df_all_pois
        .select("poi_raw_response")
        .filter(col("poi_raw_response").isNotNull() & col("poi_raw_response").contains('"places"'))
        .first()
    )
    if sample_row is not None:
        response_schema = schema_of_json(sample_row["poi_raw_response"])
    else:
        response_schema = FALLBACK_RESPONSE_SCHEMA

    # Parse JSON and explode: one row per returned place
    df = df_all_pois.withColumn("json", from_json(col("poi_raw_response"), response_schema))
    df = df.withColumn("place", explode_outer(col("json.places")))

    # Name and addresses are read from the JSON of the *exploded* place, so
    # each POI row gets its own values (reading them from the whole response
    # would repeat the first place's name/address on every row).
    # get_json_object returns NULL for a missing path, which keeps optional
    # fields such as formattedAddress nullable.
    place_json = to_json(col("place"))
    df = df.withColumn("poi_name", get_json_object(place_json, "$.displayName.text"))
    df = df.withColumn("poi_address1", get_json_object(place_json, "$.shortFormattedAddress"))
    df = df.withColumn("poi_address2", get_json_object(place_json, "$.formattedAddress"))

    # Build URL if not provided
    google_uri_col = col("place.googleMapsUri")
    lat_col = col("place.location.latitude")
    lng_col = col("place.location.longitude")
    poi_url_expr = concat(lit("https://www.google.com/maps/search/?api=1&query="), lat_col.cast(StringType()), lit(","), lng_col.cast(StringType()))

    df_final = df.select(
        col("category_key"),
        col("property_id"),
        col("place").cast(StringType()).alias("poi_attributes"),
        lat_col.alias("poi_latitude"),
        lng_col.alias("poi_longitude"),
        col("poi_name"),
        col("place.id").alias("poi_id"),
        col("place.distanceMeters").alias("poi_distance_m"),
        col("poi_address1"),
        col("poi_address2"),
        lit("google").alias("data_source"),
        when(google_uri_col.isNotNull(), google_uri_col).otherwise(poi_url_expr).alias("poi_url"),
        current_timestamp().alias("ins_dt"),
        lit(process_id).alias("ins_process_id"),
        current_timestamp().alias("upd_dt"),
        lit(process_id).alias("upd_process_id"),
        lit(False).alias("del_flag")
    )

    # explode_outer keeps properties without any place as a NULL place row;
    # drop those (and empty structs) so only real POIs remain.
    df_final = df_final.filter(~((trim(col("poi_attributes")) == "{}") | (col("poi_attributes").isNull())))
    display(df_final)
else:
    print("No POIs found")

In [None]:
# Write POIs to realitky.cleaned.property_poi partitioned by category_key
#
# Upserts df_final on (property_id, category_key, poi_id): existing rows are
# updated only when poi_attributes or del_flag changed, new rows are inserted.
if df_all_pois is not None:
    # One source row per merge key: several source rows for the same key would
    # either make MERGE fail (matched case) or insert duplicates (unmatched).
    merge_keys = ["property_id", "category_key", "poi_id"]
    df_final.dropDuplicates(merge_keys).createOrReplaceTempView("tmp_property_poi")
    # Change detection uses the null-safe operator <=> so that a NULL on the
    # target side still counts as a difference (a plain <> yields NULL there
    # and the row would never be updated).
    spark.sql(f"""
        MERGE INTO realitky.cleaned.property_poi AS target
        USING tmp_property_poi AS source
        ON target.property_id = source.property_id
            AND target.category_key = source.category_key
            AND target.poi_id = source.poi_id
        WHEN MATCHED 
            AND target.category_key = {category_key}
            AND (
                NOT (target.poi_attributes <=> source.poi_attributes) OR
                NOT (target.del_flag <=> source.del_flag)
            )
        THEN UPDATE SET
            target.poi_attributes = source.poi_attributes,
            target.poi_latitude = source.poi_latitude,
            target.poi_longitude = source.poi_longitude,
            target.poi_name = source.poi_name,
            target.poi_distance_m = source.poi_distance_m,
            target.poi_address1 = source.poi_address1,
            target.poi_address2 = source.poi_address2,
            target.data_source = source.data_source,
            target.poi_url = source.poi_url,
            target.upd_dt = source.upd_dt,
            target.upd_process_id = source.upd_process_id,
            target.del_flag = source.del_flag
        WHEN NOT MATCHED
            AND source.category_key = {category_key}
        THEN INSERT(
            category_key,
            property_id,
            poi_attributes,
            poi_latitude,
            poi_longitude,
            poi_name,
            poi_id,
            poi_distance_m,
            poi_address1,
            poi_address2,
            data_source,
            poi_url,
            ins_dt,
            ins_process_id,
            upd_dt,
            upd_process_id,
            del_flag
        ) VALUES(
            source.category_key,
            source.property_id,
            source.poi_attributes,
            source.poi_latitude,
            source.poi_longitude,
            source.poi_name,
            source.poi_id,
            source.poi_distance_m,
            source.poi_address1,
            source.poi_address2,
            source.data_source,
            source.poi_url,
            source.ins_dt,
            source.ins_process_id,
            source.upd_dt,
            source.upd_process_id,
            source.del_flag
        )
    """)
else:
    print("No POIs found")