In [None]:

import pyspark.sql.functions as F


# Search radius for the hotel/restaurant distance join. It is compared against
# 6371 * acos(...) in the join query further down, i.e. a distance in km.
# Values used for the experiments: 5, 80, 200.
R = 200

# Load both pipe-delimited inputs from HDFS. The files have no header row, so
# Spark names the columns _c0, _c1, ... and infers their types. Only the name
# and the two coordinate columns are kept, and every row is tagged with a
# Label: 'H' for hotels, 'R' for restaurants.
hotels = (
    spark.read.option("delimiter", "|")
    .csv('hdfs://master:9000/user/student/input/hotels-ver1.txt', inferSchema=True, header=False)
    .select(
        F.col("_c1").alias("Name"),
        F.col("_c4").alias("Lat"),
        F.col("_c5").alias("Lon"),
        F.lit("H").alias("Label"),
    )
)
restaurants = (
    spark.read.option("delimiter", "|")
    .csv('hdfs://master:9000/user/student/input/restaurants-ver1.txt', inferSchema=True, header=False)
    .select(
        F.col("_c1").alias("Name"),
        F.col("_c3").alias("Lat"),
        F.col("_c4").alias("Lon"),
        F.lit("R").alias("Label"),
    )
)

# Stack both datasets into one DataFrame (columns: Name, Lat, Lon, Label).
merged_df = hotels.union(restaurants)

# Split the merged data into 4 latitude ranges and record, for every row, the
# id of the range partition it landed in.
df_part = merged_df.repartitionByRange(4, F.col("Lat"))
df_part = df_part.withColumn("Partition", F.spark_partition_id())

# repartitionByRange chooses its range boundaries by sampling, so re-running
# the plan may assign a row to a different partition. Every later query
# (bounds, duplicate selection, final join) re-evaluates df_part, so the
# labelled rows are cached to make them all see the same Partition ids.
# NOTE(review): if cached blocks are lost they are recomputed, so this reduces
# but does not strictly eliminate the risk.
df_part = df_part.cache()

# Debug aid: dump the partition assignment to inspect it.
#df_part.write.csv("part13")

# Horizontal separation, step 1: find the latitude extent of the restaurants
# inside every partition. All bounds come from ONE aggregation, so they describe
# a single consistent evaluation of the partitioning (and cost one job, not six).
df_part.createOrReplaceTempView("df")
lat_bounds = {
    row["Partition"]: (row["min_lat"], row["max_lat"])
    for row in spark.sql(
        "SELECT Partition, MIN(Lat) AS min_lat, MAX(Lat) AS max_lat "
        "FROM df WHERE Label = 'R' GROUP BY Partition"
    ).collect()
}

# A partition without restaurants has no entry; its bounds are None, exactly
# what MIN/MAX over an empty selection would return.
_no_bounds = (None, None)
_, max_lat_A = lat_bounds.get(0, _no_bounds)
min_lat_B, max_lat_B = lat_bounds.get(1, _no_bounds)
min_lat_C, max_lat_C = lat_bounds.get(2, _no_bounds)
min_lat_D, _ = lat_bounds.get(3, _no_bounds)

# Horizontal separation, step 2: pick the hotels that must be copied to another
# partition because a restaurant there may lie within distance R.
#
# R is a distance in km (the join query compares it with 6371 * acos(...)),
# whereas Lat is in degrees. The latitude margin is therefore degrees(R / 6371.0):
# two points at most R km apart on a sphere of radius 6371 km cannot differ by
# more than that in latitude. Adding R itself to Lat made the filter almost
# always true, so nearly every hotel was copied to every partition.
def _border_hotels(src_partition, bound, northward):
    """Return the hotels of one partition that may match restaurants in another.

    src_partition -- id (0-3) of the partition the hotels currently live in.
    bound         -- latitude of the nearest restaurant of the destination
                     partition (its MIN(Lat) when the destination lies at
                     higher latitudes, its MAX(Lat) otherwise), or None when
                     the destination holds no restaurants.
    northward     -- True when the destination holds higher latitudes than the
                     source (higher partition id; assumes the range partitioning
                     is ascending in Lat, which is the default ordering).

    The comparisons are inclusive so that a pair exactly R apart, which the
    join accepts with "<=", is never lost.
    """
    if bound is None:
        # No restaurants in the destination: nothing can match there.
        condition = "1 = 0"
    elif northward:
        condition = "Lat + degrees({} / 6371.0) >= {}".format(R, bound)
    else:
        condition = "Lat - degrees({} / 6371.0) <= {}".format(R, bound)
    return spark.sql(
        "SELECT Name,Lat,Lon,Label FROM df "
        "WHERE Label = 'H' AND Partition = {} AND {}".format(src_partition, condition)
    )


#create duplicates for partition A
df_AB = _border_hotels(0, min_lat_B, True)
df_AC = _border_hotels(0, min_lat_C, True)
df_AD = _border_hotels(0, min_lat_D, True)
#create duplicates for partition B
df_BA = _border_hotels(1, max_lat_A, False)
df_BC = _border_hotels(1, min_lat_C, True)
df_BD = _border_hotels(1, min_lat_D, True)
#create duplicates for partition C
df_CD = _border_hotels(2, min_lat_D, True)
df_CB = _border_hotels(2, max_lat_B, False)
df_CA = _border_hotels(2, max_lat_A, False)
#create duplicates for partition D
df_DC = _border_hotels(3, max_lat_C, False)
df_DB = _border_hotels(3, max_lat_B, False)
df_DA = _border_hotels(3, max_lat_A, False)

# Merge all duplicates that share a destination and tag them with the id of
# that partition. The id is an INTEGER literal so that it has the same type as
# the spark_partition_id() values already stored in df_part.Partition; with a
# string literal ('0') the union below had to coerce the column to a common
# type, and the resulting type depended on Spark's coercion rules.
to_A = df_BA.union(df_CA).union(df_DA).withColumn("Partition", F.lit(0))
to_B = df_AB.union(df_CB).union(df_DB).withColumn("Partition", F.lit(1))
to_C = df_AC.union(df_BC).union(df_DC).withColumn("Partition", F.lit(2))
to_D = df_AD.union(df_BD).union(df_CD).withColumn("Partition", F.lit(3))

# Final dataset: the original rows plus the duplicated border hotels. union()
# matches columns by position; both sides are (Name, Lat, Lon, Label, Partition).
df_part = df_part.union(to_A).union(to_B).union(to_C).union(to_D)

# Redistribute the rows into 4 partitions keyed on the partition id, so that
# rows sharing a Partition value end up together.
df_part = df_part.repartition(4, F.col("Partition"))
#df_part.write.csv("part14")

# Run the join query using the cross join method: every hotel is paired with
# every restaurant of the same partition and kept when their great-circle
# distance (spherical law of cosines, Earth radius 6371 km) is at most R.
df_part.createOrReplaceTempView("df")

# Cosine of the central angle between hotel t1 and restaurant t2.
cos_angle = (
    "cos( radians(t2.lat) ) * cos( radians( t1.lat ) ) * cos( radians( t1.lon ) - radians(t2.lon) )"
    " + sin( radians(t2.lat) ) * sin( radians(t1.lat) )"
)

# Floating-point rounding can push the cosine marginally outside [-1, 1] for
# (nearly) coincident points; acos then yields NaN, NaN <= R is false and the
# pair would be silently dropped. Clamping keeps acos inside its domain.
dfjoin = spark.sql(
    "select t1.Name,t2.Name from df t1 cross join df t2 "
    "where t1.Label='H' and t1.Partition=t2.Partition and t2.Label='R' "
    "and 6371 * acos( least(1.0, greatest(-1.0, {})) ) <= {}".format(cos_angle, R)
)
dfjoin.explain(True)
dfjoin.count()

# Check the performance for R=5, R=80, R=200 (http://127.0.0.1:4040/SQL)

