#### Notebook to calculate geographical distances between two objects (with Lat/Long coordinates)

In [2]:
#Import geodesic and functions library
from geopy.distance import geodesic
import pyspark.sql.functions as F

StatementMeta(, a6d38ac9-24dc-4fd7-8f02-e1ee3451b1d6, 6, Finished, Available)

###### Reference – PySpark `udf` documentation: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.udf.html

In [3]:
# Define a function that will return the distance between two points
from pyspark.sql.types import FloatType
# https://stackoverflow.com/questions/70315069/calculate-the-geographical-distance-in-pyspark-dataframe
@F.udf(returnType=FloatType())
def geodesic_udf(a, b):
    """Return the geodesic distance in miles between two coordinate pairs.

    Args:
        a: First point as a two-element ``[latitude, longitude]`` sequence.
        b: Second point, in the same ``[latitude, longitude]`` order.

    Returns:
        The distance in miles as a float, or ``None`` (SQL NULL) when either
        point, or any coordinate inside it, is missing.
    """
    # Propagate NULLs rather than handing missing coordinates to geopy, which
    # cannot produce a meaningful distance for them.
    if a is None or b is None:
        return None
    if any(coord is None for coord in a) or any(coord is None for coord in b):
        return None
    return geodesic(a, b).miles

StatementMeta(, a6d38ac9-24dc-4fd7-8f02-e1ee3451b1d6, 7, Finished, Available)

In [4]:
# Register the UDF under the name "geodesic_udf" so that it can also be
# called from Spark SQL (e.g. in %%sql cells), not only from the DataFrame API.
spark.udf.register("geodesic_udf", geodesic_udf)

StatementMeta(, a6d38ac9-24dc-4fd7-8f02-e1ee3451b1d6, 8, Finished, Available)

<function __main__.geodesic_udf(a, b)>

###### Here is an example of how to call the udf from SparkSQL....

In [None]:
%%sql
-- Each point is passed as array(latitude, longitude); the result is in miles.
SELECT geodesic_udf(
           array(30.56954, -96.287102),
           array(29.53952, -95.53614)
       )

###### For our service tech example, we have two Delta tables (ServiceTech and ZipCode) loaded via DataFlows G2....
* We are going to join these two tables together (via a CTE), and then persist (as a new delta table) all ServiceTech to ZipCode (i.e. Location) distances that are within a given threshold (e.g., <= 50 miles)
* This new table can then be used (within a Power BI dataset) to locate Service Techs that are within a certain distance.

In [6]:
%%sql
-- Preview: every ServiceTech / ZipCode pair that is no more than 50 miles apart.
-- geodesic_udf takes array(latitude, longitude) points and returns miles.
WITH CTE_SpatDist AS (
    SELECT  S.IdServiceTech, S.ServiceTech, S.ZipCode AS SvcTechZipCode, S.ZipArea AS SvcTechZipArea,
            Z.ZipCode, Z.ZipArea,
            round(geodesic_udf(array(S.Latitude, S.Longitude), array(Z.Latitude, Z.Longitude)), 2) AS TheDistance
    -- Every tech is deliberately paired with every zip code; the explicit
    -- CROSS JOIN makes clear this is not a forgotten join condition.
    FROM    ServiceTech S
    CROSS JOIN ZipCode Z
)
SELECT  IdServiceTech, ZipCode, TheDistance
FROM    CTE_SpatDist
WHERE   TheDistance <= 50.00
ORDER BY IdServiceTech, TheDistance;

StatementMeta(, a6d38ac9-24dc-4fd7-8f02-e1ee3451b1d6, 10, Finished, Available)

<Spark SQL result set with 80 rows and 3 fields>

In [5]:
# Largest tech-to-zip-code distance, in miles, kept in the persisted table.
MAX_DISTANCE_MILES = 50.00

# Pair every service tech with every zip code, compute the distance between
# them with geodesic_udf (array(latitude, longitude) in, miles out), and keep
# only the pairs within the threshold.
sqlStmt = f"""
WITH CTE_SpatDist AS (
    SELECT  S.IdServiceTech, S.ServiceTech, S.ZipCode AS SvcTechZipCode, S.ZipArea AS SvcTechZipArea,
            Z.ZipCode, Z.ZipArea,
            round(geodesic_udf(array(S.Latitude, S.Longitude), array(Z.Latitude, Z.Longitude)), 2) AS TheDistance
    FROM    ServiceTech S
    CROSS JOIN ZipCode Z
)
SELECT  IdServiceTech, ZipCode, TheDistance AS Distance
FROM    CTE_SpatDist
WHERE   TheDistance <= {MAX_DISTANCE_MILES:.2f}
ORDER BY IdServiceTech, TheDistance
"""
ServiceTechLocationDistance = spark.sql(sqlStmt)

# Replace the Delta table (and its schema, if it changed) on every run.
(
    ServiceTechLocationDistance.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save("Tables/ServiceTechLocationDistance")
)

StatementMeta(, 43a54dbc-2f7e-43e9-b97d-f93b4aa90e64, 9, Finished, Available)

###### Optional Example with a dummy dataset....

In [4]:
# Four sample rows, each holding two coordinate pairs:
# A/B are the first point and C/D the second (A and C longitude, B and D latitude).
sample_rows = [
    [114.038696, 22.5315, 114.047302, 22.531799],
    [114.027901, 22.5228, 114.026299, 22.5238],
    [114.026299, 22.5238, 114.024597, 22.5271],
    [114.024597, 22.5271, 114.024696, 22.527201],
]
DF = spark.createDataFrame(sample_rows, ["A", "B", "C", "D"])
DF.show()

StatementMeta(, 8e3266b7-e405-4326-b665-b7390f672eb6, 8, Finished, Available)

+----------+-------+----------+---------+
|         A|      B|         C|        D|
+----------+-------+----------+---------+
|114.038696|22.5315|114.047302|22.531799|
|114.027901|22.5228|114.026299|  22.5238|
|114.026299|22.5238|114.024597|  22.5271|
|114.024597|22.5271|114.024696|22.527201|
+----------+-------+----------+---------+



In [5]:
# Compute the distance between the two points of each row with the UDF.
# B and D are latitudes, A and C are longitudes, so each point is built as
# array(latitude, longitude).
# NOTE: despite its name, the 'Lengths/m' column holds miles, because
# geodesic_udf returns miles.
point_from = F.array("B", "A")
point_to = F.array("D", "C")
DF = DF.withColumn('Lengths/m', geodesic_udf(point_from, point_to))
DF.show()

StatementMeta(, 8e3266b7-e405-4326-b665-b7390f672eb6, 9, Finished, Available)

+----------+-------+----------+---------+-----------+
|         A|      B|         C|        D|  Lengths/m|
+----------+-------+----------+---------+-----------+
|114.038696|22.5315|114.047302|22.531799| 0.55049914|
|114.027901|22.5228|114.026299|  22.5238|0.123379074|
|114.026299|22.5238|114.024597|  22.5271|  0.2517901|
|114.024597|22.5271|114.024696|22.527201|0.009399389|
+----------+-------+----------+---------+-----------+



In [7]:
# Expose the DataFrame to Spark SQL as the temporary view "DFEg". A temp view
# is only a session-scoped name for the DataFrame; nothing is written to storage.
DF.createOrReplaceTempView("DFEg")

StatementMeta(, 8e3266b7-e405-4326-b665-b7390f672eb6, 11, Finished, Available)

In [8]:
%%sql
-- Query the DFEg temp view and call the registered UDF directly from SQL.
-- Points are array(latitude, longitude): B and D are latitudes, A and C longitudes.
SELECT
    current_date() AS CurrDate,
    DF.*,
    geodesic_udf(array(B, A), array(D, C)) AS TheDistance
FROM DFEg AS DF

StatementMeta(, 8e3266b7-e405-4326-b665-b7390f672eb6, 12, Finished, Available)

<Spark SQL result set with 4 rows and 7 fields>