# Spatial Joins With Pyspark

## Installations for Pyspark

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or[0m                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
                                                                               Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [Waiting for headers] [Connecting to ppa.launchpadcontent.net (185.125.190.8[0m                                                                               Hit:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.te

In [2]:
import os
import sys

import findspark
findspark.init()
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

## Install Haversine

In [3]:
pip install haversine



## Imports

In [4]:
from pyspark.sql import SparkSession

In [5]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

In [6]:
from pyspark.sql.functions import col

In [7]:
from haversine import haversine

## Create SparkSession

In [8]:
spark = SparkSession.builder.appName('PysparkRestHotels').getOrCreate()

## Read datasets

In [9]:
restaurants_schema = StructType(fields=[StructField('_c0', StringType(), False),
                                        StructField('_c1', StringType(), True),
                                        StructField('_c2', StringType(), True),
                                        StructField('_c3', DoubleType(), True),
                                        StructField('_c4', DoubleType(), True),
                                        StructField('_c5', StringType(), True)
])

In [10]:
hotels_schema = StructType(fields=[StructField('_c0', StringType(), False),
                                        StructField('_c1', StringType(), True),
                                        StructField('_c2', StringType(), True),
                                        StructField('_c3', StringType(), True),
                                        StructField('_c4', DoubleType(), True),
                                        StructField('_c5', DoubleType(), True),
                                        StructField('_c6', StringType(), True)
])

In [11]:
restaurants_df = spark.read \
.schema(restaurants_schema) \
.option('delimiter', '|') \
.csv('restaurants-ver1.txt')

In [12]:
restaurants_df.show(2)

+---+-------------------+---+-----------+-----------------+--------------------+
|_c0|                _c1|_c2|        _c3|              _c4|                 _c5|
+---+-------------------+---+-----------+-----------------+--------------------+
|  1|Daphne's Greek Cafe|3.5|34.19198813|      -118.937563|Greek, Mediterran...|
|  2|Daphne's Greek Cafe|2.5|  37.755557|-122.250360846519|Greek, Mediterranean|
+---+-------------------+---+-----------+-----------------+--------------------+
only showing top 2 rows



In [13]:
restaurants_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: string (nullable = true)



In [14]:
restaurants_df.count()

78981

In [15]:
hotels_df = spark.read \
.schema(hotels_schema) \
.option('delimiter', '|') \
.csv('hotels-ver1.txt')

In [16]:
hotels_df.show(2)

+---+--------------------+---+---+---------+-----------+--------------------+
|_c0|                 _c1|_c2|_c3|      _c4|        _c5|                 _c6|
+---+--------------------+---+---+---------+-----------+--------------------+
|  1|AAE S.F. European...| 50| 20|37.776181|-122.413414|laundry_service,c...|
|  2|Ambassador Motor ...|188| 27|38.959716|-119.945595|cable_tv,secretar...|
+---+--------------------+---+---+---------+-----------+--------------------+
only showing top 2 rows



In [17]:
hotels_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: string (nullable = true)



In [18]:
hotels_df.count()

25463

## Select, Rename Columns and Add New Column

In [19]:
restaurants_selected_df = restaurants_df.select(col('_c0'), col('_c1'), col('_c3'), col('_c4')) \
                                        .withColumnRenamed('_c0', 'Restaurant_Id') \
                                        .withColumnRenamed('_c1', 'Restaurant_name') \
                                        .withColumnRenamed('_c3', 'Restaurant_latitude') \
                                        .withColumnRenamed('_c4', 'Restaurant_longitude') \
                                        .withColumn('Restaurant_latitude_int', col('Restaurant_latitude').cast('int'))

In [20]:
restaurants_selected_df.show(2)

+-------------+-------------------+-------------------+--------------------+-----------------------+
|Restaurant_Id|    Restaurant_name|Restaurant_latitude|Restaurant_longitude|Restaurant_latitude_int|
+-------------+-------------------+-------------------+--------------------+-----------------------+
|            1|Daphne's Greek Cafe|        34.19198813|         -118.937563|                     34|
|            2|Daphne's Greek Cafe|          37.755557|   -122.250360846519|                     37|
+-------------+-------------------+-------------------+--------------------+-----------------------+
only showing top 2 rows



In [21]:
hotels_selected_df = hotels_df.select(col('_c0'), col('_c1'), col('_c4'), col('_c5')) \
                              .withColumnRenamed('_c0', 'Hotel_Id') \
                              .withColumnRenamed('_c1', 'Hotel_name') \
                              .withColumnRenamed('_c4', 'Hotel_latitude') \
                              .withColumnRenamed('_c5', 'Hotel_longitude') \
                              .withColumn('Hotel_latitude_int', col('Hotel_latitude').cast('int'))

In [22]:
hotels_selected_df.show(2)

+--------+--------------------+--------------+---------------+------------------+
|Hotel_Id|          Hotel_name|Hotel_latitude|Hotel_longitude|Hotel_latitude_int|
+--------+--------------------+--------------+---------------+------------------+
|       1|AAE S.F. European...|     37.776181|    -122.413414|                37|
|       2|Ambassador Motor ...|     38.959716|    -119.945595|                38|
+--------+--------------------+--------------+---------------+------------------+
only showing top 2 rows



## Sort the two Dataframes By Latitude
Sort the two dataframes by latitude (default is ascending)

In [23]:
restaurants_sorted_df = restaurants_selected_df.orderBy('Restaurant_latitude').cache()

In [24]:
hotels_sorted_df = hotels_selected_df.orderBy('Hotel_latitude').cache()

## Create New Dataframes Partitioned by Range

In [25]:
restaurants_repartitioned_df = restaurants_sorted_df.repartitionByRange(4, col('Restaurant_latitude'))

In [26]:
hotels_repartitioned_df = hotels_sorted_df.repartitionByRange(4, col('Hotel_latitude'))

In [27]:
print('print the length of each partition for restaurants partitions')
rdd_rests = restaurants_repartitioned_df.rdd
print(rdd_rests.glom().map(len).collect())

print the length of each partition for restaurants partitions
[19798, 19683, 19669, 19831]


In [28]:
print('print the length of each partition for hotels partitions')
rdd_hotels = hotels_repartitioned_df.rdd
print(rdd_hotels.glom().map(len).collect())

print the length of each partition for hotels partitions
[6351, 6365, 6377, 6370]


## Join

In [29]:
joined_df = restaurants_repartitioned_df.join(hotels_repartitioned_df,
                                              restaurants_repartitioned_df['Restaurant_latitude_int'] == hotels_repartitioned_df['Hotel_latitude_int'],
                                              'full')

In [30]:
joined_df.show()

+-------------+--------------------+-------------------+--------------------+-----------------------+--------+--------------------+--------------+---------------+------------------+
|Restaurant_Id|     Restaurant_name|Restaurant_latitude|Restaurant_longitude|Restaurant_latitude_int|Hotel_Id|          Hotel_name|Hotel_latitude|Hotel_longitude|Hotel_latitude_int|
+-------------+--------------------+-------------------+--------------------+-----------------------+--------+--------------------+--------------+---------------+------------------+
|         6071|Gourmet Pie and Cafe|         -42.118331|        -118.0396435|                    -42|    NULL|                NULL|          NULL|           NULL|              NULL|
|        39741|  Brio Tuscan Grille|          26.003874|           -80.33781|                     26|   11656|Greenbriar Beach ...|       26.0002|        -80.117|                26|
|        39741|  Brio Tuscan Grille|          26.003874|           -80.33781|             

In [31]:
joined_df.count()

130094027

## Data Cleaning
Drop the rows with nulls

In [32]:
joined_cleaned_df = joined_df.na.drop()

## Find Haversine Distance

In [33]:
from pyspark.sql.functions import udf

In [34]:
def haver(x,y,w,z):
  return haversine((x,y),(w,z))

In [35]:
udf_hav = udf(haver, DoubleType())

In [36]:
haversine_df = joined_cleaned_df.withColumn('Haversine_distance', udf_hav('Restaurant_latitude', 'Restaurant_longitude','Hotel_latitude', 'Hotel_longitude'))

In [37]:
haversine_df.show()

+-------------+------------------+-------------------+--------------------+-----------------------+--------+--------------------+--------------+---------------+------------------+------------------+
|Restaurant_Id|   Restaurant_name|Restaurant_latitude|Restaurant_longitude|Restaurant_latitude_int|Hotel_Id|          Hotel_name|Hotel_latitude|Hotel_longitude|Hotel_latitude_int|Haversine_distance|
+-------------+------------------+-------------------+--------------------+-----------------------+--------+--------------------+--------------+---------------+------------------+------------------+
|        39741|Brio Tuscan Grille|          26.003874|           -80.33781|                     26|   11656|Greenbriar Beach ...|       26.0002|        -80.117|                26|22.071473191652224|
|        39741|Brio Tuscan Grille|          26.003874|           -80.33781|                     26|   13063|     Richard's Motel|     26.000638|     -80.142609|                26| 19.51161778579138|
|    

## Filter Dataframe With 0.5 or Less Haversine Distance

In [38]:
final_df = haversine_df.select(col('Restaurant_Id'), col('Restaurant_name'), col('Hotel_Id'), col('Hotel_name'), col('Haversine_distance')) \
                       .filter(col('Haversine_distance') < 0.5)

In [39]:
final_df.show()

+-------------+--------------------+--------+--------------------+-------------------+
|Restaurant_Id|     Restaurant_name|Hotel_Id|          Hotel_name| Haversine_distance|
+-------------+--------------------+--------+--------------------+-------------------+
|        39601|Sal's Italian Ris...|   13745|The Grand Palms R...| 0.4534427636635714|
|        34767|       Panda Express|   13745|The Grand Palms R...| 0.4382017571145667|
|        40097|Big Tomato Market...|   13110|Residence Inn by ...| 0.4312599165175942|
|        34261|    Chuck E Cheese's|   13110|Residence Inn by ...|0.44521219966749853|
|        35341|Nami Japanese Res...|   13110|Residence Inn by ...|0.34898717289150877|
|        37333|    New China Buffet|   13110|Residence Inn by ...| 0.3443114434543708|
|        38570|Sushi-Thai On The...|   13366|    Southwinds Motel|0.20873294229953782|
|        38570|Sushi-Thai On The...|   12192|Hollywood by The ...| 0.0472071636986774|
|        38570|Sushi-Thai On The...|   1224