# Load 
Load both CSV files and strip stray whitespace from their column headers.

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

# Directory holding the input CSVs; change this single constant to relocate the data.
DATA_DIR = os.path.expanduser("~/data")


def read_csv_stripped(path):
    """Read a headered, comma-separated CSV with inferred column types.

    Column names are stripped of leading/trailing whitespace so later code can
    refer to them by their plain names (e.g. "TimeSt").

    Parameters
    ----------
    path : str
        Location of the CSV file.

    Returns
    -------
    pyspark.sql.DataFrame
        The file's contents with whitespace-stripped column names.
    """
    raw = spark.read.csv(path, header=True, sep=",", inferSchema=True)
    # toDF renames every column positionally in one step instead of one
    # withColumnRenamed call per column.
    return raw.toDF(*(name.strip() for name in raw.columns))


samples = read_csv_stripped(os.path.join(DATA_DIR, "DataSample.csv"))
points = read_csv_stripped(os.path.join(DATA_DIR, "POIList.csv"))
# Ensure TimeSt is a proper timestamp (CSV type inference may leave it as a string).
samples = samples.withColumn("TimeSt", F.to_timestamp("TimeSt"))

In [2]:
# Quick look at both inputs: a few request rows, all POIs, row counts and column types.
samples.show(n=5, truncate=False)
points.show(truncate=False)
n_sample_rows = samples.count()
n_point_rows = points.count()
print(f"There are {n_sample_rows} rows in DataSample.csv")
print(f"There are {n_point_rows} rows in the POIList.csv")
samples.dtypes, points.dtypes

+-------+-----------------------+-------+--------+---------+--------+---------+
|_ID    |TimeSt                 |Country|Province|City     |Latitude|Longitude|
+-------+-----------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:00.143|CA     |ON      |Waterloo |43.49347|-80.49123|
|4516547|2017-06-21 18:00:00.193|CA     |ON      |London   |42.9399 |-81.2709 |
|4516550|2017-06-21 15:00:00.287|CA     |ON      |Guelph   |43.5776 |-80.2201 |
|4516600|2017-06-21 15:00:00.307|CA     |ON      |Stratford|43.3716 |-80.9773 |
|4516613|2017-06-21 15:00:00.497|CA     |ON      |Stratford|43.3716 |-80.9773 |
+-------+-----------------------+-------+--------+---------+--------+---------+
only showing top 5 rows

+-----+---------+-----------+
|POIID|Latitude |Longitude  |
+-----+---------+-----------+
|POI1 |53.546167|-113.485734|
|POI2 |53.546167|-113.485734|
|POI3 |45.521629|-73.566024 |
|POI4 |45.22483 |-63.232729 |
+-----+---------+-----------+

There are 2202

([('_ID', 'int'),
  ('TimeSt', 'timestamp'),
  ('Country', 'string'),
  ('Province', 'string'),
  ('City', 'string'),
  ('Latitude', 'double'),
  ('Longitude', 'double')],
 [('POIID', 'string'), ('Latitude', 'double'), ('Longitude', 'double')])

# 1. Cleanup

Find the sample dataset of request logs in `data/DataSample.csv`. We consider records with identical `geoinfo` and `timest` as suspicious. Please clean up the sample dataset by filtering out those questionable request records.

# Q1 Solution

In [3]:
# Requests that share the same location (Latitude, Longitude) AND the exact same
# timestamp are considered suspicious; every copy of such a request is removed,
# not just the extra ones.
from pyspark.sql.window import Window

SUSPICIOUS_KEY = ["Latitude", "Longitude", "TimeSt"]

# A windowed count is used instead of groupBy + self-join: it avoids the extra
# join, keeps the DataFrame's original column order, and does not silently drop
# rows whose key contains a NULL (an equi-join never matches NULL keys).
samples = (
    samples.withColumn(
        "_dup_count", F.count(F.lit(1)).over(Window.partitionBy(*SUSPICIOUS_KEY))
    )
    .where(F.col("_dup_count") == 1)
    .drop("_dup_count")
)
print(f"There are {samples.count()} rows in the cleaned DataSample.csv")

There are 17973 rows in the cleaned DataSample.csv
There are 2 rows in the cleaned POIList.csv


# 2. Label

Assign each *request* (from `data/DataSample.csv`) to the closest (i.e., minimum distance) *POI* (from `data/POIList.csv`).

Note: a *POI* is a geographical Point of Interest.

## Notes

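- Coordinates must be converted to radians and compared by great-circle (angular) distance, because the Earth isn't flat. Plain Euclidean distance on latitude/longitude degrees is distorted, since a degree of longitude shrinks as latitude increases.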

### haversine
$$d=\theta r$$ 
Where **d** is the arc length subtended by the central angle $\theta$ on a sphere of radius **r**.
**d** can be obtained from the haversine $hav(\theta)$, which in turn can be computed from the differences in latitude and longitude of the two points at the ends of the arc.

With $$\begin{aligned} h&=hav(\theta) \\
&=\sin^2{\frac{\theta}{2}}\\
&=\frac{1-\cos{\theta}}{2}\\
&=hav(\phi_2-\phi_1)+(1-hav(\phi_1-\phi_2)-hav(\phi_1+\phi_2))\cdot hav(\lambda_2-\lambda_1) \end{aligned}$$

Where $\phi$ is the latitude and $\lambda$ is the longitude

Solving for **d**:
$$\begin{aligned} d&=2r\arcsin{\sqrt{h}} \\
&= 2r\arcsin{\sqrt{\sin^2{\frac{\phi_2-\phi_1}{2}}+\cos{\phi_1}\cdot\cos{\phi_2}\cdot\sin^2{\frac{\lambda_2-\lambda_1}{2}}}} \end{aligned}$$

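- Results can be off by up to about 0.5% because the Earth is not a perfect sphere: its radius ranges from roughly 6357 km at the poles to 6378 km at the equator.
- $h\in [0,1]$ for any pair of points, so $d\in [0, \pi r]$. In floating point, $h$ can land slightly above 1 for nearly antipodal points, so clamp it before taking $\arcsin$.
- $r = 6371$ km (mean Earth radius)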

```python
from sklearn.metrics import DistanceMetric
import numpy as np


# Haversine metric; it is fed (latitude, longitude) pairs in radians (see the
# np.radians conversion below). Only relative distances matter for picking the
# minimum, so no Earth radius is applied here.
dist = DistanceMetric.get_metric('haversine')
def closest_point(point, points):
    """Return the element of `points` closest to `point` under `dist`.

    point:  a single (lat, lon) pair in radians.
    points: a sequence of (lat, lon) pairs in radians.
    Returns the closest pair itself (not its index or distance). On ties the
    first occurrence wins, because argmin returns the first minimum.
    """
    return points[dist.pairwise([point], points).argmin()]

# Pack each row's coordinates into a (lat, lon) tuple in radians.
# NOTE(review): presumably ds/poi are pandas DataFrames of DataSample.csv / POIList.csv — confirm.
ds['point'] = [(x, y) for x,y in zip(np.radians(ds['Latitude']), np.radians(ds['Longitude']))]
poi['point'] = [(x, y) for x,y in zip(np.radians(poi['Latitude']), np.radians(poi['Longitude']))]
# Nearest POI coordinates for every request (a Python-level loop over requests).
ds['closest'] = [closest_point(x, list(poi['point'])) for x in ds['point']]
# Map the nearest coordinates back to a POIID by joining on the coordinate tuple.
# NOTE(review): DataFrame.round only rounds numeric columns, so .round(4) on the
# POIID/point (str/tuple) columns looks like a no-op — confirm.
# NOTE(review): POI1 and POI2 share coordinates; unless poi is de-duplicated first,
# the right-hand keys are not unique and validate='m:1' should raise — confirm.
ds = ds.merge(poi[['POIID', 'point']].round(4), how='left', validate = 'm:1',
         left_on='closest', right_on='point').drop(['closest', 'point_x', 'point_y'], axis=1)
```

The pandas implementation is relatively easy. The Spark version needs to compute every request-to-POI distance without looping over rows in Python.

# Q2 Solution

In [4]:
# Expose both DataFrames to Spark SQL under the table names used by the queries.
for view_name, frame in (("points", points), ("samples", samples)):
    frame.createOrReplaceTempView(view_name)

In [5]:
# Haversine distance (km) between every request and every POI.
# A cross join pairs each request with all POIs; POIList has only a handful of
# rows, so the result has (#requests x #POIs) rows.
# No ORDER BY: the nearest-POI step re-ranks per request and orders its own output,
# so sorting the whole cross join here would be wasted work.
EARTH_RADIUS_KM = 6371.0

query = f"""
SELECT s._ID,
       s.TimeSt,
       s.Country,
       s.Province,
       s.City,
       s.Latitude,
       s.Longitude,
       p.POIID,
       -- LEAST(1.0D, ...) keeps ASIN's argument in its domain: rounding can push
       -- the square root just above 1 for nearly antipodal points, giving NaN.
       2 * {EARTH_RADIUS_KM} * ASIN(LEAST(1.0D, SQRT(
           POW(SIN((RADIANS(p.Latitude) - RADIANS(s.Latitude)) * 0.5), 2)
           + COS(RADIANS(s.Latitude)) * COS(RADIANS(p.Latitude))
             * POW(SIN((RADIANS(p.Longitude) - RADIANS(s.Longitude)) * 0.5), 2)
       ))) AS distance_km
FROM samples s
CROSS JOIN points p
"""

In [6]:
# Build the request x POI distance DataFrame from the SQL query defined above.
df = spark.sql(sqlQuery=query)

In [7]:
# For each request, rank its POI distances in ascending order (closest first)
# and keep only rank 1, i.e. the nearest POI.
from pyspark.sql.window import Window

# POIID is a tie-breaker: POIList contains POIs with identical coordinates
# (POI1 and POI2), so ordering by distance alone would let row_number pick
# either one nondeterministically between runs.
w = Window.partitionBy(["_ID", "Latitude", "Longitude"]).orderBy("distance_km", "POIID")
df_shortest = (
    df.withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
    .drop("rn")
    .orderBy("_ID", ascending=False)
)

In [3]:
# Preview the first ten assigned requests (highest _ID first).
df_shortest.show(n=10)

NameError: name 'df_shortest' is not defined

In [None]:
# Write the assigned requests out. Spark writes a *directory* at this path
# containing part-*.csv files (one per partition), not a single CSV file.
# mode="overwrite" keeps the cell re-runnable: the default save mode fails
# when the output path already exists.
df_shortest.write.csv(
    os.path.expanduser("~/data/DataSample_Assigned.csv"),
    mode="overwrite",
    header=True,
    sep=",",
)

In [9]:
# Requests that differ only by milliseconds are NOT treated as suspicious: the
# duplicate check compares timestamps exactly. These two Stratford requests share a
# location but are ~190 ms apart, and 12 other requests were made in between.
t = samples.where(F.col("_ID").isin(4516600, 4516613))
t.show(truncate=False, vertical=True)

-RECORD 0----------------------------
 Latitude  | 43.3716                 
 Longitude | -80.9773                
 TimeSt    | 2017-06-21 15:00:00.307 
 _ID       | 4516600                 
 Country   | CA                      
 Province  | ON                      
 City      | Stratford               
-RECORD 1----------------------------
 Latitude  | 43.3716                 
 Longitude | -80.9773                
 TimeSt    | 2017-06-21 15:00:00.497 
 _ID       | 4516613                 
 Country   | CA                      
 Province  | ON                      
 City      | Stratford               

