In [8]:
# NOTES ON CSV FILE EDITS
# - header of DataSample.csv manually edited to remove an unexpected space
# - header of POIList.csv manually edited to remove an unexpected space
#   and to provide header names different from DataSample.csv

import math

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
# explicit alias, imported after the star import, so aggregate helpers can be
# written as F.max / F.count instead of relying on shadowed builtins
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from geopy import distance

# --- Spark session ------------------------------------------------------------
# Local session using all available cores; getOrCreate() hands back the
# already-running session when this cell is executed again.
spark = SparkSession.builder.master("local[*]").getOrCreate()

# --- Question 1: load and clean the request data ------------------------------
# No inferSchema is requested here, so the CSV reader keeps every column
# (including Latitude / Longitude) as a string; Spark casts them implicitly in
# the numeric comparisons further down.
df_data = spark.read.option("header", True).csv("DataSample.csv")
df_data.createOrReplaceTempView("data")

# clean data by collecting duplicates together and only keeping the first
# entry: rows sharing timestamp + geo information form one group,
# ROW_NUMBER() numbers the rows of each group by _ID, and only row 1 survives.
# NOTE(review): _ID is read as a string, so "first" presumably means
# lexicographic order rather than numeric order -- confirm this is acceptable.
# (Triple-quoted SQL replaces the backslash-continued literal; the trailing
# ';' was dropped because it is not part of the statement.)
df_data = spark.sql("""
    SELECT * FROM data x
    WHERE x._ID IN
      (SELECT _ID FROM
         (SELECT _ID, ROW_NUMBER() OVER
            (PARTITION BY data.TimeSt, data.Country, data.Province, data.City,
                          data.Latitude, data.Longitude ORDER BY _ID) dup
          FROM data)
       WHERE dup < 2)
""")

# filter out implausible points via a box bounding Canada's extremes
# (bounds are exclusive, exactly as in the four original chained filters)
CANADA_MIN_LATITUDE = 41.681389
CANADA_MAX_LATITUDE = 83.111389
CANADA_MIN_LONGITUDE = -141.001944
CANADA_MAX_LONGITUDE = -52.619444
df_data = df_data.filter(
    (df_data["Latitude"] > CANADA_MIN_LATITUDE)
    & (df_data["Latitude"] < CANADA_MAX_LATITUDE)
    & (df_data["Longitude"] > CANADA_MIN_LONGITUDE)
    & (df_data["Longitude"] < CANADA_MAX_LONGITUDE)
)

# output for question 1
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the cell fails on re-run because "cleaned-data" already exists.
df_data.write.mode("overwrite").option("header", True).csv("cleaned-data")

# --- POI list -----------------------------------------------------------------
df_pois = spark.read.options(header='True', inferSchema='True', delimiter=',') \
                    .csv("POIList.csv")
df_pois.createOrReplaceTempView("pois")

# remove duplicate POIs from the list of POIs
# depending on what POIs represent, we might understand that
# there could be multiple POIs in one location, and requests
# should be distributed evenly among them, but here I will
# assume that duplicates are just a mistake.
# POIs sharing the same latitude/longitude form one group and only the one
# with the lowest POIID is kept.
df_pois = spark.sql("""
    SELECT * FROM pois x
    WHERE x.POIID IN
      (SELECT POIID FROM
         (SELECT POIID, ROW_NUMBER() OVER
            (PARTITION BY pois.POILatitude, pois.POILongitude
             ORDER BY POIID) dup
          FROM pois)
       WHERE dup < 2)
""")

# redefining distance function so it's usable in dataframe
def dist(a,b,x,y):
    """Distance in kilometres between the points (a, b) and (x, y).

    Thin wrapper around geopy's default ``distance.distance`` measure so it can
    be registered as a Spark UDF.

    Args:
        a: latitude of the first point.
        b: longitude of the first point.
        x: latitude of the second point.
        y: longitude of the second point.

    Returns:
        The distance in km as a Python float, or ``None`` when any coordinate
        is missing, so that Spark stores a null instead of a distance measured
        from an incomplete point.
    """
    # NOTE(review): geopy appears to substitute 0.0 for a missing coordinate
    # rather than failing, which would yield a plausible-looking but wrong
    # distance -- confirm against the installed geopy version.
    if a is None or b is None or x is None or y is None:
        return None
    return distance.distance((a,b),(x,y)).km
# wrap dist() so it can be applied to DataFrame columns; the result column is
# declared as FloatType
udf_dist = udf(dist, FloatType())

# --- Question 2: assign every request to its closest POI ----------------------
# create a new dataframe of size count(POIs)*requests,
# annotated with distance between each request and each POI.
# crossJoin() states the Cartesian product explicitly; a join() without a
# condition is rejected when implicit cross joins are disabled.
distances = df_data.crossJoin(df_pois).withColumn(
    'Distance',
    udf_dist(df_data.Latitude, df_data.Longitude,
             df_pois.POILatitude, df_pois.POILongitude),
)
# `distances` feeds both the aggregation and the join below; caching it keeps
# the Python UDF from being evaluated once per consumer.
distances = distances.cache()

# find min distance per request
min_distances = distances.groupBy('_ID').min('Distance')
# annotate distances with min distances, then filter out all non-min POIs
# and delete unnecessary columns.
# A request that is exactly equidistant from two POIs keeps one row per POI.
distances = distances.join(min_distances, distances._ID == min_distances._ID) \
                     .select(distances["*"], min_distances["min(Distance)"])
distances = distances.filter(distances["Distance"] == distances["min(Distance)"])
df_data = distances.drop("min(Distance)", "POILatitude", "POILongitude")

# output for question 2
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the cell fails on re-run because "assigned-data" already exists.
df_data.write.mode("overwrite").option("header", True).csv("assigned-data")

def density(count, radius):
    """Number of items per unit area of a circle with the given radius.

    Args:
        count: number of items inside the circle.
        radius: radius of the circle; the result is expressed per squared unit
            of this radius.

    Returns:
        ``count / (pi * radius**2)``, or ``None`` when either argument is
        missing or the radius is zero (the density of a zero-area circle is
        undefined, and raising inside a UDF would abort the whole Spark job).
    """
    if count is None or radius is None or radius == 0:
        return None
    return count / (math.pi*(radius**2.0))
# wrap density() so it can be applied to DataFrame columns
udf_density = udf(density, FloatType())

# --- Question 3: per-POI statistics -------------------------------------------
# aggregating desired statistics together in a single dataframe.
# One grouped aggregation computes all four statistics in a single pass; the
# aliases reproduce the column names that separate count() / agg({...}) calls
# generate, which the map cell reads back from the CSV.
stats = df_data.groupBy('POIID').agg(
    F.count(F.lit(1)).alias("count"),
    F.avg("Distance").alias("avg(Distance)"),
    F.stddev("Distance").alias("stddev(Distance)"),
    F.max("Distance").alias("max(Distance)"),
)
# inner join: POIs without any assigned request are left out.
# Joining on the column name keeps a single POIID column and avoids comparing
# two columns that descend from the same dataframe.
poistats = df_pois.join(stats, on="POIID", how="inner").select(
    *df_pois.columns,
    "count", "avg(Distance)", "stddev(Distance)", "max(Distance)",
)
# requests per km^2 inside the circle whose radius is the farthest assigned request
poistats = poistats.withColumn("Density", udf_density(poistats["count"], poistats["max(Distance)"]))

# write to file, commented out line produces a single .csv file
# poistats.repartition(1).write.option("header",True).csv("poi-stats", sep=',')
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the cell fails on re-run because "poi-stats" already exists.
poistats.write.mode("overwrite").option("header", True).csv("poi-stats")

In [7]:
import folium
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# obtain the local Spark session (an already running one is reused)
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# load the per-POI statistics written by the previous cell
stats_reader = spark.read.format("csv").options(header='True', inferSchema='True', delimiter=',')
poistats = stats_reader.load("./poi-stats/*.csv")
poistats.createOrReplaceTempView("data")

# base map that the POI circles are drawn onto
poi_map = folium.Map(location=[57.4, -96.466667], zoom_start=3)

# one outlined circle per POI, reaching out to its farthest assigned request
for poi in poistats.collect():
    reach_m = poi["max(Distance)"]*1000.0  # km -> m
    circle = folium.Circle(
        radius=reach_m,
        location=[poi.POILatitude, poi.POILongitude],
        popup=poi.POIID,
        color='crimson',
        fill=False,
    )
    circle.add_to(poi_map)

poi_map.save('map.html')

# output for question 3: statistics table, then the map as the cell result
poistats.show()
poi_map

+-----+-----------+------------+-----+-----------------+------------------+-------------+------------+
|POIID|POILatitude|POILongitude|count|    avg(Distance)|  stddev(Distance)|max(Distance)|     Density|
+-----+-----------+------------+-----+-----------------+------------------+-------------+------------+
| POI1|  53.546167| -113.485734| 9349|294.7689993162023|287.90310608983543|    1689.5304|0.0010425173|
| POI3|  45.521629|  -73.566024| 9228|451.7778564812135|224.50555635814914|    1500.4532|0.0013047063|
| POI4|   45.22483|  -63.232729|  454|238.8843975781344|224.98301488742166|    857.25476|1.9664648E-4|
+-----+-----------+------------+-----+-----------------+------------------+-------------+------------+

