In [0]:
import pyspark.sql as sq
from pyspark.sql.functions import *
import os
import numpy as np

spark = sq.SparkSession.builder.master("local").appName("my app").config("spark.some.config.option", "some-value").getOrCreate()

In [0]:
poi = spark.read.csv(os.path.join(path,"POIList.csv"),header=True,inferSchema = True)
data = spark.read.csv(os.path.join(path,"DataSample.csv"),header=True,inferSchema = True)

In [0]:
data.count()

In [0]:
data.printSchema()

In [0]:
#***********************************
#********Data Cleanup***************
#***********************************
#Finding the suspicous IDs
suspicoius_data=(
                  data
                      .groupBy([' TimeSt', 'Latitude', 'Longitude'])
                      .agg(collect_list("_ID").alias("_ID2"))
                      .where(size("_ID2") > 1)
                ).select(explode("_ID2").alias("_ID"))
#Removing the suspicous IDs
data=data.join(aa, data._ID == aa._ID, "left_anti").drop(aa._ID)

In [0]:
data.count()

In [0]:
poi.show()

In [0]:
#Defining a function to calculate the distance between two point based on latitude and longitude
def dis(lat1,lon1,lat2,lon2):
    R=6371
    lon1=toRadians(lon1)
    lat1=toRadians(lat1)
    lon2=toRadians(lon2)
    lat2=toRadians(lat2)
    a=sin((lat1-lat2)/2)**2+cos(lat1)*cos(lat2)*sin((lon1-lon2)/2)**2
    c=2*atan2(sqrt(a),sqrt(1-a))
    d=R*c
    return d

In [0]:
#***************************************************
#***************Label*******************************
#***************************************************
#changing the cloumn name for poi list
poi = poi.select('POIID',col(" Latitude").alias("poiLatitude"), col("Longitude").alias("poiLongitude"))
#Cross joining the poi and data dfs
data=data.crossJoin(poi)
#Calculating the distance between two points and creating a new column
data=data.withColumn("Poidistance", dis(data['Latitude'],data['Longitude'], data['poiLatitude'],data['poiLongitude']))
data.show()

In [0]:
#Assign each request to the closest poi
data_with_min_dis=data.groupBy('_ID').min('Poidistance')
data=data.join(data_with_min_dis,(data['_ID'] == data_with_min_dis['_ID']) & (data['Poidistance'] == data_with_min_dis['min(Poidistance)'])).drop(data_with_min_dis._ID)

data=data.drop('Poidistance')


In [0]:
data.show()

In [0]:
#************************************************
#*****************Analysis***********************
#************************************************
#calculate the average and standard deviation of the distance between the POI to each of its assigned requests.
analysis1=data.groupBy('POIID').agg(avg("min(Poidistance)").alias('Average'), stddev("min(Poidistance)").alias('Standard_Deviation'))
analysis1.show()

In [0]:
#Calculate the radius and density (requests/area) for each POI.
analysis2=data.groupBy('POIID').agg(max("min(Poidistance)").alias('Radius'), count("min(Poidistance)").alias('Count'))
analysis2=analysis2.withColumn('density',analysis2['Count']/(analysis2['Radius']**2*np.pi))
analysis2.show()

In [0]:
#***************************************************
#*****************Model*****************************
#***************************************************
#From analysis 2, we have calculated the density and radius of each POI.
#To visualize the popularity of each POI, density has to be mapped between -10 and 10.
#Mathematical formula for that is f(x)=(b-a)*(x-min)/(max-min)+a; b=10, a=-10

analysis3=analysis2.toPandas() 
minn=np.min(analysis3['density'])
maxx=np.max(analysis3['density'])
analysis3['density(mapped)']=20*(analysis3['density']-minn)/(maxx-minn)-10
analysis3

Unnamed: 0,POIID,Radius,Count,density,density(mapped)
0,POI4,9349.57277,422,2e-06,-10.0
1,POI2,11531.820832,8749,2.1e-05,-9.698443
2,POI1,11531.820832,8749,2.1e-05,-9.698443
3,POI3,1474.580962,8802,0.001289,10.0
