In [76]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean,stddev,col, max, count
from rtree import index
import geopy.distance
import numpy as np
from math import pi

In [48]:
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

In [49]:
df = spark.read.option("header",True).csv("data/DataSample.csv")
POI_df = spark.read.option("header",True).csv("data/POIList.csv")
df = df.withColumnRenamed(' TimeSt', 'TimeSt')
POI_df = POI_df.withColumnRenamed(' Latitude','Latitude')

In [50]:
df = df.drop_duplicates(subset=['TimeSt','Longitude','Latitude'])

In [51]:
idx = index.Index()
label_list = []
for i,row in enumerate(POI_df.rdd.toLocalIterator()):
    left = float(row['Latitude'])
    bottom = float(row['Longitude'])
    idx.insert(i, (left, bottom, left, bottom), obj=row['POIID'])

for i,row in enumerate(data.rdd.toLocalIterator()):
    lat = float(row['Latitude'])
    lon = float(row['Longitude'])
    label = [(i.object) for i in idx.nearest((lat, lon, lat, lon), 1, objects=True)]
    label_list.append(label[0])

In [52]:
pd_df = df.toPandas()
pd_POI = POI_df.toPandas()

In [53]:
pd_df['label'] = label_list
merged_df = pd_df.merge(pd_POI,left_on='label',right_on='POIID',how='inner').drop(['POIID'],axis=1)

In [54]:
distance =[]
for index, row in merged_df.iterrows():
    coord1 = (row['Latitude_x'],row['Longitude_x'])
    coord2 = (row['Latitude_y'],row['Longitude_y'])
    distance.append(geopy.distance.distance(coord1, coord2).km)

In [55]:
merged_df['distance'] = distance

In [57]:
df = spark.createDataFrame(merged_df)
df.show(5)

+-------+--------------------+-------+--------+-----------+----------+-----------+-----+----------+-----------+------------------+
|    _ID|              TimeSt|Country|Province|       City|Latitude_x|Longitude_x|label|Latitude_y|Longitude_y|          distance|
+-------+--------------------+-------+--------+-----------+----------+-----------+-----+----------+-----------+------------------+
|4771330|2017-06-21 00:16:...|     CA|      ON|  Kitchener|  43.41340|  -80.49600| POI3| 45.521629| -73.566024| 598.8795982954658|
|5413314|2017-06-21 01:04:...|     CA|      ON|Scarborough|  43.83598|  -79.22789| POI3| 45.521629| -73.566024|486.27942482395355|
|5440770|2017-06-21 01:07:...|     CA|      ON|  Shelburne|  44.07890|  -80.20620| POI3| 45.521629| -73.566024|  549.099294076385|
|5457784|2017-06-21 01:09:...|     CA|      QC|      Laval|  45.62360|  -73.68610| POI3| 45.521629| -73.566024|14.706970835770374|
|4912468|2017-06-21 01:28:...|     CA|      QC|   Montréal|  45.53080|  -73.55560| 

In [69]:
stats_df = df.groupBy('label').agg(stddev('distance').alias('std_distance'), mean('distance').alias('avg_distance'))
stats_df.show(5)

+-----+------------------+-----------------+
|label|      std_distance|     avg_distance|
+-----+------------------+-----------------+
| POI4|2186.7477855935313|694.7196420061714|
| POI1| 280.5154449923558|291.7158270749158|
| POI3|229.35317294726875|454.9645162746186|
+-----+------------------+-----------------+



In [44]:
merged_df.groupby(['label']).agg({'distance':['mean',np.std]})

Unnamed: 0_level_0,distance,distance
Unnamed: 0_level_1,mean,std
label,Unnamed: 1_level_2,Unnamed: 2_level_2
POI1,291.715827,280.515445
POI3,454.964516,229.353173
POI4,694.719642,2186.747786


In [89]:
radius_df = df.groupBy('label').agg(max('distance').alias('radius'))
requests_df = df.groupBy('label').agg(count('_ID').alias('requests'))
radius_df.show(5)
requests_df.show(5)

+-----+------------------+
|label|            radius|
+-----+------------------+
| POI1|1689.5303476129404|
| POI3|1551.9711029118484|
| POI4|14204.310487196022|
+-----+------------------+

+-----+--------+
|label|requests|
+-----+--------+
| POI1|    9698|
| POI3|    9817|
| POI4|     484|
+-----+--------+



In [92]:
radius_requests_df = radius_df.join(requests_df,radius_df.label == requests_df.label,"inner").show(truncate=False)
#radius_requests_df = radius_requests_df['radius'].map(lambda x: pi*x**2)
radius_requests_df = radius_requests_df.withColumn('abc', pi*col(radius)**2)
radius_requests_df.show(5)
# idx = 0
# for row in radius_df.rdd.toLocalIterator():
#     area=pi*row.radius**2
#     density.append(requests[index]/area)

+-----+------------------+-----+--------+
|label|radius            |label|requests|
+-----+------------------+-----+--------+
|POI4 |14204.310487196022|POI4 |484     |
|POI1 |1689.5303476129404|POI1 |9698    |
|POI3 |1551.9711029118484|POI3 |9817    |
+-----+------------------+-----+--------+



TypeError: 'NoneType' object is not subscriptable

In [45]:
radius_df = merged_df.groupby(['label']).agg(radius = ('distance','max')).reset_index()
requests = merged_df.groupby(['label'])['_ID'].count()
density = []
for index, row in radius_df.iterrows():
    area=pi*row['radius']**2
    density.append(requests[index]/area) 
radius_df['density'] = density
radius_df

Unnamed: 0,label,radius,density
0,POI1,1689.530348,0.001081435
1,POI3,1551.971103,0.001297363
2,POI4,14204.310487,7.635811e-07
