In [1]:
import pyspark
import numpy as np
import pandas as pd
import os
from math import sin, cos, radians, sqrt, asin
from pyspark.sql.types import *
from pyspark.sql.window import Window as w
from pyspark.sql.functions import (
    dense_rank, monotonically_increasing_id, 
    udf, col, broadcast, row_number
)
from pyspark.sql import SparkSession

In [2]:
def distance(lat1, lon1, lat2, lon2):
    '''
    Return the great-circle distance in kilometres between two points,
    computed with the Haversine formula.

    Parameters:
        lat1, lon1: latitude / longitude of the first point, in decimal degrees
        lat2, lon2: latitude / longitude of the second point, in decimal degrees

    Returns:
        The distance in km as a float, or None when any coordinate is None
        (a missing coordinate yields a missing distance instead of raising
        a TypeError inside radians()).

    Note that this formula has up to 0.5% error
    due to the Earth radius variance between Pole and Equator
    '''
    # Propagate missing values: when this function is wrapped as a UDF,
    # NULL column values reach it as None.
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    # The Earth Radius in Km
    r = 6371
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))

    # Haversine term: the squared half-chord length between the two points.
    a = sin((lat2 - lat1)/2)**2 + cos(lat1)*cos(lat2)*sin((lon2 - lon1)/2)**2

    # Floating-point rounding can push the term marginally above 1 for
    # (near-)antipodal points, which would make asin() raise a domain error.
    # A plain comparison is used so that a NaN term stays NaN.
    if a > 1.0:
        a = 1.0

    return 2*r*asin(sqrt(a))

# Expose the Haversine helper as a Spark UDF whose result column is a double.
udf_distance = udf(f=distance, returnType=DoubleType())

In [3]:
def col_trim(col:list):
    '''
    returns a new list in which every column name has its
    leading and trailing whitespace removed
    '''
    trimmed = []
    for name in col:
        trimmed.append(str.strip(name))
    return trimmed

In [4]:
# Create (or reuse) a Spark session with the master set to 'local'.
spark = (
    SparkSession.builder
    .master('local')
    .getOrCreate()
)

In [5]:
# Read the request sample from CSV: the first row is treated as the header,
# column types are inferred, and fields are comma-separated.
df = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .option('delimiter', ',')
    .csv(os.path.expanduser('DataSample.csv'))
)

In [6]:
# Preview the first 10 rows of the request data without truncating cell values.
df.show(10, truncate=False)

+-------+-----------------------+-------+--------+---------+--------+---------+
|_ID    | TimeSt                |Country|Province|City     |Latitude|Longitude|
+-------+-----------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:00.143|CA     |ON      |Waterloo |43.49347|-80.49123|
|4516547|2017-06-21 18:00:00.193|CA     |ON      |London   |42.9399 |-81.2709 |
|4516550|2017-06-21 15:00:00.287|CA     |ON      |Guelph   |43.5776 |-80.2201 |
|4516600|2017-06-21 15:00:00.307|CA     |ON      |Stratford|43.3716 |-80.9773 |
|4516613|2017-06-21 15:00:00.497|CA     |ON      |Stratford|43.3716 |-80.9773 |
|4516693|2017-06-21 14:00:00.597|CA     |ON      |Kitchener|43.4381 |-80.5099 |
|4516771|2017-06-21 10:00:00.873|CA     |ON      |Sarnia   |42.961  |-82.373  |
|4516831|2017-06-21 12:00:00.950|CA     |ON      |London   |43.0091 |-81.1765 |
|4516915|2017-06-21 15:00:01.310|CA     |ON      |London   |43.0091 |-81.1765 |
|4516953|2017-06-21 16:00:01.700|CA     

In [7]:
# Print the column names and the inferred column types of the request data.
df.printSchema()

root
 |-- _ID: integer (nullable = true)
 |--  TimeSt: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [8]:
# Display summary statistics (count, mean, stddev, min, percentiles, ...) for every column.
df.summary().show()

+-------+-----------------+--------------------+-------+--------+--------------+-----------------+------------------+
|summary|              _ID|              TimeSt|Country|Province|          City|         Latitude|         Longitude|
+-------+-----------------+--------------------+-------+--------+--------------+-----------------+------------------+
|  count|            22025|               22025|  22025|   22025|         22025|            22025|             22025|
|   mean|5067292.081770715|                null|   null|    null|          null| 47.9834844476742|-94.96728097525624|
| stddev|317557.7385258704|                null|   null|    null|          null|4.165092926499571|19.092000589488826|
|    min|          4516516|2017-06-21 00:00:...|     CA|      AB|108 Mile Ranch|          6.92742|        -136.53957|
|    25%|          4792092|                null|   null|    null|          null|          43.7476|          -113.814|
|    50%|          5067774|                null|   null|

In [9]:
# Strip surrounding whitespace from the column names
# (the header yields names such as ' TimeSt' with a leading space).
df = df.toDF(*col_trim(df.columns))

In [10]:
# Keep a single row for each distinct (TimeSt, Latitude, Longitude) combination.
# NOTE(review): nothing here controls which of the duplicated rows is retained,
# and one representative of each duplicated group is kept rather than the whole
# group being removed — confirm this matches the intended filtering rule.
df = df.dropDuplicates(['TimeSt','Latitude','Longitude'])

In [11]:
# Number of request rows remaining after de-duplication.
df.count()

19999

In [12]:
# Read the POI list from CSV: the first row is treated as the header,
# column types are inferred, and fields are comma-separated.
df_poi = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', 'True')
    .option('delimiter', ',')
    .csv(os.path.expanduser('POIList.csv'))
)

In [13]:
# Preview up to 10 rows of the POI list without truncating cell values.
df_poi.show(10,truncate=False)

+-----+---------+-----------+
|POIID| Latitude|Longitude  |
+-----+---------+-----------+
|POI1 |53.546167|-113.485734|
|POI2 |53.546167|-113.485734|
|POI3 |45.521629|-73.566024 |
|POI4 |45.22483 |-63.232729 |
+-----+---------+-----------+



In [14]:
# Print the column names and the inferred column types of the POI list.
df_poi.printSchema()

root
 |-- POIID: string (nullable = true)
 |--  Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [15]:
# Strip surrounding whitespace from the POI column names
# (the header yields ' Latitude' with a leading space).
df_poi = df_poi.toDF(*col_trim(df_poi.columns))

In [16]:
# Keep a single POI per distinct (Latitude, Longitude) pair; in the listing above
# POI1 and POI2 share the same coordinates.
# NOTE(review): nothing here controls which POIID of a duplicated pair is retained —
# confirm that either one is acceptable.
df_poi = df_poi.dropDuplicates(['Latitude','Longitude'])

In [17]:
# Show the POI list after de-duplication.
df_poi.show()

+-----+---------+-----------+
|POIID| Latitude|  Longitude|
+-----+---------+-----------+
| POI4| 45.22483| -63.232729|
| POI3|45.521629| -73.566024|
| POI1|53.546167|-113.485734|
+-----+---------+-----------+



In [18]:
# Add a 0-based sequential Request_id: row_number() starts at 1, hence the "-1".
# Rows are numbered in the order given by monotonically_increasing_id().
# NOTE(review): the window has no partitionBy, so the numbering spans the whole
# DataFrame — presumably this funnels all rows through a single partition; confirm
# that is acceptable for the data size.
# NOTE(review): df is not cached here; verify that Request_id values stay stable
# if this DataFrame is evaluated more than once.
df = df.withColumn('Request_id',row_number().over(w.orderBy(monotonically_increasing_id()))-1)

In [19]:
# Preview the first 10 rows, now including the Request_id column.
df.show(10)

+-------+--------------------+-------+--------+---------+--------+---------+----------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude|Request_id|
+-------+--------------------+-------+--------+---------+--------+---------+----------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|         0|
|4519209|2017-06-21 00:00:...|     CA|      ON|  Hanover| 44.1517| -81.0266|         1|
|4518130|2017-06-21 00:00:...|     CA|      ON|   London| 43.0004| -81.2343|         2|
|5368841|2017-06-21 00:00:...|     CA|      ON|   Nepean| 45.2778| -75.7563|         3|
|4521574|2017-06-21 00:00:...|     CA|      ON|Brantford| 43.1508| -80.2094|         4|
|4523455|2017-06-21 00:00:...|     CA|      ON|   London| 43.0091| -81.1765|         5|
|4522231|2017-06-21 00:00:...|     CA|      ON|  Chatham| 42.4247| -82.1755|         6|
|4522376|2017-06-21 00:00:...|     CA|      ON| Waterloo| 43.4634| -80.5201|         7|
|4524947|2017-06-21 00:00:...|  

In [20]:
# Pair every request with every POI (cross join) and compute the distance
# for each possible (request, POI) pair. The POI side is wrapped in a
# broadcast() hint and its coordinates are renamed so they do not clash
# with the request's own Latitude / Longitude columns.
df_crossjoin = (
    df.select('Request_id', 'Latitude', 'Longitude')
    .crossJoin(
        broadcast(
            df_poi.select(
                'POIID',
                col('Latitude').alias('POI_Lat'),
                col('Longitude').alias('POI_Lon'),
            )
        )
    )
    .withColumn(
        'Distance',
        udf_distance('Latitude', 'Longitude', 'POI_Lat', 'POI_Lon'),
    )
)

In [21]:
# Rank the POIs by distance for each request and keep only the closest one.
# row_number() is used instead of dense_rank() so that exactly one row per
# Request_id survives: with dense_rank(), every POI tied at the minimum
# distance gets rank 1, which would make the request appear several times
# here and again after the join back to the request table.
# POIID is the secondary sort key so a tie is always resolved the same way.
result = (
    df_crossjoin
    .withColumn(
        'distance_rank',
        row_number().over(
            w.partitionBy('Request_id')
            .orderBy(col('Distance'), col('POIID'))
        ),
    )
    .filter(col('distance_rank') == 1)
    .drop('distance_rank')
)

In [22]:
# Attach the nearest-POI columns to the full request rows by matching on
# Request_id: every column of the request table ('a') is kept, followed by
# the POI id, POI coordinates and distance taken from the result table ('b').
output = (
    df.alias('a')
    .join(
        result.alias('b'),
        on=col('a.Request_id') == col('b.Request_id'),
        how='inner',
    )
    .select(
        col('a.*'),
        *[col('b.' + poi_field) for poi_field in ['POIID', 'POI_Lat', 'POI_Lon', 'Distance']]
    )
)

In [23]:
# Preview the first 10 requests together with their assigned POI and distance.
output.show(10)

+-------+--------------------+-------+--------+---------+--------+---------+----------+-----+---------+-----------+------------------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude|Request_id|POIID|  POI_Lat|    POI_Lon|          Distance|
+-------+--------------------+-------+--------+---------+--------+---------+----------+-----+---------+-----------+------------------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|         0| POI3|45.521629| -73.566024| 590.2189157995682|
|4519209|2017-06-21 00:00:...|     CA|      ON|  Hanover| 44.1517| -81.0266|         1| POI3|45.521629| -73.566024|173.13874570016765|
|4518130|2017-06-21 00:00:...|     CA|      ON|   London| 43.0004| -81.2343|         2| POI3|45.521629| -73.566024| 835.1729098141858|
|5368841|2017-06-21 00:00:...|     CA|      ON|   Nepean| 45.2778| -75.7563|         3| POI3|45.521629| -73.566024| 269.7406912064589|
|4521574|2017-06-21 00:00:...|     CA|      ON|Brantfor