In [153]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains


spark = SparkSession.builder.appName('eq_task').getOrCreate()

In [154]:
dataSample_df = spark.read.options(header=True,inferSchema='True').csv("DataSample.csv")
dataSample_df.printSchema()

root
 |-- _ID: integer (nullable = true)
 |--  TimeSt: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [155]:
dataSample_df.take(5)

[Row(_ID=4516516,  TimeSt='2017-06-21 00:00:00.143', Country='CA', Province='ON', City='Waterloo', Latitude=43.49347, Longitude=-80.49123),
 Row(_ID=4516547,  TimeSt='2017-06-21 18:00:00.193', Country='CA', Province='ON', City='London', Latitude=42.9399, Longitude=-81.2709),
 Row(_ID=4516550,  TimeSt='2017-06-21 15:00:00.287', Country='CA', Province='ON', City='Guelph', Latitude=43.5776, Longitude=-80.2201),
 Row(_ID=4516600,  TimeSt='2017-06-21 15:00:00.307', Country='CA', Province='ON', City='Stratford', Latitude=43.3716, Longitude=-80.9773),
 Row(_ID=4516613,  TimeSt='2017-06-21 15:00:00.497', Country='CA', Province='ON', City='Stratford', Latitude=43.3716, Longitude=-80.9773)]

In [156]:
dataSample_df = dataSample_df.dropDuplicates([dataSample_df.columns[1],dataSample_df.columns[5],dataSample_df.columns[6]])

In [157]:
dataSample_df.printSchema()

root
 |-- _ID: integer (nullable = true)
 |--  TimeSt: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [158]:
POI_df = spark.read.options(header=True,inferSchema='True').csv("POIList.csv")
POI_df.printSchema()

root
 |-- POIID: string (nullable = true)
 |--  Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [159]:
POI_collect = POI_df.collect()
print(POI_collect)
for row in POI_collect:
        row_dict = row.asDict()
        print(row_dict)

[Row(POIID='POI1',  Latitude=53.546167, Longitude=-113.485734), Row(POIID='POI2',  Latitude=53.546167, Longitude=-113.485734), Row(POIID='POI3',  Latitude=45.521629, Longitude=-73.566024), Row(POIID='POI4',  Latitude=45.22483, Longitude=-63.232729)]
{'POIID': 'POI1', ' Latitude': 53.546167, 'Longitude': -113.485734}
{'POIID': 'POI2', ' Latitude': 53.546167, 'Longitude': -113.485734}
{'POIID': 'POI3', ' Latitude': 45.521629, 'Longitude': -73.566024}
{'POIID': 'POI4', ' Latitude': 45.22483, 'Longitude': -63.232729}


In [163]:
from math import sin, cos, sqrt, atan2, radians
import numpy as np
from pyspark.sql import Row
import geopy.distance

R = 6373.0
POI_collect = POI_df.collect()

def compute_distance(lat1, lon1, lat2, lon2):
    coords_1 = (lat1, lon1)
    coords_2 = (lat2, lon2)
    distance = geopy.distance.distance(coords_1, coords_2).km
    return distance

def assign_POI(lat1, lon1, POI_collect):
    min_distance = -1
    min_POIID = ""
    for row in POI_collect:
        row_dict = row.asDict()
        lat2 = float((row_dict[' Latitude']))
        lon2 = float((row_dict['Longitude']))
        POI_ID = row_dict['POIID']
        distance = compute_distance(lat1,lon1,lat2,lon2)
        if min_distance == -1:
            min_POIID = POI_ID
            min_distance = distance
        elif distance < min_distance:
            min_POIID = POI_ID
            min_distance = distance
    return min_POIID,min_distance
    
def rowwise_function(row):
    row_dict = row.asDict()
    lat1 = float((row_dict['Latitude']))
    lon1 = float((row_dict['Longitude']))
    row_dict['POI_label'],row_dict['POI_distance'] = assign_POI(lat1, lon1, POI_collect)
    newrow = Row(**row_dict)
    # return new row
    return newrow

data_rdd = dataSample_df.rdd
data_rdd_new = data_rdd.map(lambda row: rowwise_function(row))
label_df = spark.createDataFrame(data_rdd_new)
label_df.show(10)

+-------+--------------------+-------+--------+-----------+--------+----------+---------+------------------+
|    _ID|              TimeSt|Country|Province|       City|Latitude| Longitude|POI_label|      POI_distance|
+-------+--------------------+-------+--------+-----------+--------+----------+---------+------------------+
|5613403|2017-06-21 00:22:...|     CA|      ON|  Etobicoke| 43.6381|  -79.5787|     POI3| 521.2118298887846|
|5013924|2017-06-21 00:34:...|     CA|      ON|    Toronto| 43.6606|  -79.4635|     POI3| 511.7568283517813|
|5122425|2017-06-21 00:42:...|     CA|      AB|    Calgary| 51.1188| -113.9471|     POI1| 271.9251282789503|
|4571908|2017-06-21 01:01:...|     CA|      AB|    Calgary| 51.0876| -114.0214|     POI1|  275.998563527738|
|5447065|2017-06-21 01:08:...|     CA|      ON|    Toronto|43.66341| -79.38597|     POI3| 505.9988433145477|
|4843104|2017-06-21 01:22:...|     CA|      AB|   Edmonton| 53.4154| -113.4178|     POI1|15.236548938345878|
|5030419|2017-06-21

In [164]:
label_df.groupby('POI_label').agg({"POI_distance":"avg"}).show()
label_df.groupby('POI_label').agg({"POI_distance":"stddev"}).show()

+---------+------------------+
|POI_label| avg(POI_distance)|
+---------+------------------+
|     POI4|498.35931618839777|
|     POI1| 302.3116858567266|
|     POI3|452.65941750369603|
+---------+------------------+

+---------+--------------------+
|POI_label|stddev(POI_distance)|
+---------+--------------------+
|     POI4|  1476.2929907179434|
|     POI1|   412.9909775077995|
|     POI3|  224.12312197115074|
+---------+--------------------+



In [165]:
radius_df = label_df.groupby('POI_label').agg({"POI_distance":"max"})
count_df = label_df.groupby('POI_label').agg({"POI_distance":"count"})
density_df = radius_df.join(count_df,on=['POI_label'],how='inner')
def calculate_density(row):
    row_dict = row.asDict()
    radius = float((row_dict['max(POI_distance)']))
    count = float((row_dict['count(POI_distance)']))
    row_dict['density'] = count/math.pi*(radius**2)
    newrow = Row(**row_dict)
    # return new row
    return newrow
density_rdd = density_df.rdd
density_rdd_new = density_rdd.map(lambda row: calculate_density(row))
density_df = spark.createDataFrame(density_rdd_new)
density_df.show()

+---------+------------------+-------------------+--------------------+
|POI_label| max(POI_distance)|count(POI_distance)|             density|
+---------+------------------+-------------------+--------------------+
|     POI4|  9365.33569265043|                477|1.331726997756410...|
|     POI1| 11541.82947035032|               9726|4.124142845205549...|
|     POI3|1500.4532420098176|               9796| 7.020108690208324E9|
+---------+------------------+-------------------+--------------------+



In [166]:
import chart_studio.plotly as py 
import plotly.graph_objs as go #importing graphical objects
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot

In [167]:
colorsIdx = {'POI1': 'red', 'POI3': 'green', 'POI4':'blue'}
#cols      = label_df['POI_label'].map(colorsIdx)
data = go.Scattergeo(
        lon = [float(row.Longitude) for row in label_df.collect()],
        lat = [float(row.Latitude) for row in label_df.collect()],
        mode = 'markers',
        marker=dict(size=2, color=[colorsIdx[row.POI_label] for row in label_df.collect()])
        )

layout = dict(title = 'POI distibution',
              geo_scope = 'north america'
             )
choromap = go.Figure(data = [data],layout = layout)
iplot(choromap)

In [161]:
!pip3 install geopy

Collecting geopy
  Downloading geopy-2.1.0-py3-none-any.whl (112 kB)
[K     |████████████████████████████████| 112 kB 3.3 MB/s eta 0:00:01
[?25hCollecting geographiclib<2,>=1.49
  Downloading geographiclib-1.50-py3-none-any.whl (38 kB)
Installing collected packages: geographiclib, geopy
Successfully installed geographiclib-1.50 geopy-2.1.0
