In [0]:
import pandas as pd
from mlflow.deployments import get_deploy_client
from yaml import safe_load
from pyspark.sql.functions import col, concat, lit, when, udf
import time 
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql.window import Window
import math

# Read the catalog and schema names used by every table reference below
# from the shared parameters file one directory up.
with open('../params.yml', 'r') as params_file:
    params = safe_load(params_file)

data_params = params.get('data_params')
CATALOG = data_params['catalog']
SCHEMA = data_params['schema']

In [0]:
# Tower coordinates: only the join key and the position columns are needed.
towers = (
    spark.table(f"{CATALOG}.{SCHEMA}.sf_mobilelocations")
    .select(['tower_id', 'lat', 'lon'])
)

# Triaged devices, read with the change-data options starting at version 1.
# NOTE(review): a change-data read typically yields one row per change event
# (all change types) -- confirm that no filter on the change type is needed.
triaged = (
    spark.read
    .option("readChangeData", "true")
    .option('startingVersion', '1')
    .table(f"{CATALOG}.{SCHEMA}.triaged_devices")
)

# Attach the tower position to every triaged row. The join is a LEFT join, so
# rows whose tower_id has no match keep null station_lat / station_lon.
triaged_latlon = (
    triaged.join(towers, triaged.tower_id == towers.tower_id, how='left')
    .select(triaged['*'], towers['lat'], towers['lon'])
    .withColumnsRenamed({'lat': 'station_lat', 'lon': 'station_lon'})
)

display(triaged_latlon)

In [0]:
def haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle (haversine) distance in kilometers.

    Args:
        lat1, lon1: Latitude / longitude of the first point, in degrees.
        lat2, lon2: Latitude / longitude of the second point, in degrees.

    Returns:
        The distance in kilometers on a sphere of radius 6371 km, or ``None``
        when any coordinate is ``None`` (e.g. a null column value reaching
        this function through a Spark UDF).
    """
    # Null-safe: a missing coordinate yields a null distance instead of a
    # TypeError from math.radians(None).
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    R = 6371.0  # sphere radius in km
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# Expose the Python haversine function to Spark as a column-level UDF that
# returns a double.
haversine_udf = F.udf(haversine, DoubleType())

# Mark the triaged-device frame with a broadcast hint for the join below.
df_small_b = F.broadcast(triaged_latlon)

# Keep only route rows from the most recent date in the table whose
# random_text is not "not scheduled".
all_routes = spark.table(f"{CATALOG}.{SCHEMA}.fieldtech_route")
latest_row = all_routes.agg(F.max("date").alias("max_date")).collect()[0]
max_date = latest_row["max_date"]

is_latest_day = F.col("date") == max_date
is_scheduled = F.col("random_text") != "not scheduled"
routes = all_routes.filter(is_latest_day & is_scheduled)
 
# Route stops with stop_number at or above this threshold are not considered.
MAX_STOP_NUMBER = 10

# Pair every remaining route stop with every triaged device and compute the
# stop-to-tower distance for each pair.
joined = (
    routes
    # Filter before the cross join so discarded stops are never paired with
    # devices or sent through the Python UDF.
    .filter(F.col('stop_number') < MAX_STOP_NUMBER)
    .crossJoin(df_small_b)
    .withColumn("distance_km", haversine_udf(
        F.col("lat"), F.col("lon"), F.col("station_lat"), F.col("station_lon")))
    # The former 'triage_timestamp' column was dropped: it was never selected.
    .select(['tower_id', 'device_id', 'device', 'solution','station_lat', 'station_lon', 'fieldtech_route.guid', 'fieldtech_route.name', 'fieldtech_route.stop_number', 'distance_km'])
)

# Pick ONE technician per triaged device: the one with the smallest
# distance_km. Previously the frame was grouped by technician as well, and the
# aggregated distance was discarded, so every technician produced a status row
# for every device (and AUTOMATED devices produced identical duplicate rows).
device_keys = ['tower_id', 'device_id', 'device', 'solution']
nearest_first = Window.partitionBy(*device_keys).orderBy(
    F.col('distance_km').asc_nulls_last(),  # unknown distances rank last
    F.col('guid'),                          # deterministic tie-break
)

solved_df = (
    joined
    .withColumn('tech_rank', F.row_number().over(nearest_first))
    .filter(F.col('tech_rank') == 1)
    # AUTOMATED solutions are reported as-is; anything else becomes a dispatch
    # message naming the selected technician.
    .withColumn(
        'status',
        F.when(F.col('solution').startswith('AUTOMATED'), F.col('solution'))
        .otherwise(F.concat(
            F.lit('Deploying Field Technician '), F.col('name'),
            F.lit(' with instruction '), F.col('solution'),
        )),
    )
    .select(['tower_id', 'device_id', 'device', 'status'])
    .withColumn('event_timestamp', F.current_timestamp())
)

# insertInto resolves columns by position, not by name, so the column order
# produced above must match the target table's column order.
solved_df.write.insertInto(f"{CATALOG}.{SCHEMA}.device_status")