In [110]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
import os
from pyspark.sql.functions import *
from pyspark.sql import functions as sqlf 
import pyspark.storagelevel 
import numpy as np
import pandas as pd
import datetime
from pyspark.sql.types import *
import pdb
import subprocess # Used for executing linux commands, for writing to teradata
import sys

from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, min, max
from datetime import datetime, timedelta
conf = SparkConf().setMaster("local").setAppName("storeSales")
sc = SparkContext.getOrCreate(conf = conf)
sqlContext = SQLContext(sc)
hiveContext = HiveContext(sc)

In [113]:
from pyspark.sql.types import *
data_fields = StructType([StructField("_ID", IntegerType(), True),
       StructField("TimeSt", StringType(), True),
       StructField("Country", StringType(), True),
       StructField("Province", StringType(), True),
       StructField("City", StringType(), True),
       StructField("Latitude", FloatType(), True),
       StructField("Longitude", FloatType(), True)])
poi_fields = StructType([StructField("POIID", StringType(), True),
       StructField("Latitude", FloatType(), True),
       StructField("Longitude", FloatType(), True)])

In [121]:
data_sample = hiveContext.read.format("com.databricks.spark.csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("file:///home/vijay/DATA_SCIENCE/BPF/BPF/work-samples-master/data-mr/data/DataSample.csv",schema=data_fields)
poi_df = hiveContext.read.format("com.databricks.spark.csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("file:///home/vijay/DATA_SCIENCE/BPF/BPF/work-samples-master/data-mr/data/POIList.csv",schema=poi_fields)

In [85]:
data_sample.show(4)

+-------+--------------------+-------+--------+---------+--------+---------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude|
+-------+--------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|   London| 42.9399| -81.2709|
|4516550|2017-06-21 15:00:...|     CA|      ON|   Guelph| 43.5776| -80.2201|
|4516600|2017-06-21 15:00:...|     CA|      ON|Stratford| 43.3716| -80.9773|
+-------+--------------------+-------+--------+---------+--------+---------+
only showing top 4 rows



In [86]:
pd_df = data_sample.toPandas()

In [87]:
pd_df.describe()

Unnamed: 0,_ID,Latitude,Longitude
count,22025.0,22025.0,22025.0
mean,5067292.0,47.983484,-94.967281
std,317557.7,4.165093,19.092001
min,4516516.0,6.92742,-136.539566
25%,4792159.0,43.747601,-113.814003
50%,5067906.0,47.798698,-83.017998
75%,5342586.0,51.128559,-79.399101
max,5615006.0,62.827301,125.702202


In [88]:
pd_df[['Country','Province','City','TimeSt']].describe()

Unnamed: 0,Country,Province,City,TimeSt
count,22025,22025,22025,22025
unique,1,12,656,19972
top,CA,ON,Calgary,2017-06-21 06:11:00.323
freq,22025,9321,3979,3


In [89]:
# There are duplicate TimeSt, City, Province

In [90]:
data_sample.registerTempTable("data_sample_table")

In [103]:
unique_data_sample = hiveContext.sql( '''select Max(_ID) as ID, TimeSt, Latitude, Longitude,concat(Latitude,',',Longitude) as Coordinates,Country,Province,City
                                 from data_sample_table group by TimeSt, Latitude, Longitude,Country,Province,City 
                                 order by 
                                 TimeSt,
                                 Latitude,
                                 Longitude''' )

In [104]:
unique_data_sample.count()

19999

In [105]:
unique_data_sample.show(10)

+-------+--------------------+--------+---------+------------------+-------+--------+---------+
|     ID|              TimeSt|Latitude|Longitude|       Coordinates|Country|Province|     City|
+-------+--------------------+--------+---------+------------------+-------+--------+---------+
|4516516|2017-06-21 00:00:...|43.49347|-80.49123|43.49347,-80.49123|     CA|      ON| Waterloo|
|4519209|2017-06-21 00:00:...| 44.1517| -81.0266|  44.1517,-81.0266|     CA|      ON|  Hanover|
|4518130|2017-06-21 00:00:...| 43.0004| -81.2343|  43.0004,-81.2343|     CA|      ON|   London|
|5368841|2017-06-21 00:00:...| 45.2778| -75.7563|  45.2778,-75.7563|     CA|      ON|   Nepean|
|4521574|2017-06-21 00:00:...| 43.1508| -80.2094|  43.1508,-80.2094|     CA|      ON|Brantford|
|4523455|2017-06-21 00:00:...| 43.0091| -81.1765|  43.0091,-81.1765|     CA|      ON|   London|
|4522231|2017-06-21 00:00:...| 42.4247| -82.1755|  42.4247,-82.1755|     CA|      ON|  Chatham|
|4522376|2017-06-21 00:00:...| 43.4634| 

In [233]:
poi_df.show()

+-----+---------+----------+
|POIID| Latitude| Longitude|
+-----+---------+----------+
| POI1|53.546165|-113.48573|
| POI2|53.546165|-113.48573|
| POI3| 45.52163|-73.566025|
| POI4| 45.22483| -63.23273|
+-----+---------+----------+



#poi_df= hiveContext.sql("SELECT POIID,Latitude,Longitude,CONCAT(Latitude,',',Longitude) AS Coordinates FROM poi_df")
Coordinates_df = poi_df.select(concat(poi_df['Latitude'], lit(','),poi_df['Longitude']).alias('Coordinates'),poi_df['POIID'])
Coordinates_df.show()

poi_df_coords = poi_df.join(Coordinates_df, on = "POIID", how = "inner" )

poi_df_coords.show()

In [234]:
setA_lat = np.array(unique_data_sample.select('Latitude').collect())
setA_lon = np.array(unique_data_sample.select('Longitude').collect())

In [235]:
setB_lat = np.array(poi_df.select('Latitude').collect())
setB_lon = np.array(poi_df.select('Longitude').collect())

In [236]:
setA = np.c_[setA_lat,setA_lon]
setA

array([[  43.49346924,  -80.4912262 ],
       [  44.15169907,  -81.0266037 ],
       [  43.00040054,  -81.23429871],
       ..., 
       [  53.49723053, -113.63462067],
       [  43.77030182,  -79.21649933],
       [  49.78279877,  -94.44110107]])

In [237]:
setB = np.c_[setB_lat,setB_lon]
setB

array([[  53.54616547, -113.48573303],
       [  53.54616547, -113.48573303],
       [  45.52162933,  -73.56602478],
       [  45.22483063,  -63.23273087]])

In [260]:
from scipy.spatial import cKDTree

def closest_pts(setA, setB):
    indx = cKDTree(setB).query(setA,k=1,p=2)[1] # this returns array in position0 and index in position 1
    print(indx[0:100])
    return setB[indx]

In [261]:
c_x = closest_pts(setA, setB)

[2 2 2 2 2 2 2 2 2 2 2 2 2 0 3 3 0 0 2 2 0 0 0 0 0 2 2 2 2 2 2 0 0 2 2 2 2
 2 2 2 2 2 0 2 0 0 0 0 2 2 2 0 2 2 0 2 0 2 2 0 2 2 0 0 0 3 0 0 0 0 0 0 2 2
 0 0 0 0 0 0 0 2 0 0 0 2 0 2 0 0 2 0 0 2 0 0 2 0 2 2]


In [266]:
c_x[0:5]

Unnamed: 0,nearest_lat,nearest_lon
0,45.521629,-73.566025
1,45.521629,-73.566025
2,45.521629,-73.566025
3,45.521629,-73.566025
4,45.521629,-73.566025


In [265]:
c_x.shape

(19999, 2)

In [267]:
columns = ['nearest_lat','nearest_lon']
c_x = pd.DataFrame(c_x,columns=columns)

In [280]:
unique_data_sample_pd = unique_data_sample.toPandas()
nearest_match_pd = nearest_match.toPandas()

In [281]:
unique_data_sample_pd['nearest_lat'] = nearest_match_pd['nearest_lat']
unique_data_sample_pd['nearest_lon'] = nearest_match_pd['nearest_lon']

In [288]:
unique_data_sample_pd[0:30]

Unnamed: 0,ID,TimeSt,Latitude,Longitude,Coordinates,Country,Province,City,nearest_lat,nearest_lon
0,4516516,2017-06-21 00:00:00.143,43.493469,-80.491226,"43.49347,-80.49123",CA,ON,Waterloo,45.521629,-73.566025
1,4519209,2017-06-21 00:00:01.257,44.151699,-81.026604,"44.1517,-81.0266",CA,ON,Hanover,45.521629,-73.566025
2,4518130,2017-06-21 00:00:01.333,43.000401,-81.234299,"43.0004,-81.2343",CA,ON,London,45.521629,-73.566025
3,5368841,2017-06-21 00:00:02.573,45.277802,-75.756302,"45.2778,-75.7563",CA,ON,Nepean,45.521629,-73.566025
4,4521574,2017-06-21 00:00:02.637,43.150799,-80.209396,"43.1508,-80.2094",CA,ON,Brantford,45.521629,-73.566025
5,4523455,2017-06-21 00:00:04.367,43.009102,-81.176498,"43.0091,-81.1765",CA,ON,London,45.521629,-73.566025
6,4522231,2017-06-21 00:00:04.383,42.424702,-82.175499,"42.4247,-82.1755",CA,ON,Chatham,45.521629,-73.566025
7,4522376,2017-06-21 00:00:05.113,43.463402,-80.520103,"43.4634,-80.5201",CA,ON,Waterloo,45.521629,-73.566025
8,4524947,2017-06-21 00:00:06.163,43.430599,-80.487701,"43.4306,-80.4877",CA,ON,Kitchener,45.521629,-73.566025
9,4526697,2017-06-21 00:00:22.950,43.208,-79.965202,"43.208,-79.9652",CA,ON,Ancaster,45.521629,-73.566025


In [283]:
unique_data_sample = hiveContext.createDataFrame(unique_data_sample_pd)

In [285]:
unique_data_sample.show(5)

+-------+--------------------+------------------+------------------+------------------+-------+--------+---------+------------------+------------------+
|     ID|              TimeSt|          Latitude|         Longitude|       Coordinates|Country|Province|     City|       nearest_lat|       nearest_lon|
+-------+--------------------+------------------+------------------+------------------+-------+--------+---------+------------------+------------------+
|4516516|2017-06-21 00:00:...| 43.49346923828125|-80.49122619628906|43.49347,-80.49123|     CA|      ON| Waterloo|45.521629333496094|-73.56602478027344|
|4519209|2017-06-21 00:00:...| 44.15169906616211|-81.02660369873047|  44.1517,-81.0266|     CA|      ON|  Hanover|45.521629333496094|-73.56602478027344|
|4518130|2017-06-21 00:00:...| 43.00040054321289|-81.23429870605469|  43.0004,-81.2343|     CA|      ON|   London|45.521629333496094|-73.56602478027344|
|5368841|2017-06-21 00:00:...|45.277801513671875|-75.75630187988281|  45.2778,-75.