In [1]:
from pyspark.sql.functions import udf, lit, col, broadcast, count, sum
from math import radians, cos, sin, asin, sqrt
from pyspark.sql.types import FloatType
from pyspark.sql.types import *
import pyspark 
from delta import *
from delta.tables import DeltaTable

# NOTE: a run of this job over 10k input rows took about 9 minutes.

def main():
    """Count nearby businesses for every business and upsert into a Delta table.

    Steps:
      1. Read the business CSV (id, latitude, longitude, stars).
      2. Cross join the data with itself to get every ordered pair.
      3. Flag each pair whose haversine distance is at most DISTANCE km and
         sum the flags per business into ``ProximityCount``.
      4. Write the result as a new Delta table, or merge (upsert) it into the
         existing one keyed on ``Business_id``.

    The cross join pairs every business with itself as well, so a business
    with valid coordinates is included in its own count (distance 0).

    The distance and threshold are built from Spark's native column functions
    rather than Python UDFs, so the pair rows never leave the JVM. Null
    coordinates yield a null distance and are counted as "not nearby" instead
    of failing the job.
    """
    # Local import on purpose: the module-level radians/cos/sin/asin/sqrt come
    # from `math` and only work on Python scalars, not on Spark columns.
    from pyspark.sql import functions as F

    # Constants
    DISTANCE = 2  # km
    EARTH_RADIUS_KM = 6371
    TABLE_PATH = "hdfs:///project/data/business_data/delta_table_proximity_count"
    CSV_PATH = "hdfs:///project/data/business_data/yelp_academic_dataset_business.csv"

    def haversine_km(lat1, lon1, lat2, lon2):
        """Return a Column holding the haversine distance in kilometers.

        All four arguments are Columns of coordinates in degrees. A null in
        any of them gives a null result.
        """
        lat1, lon1, lat2, lon2 = (F.radians(c) for c in (lat1, lon1, lat2, lon2))
        sin_half_dlat = F.sin((lat2 - lat1) / 2)
        sin_half_dlon = F.sin((lon2 - lon1) / 2)
        a = (
            sin_half_dlat * sin_half_dlat
            + F.cos(lat1) * F.cos(lat2) * sin_half_dlon * sin_half_dlon
        )
        # Rounding can push `a` marginally above 1 for near-antipodal points,
        # which is outside asin's domain; clamp it. `when` (unlike `least`)
        # keeps a null `a` null.
        a = F.when(a > 1, F.lit(1.0)).otherwise(a)
        return 2 * EARTH_RADIUS_KM * F.asin(F.sqrt(a))

    # Initiate spark
    builder = (
        pyspark.sql.SparkSession.builder.appName("spark_proximity_count_naive")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.executor.cores", 4)
    )
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    # Create dataframe from csv file
    # NOTE(review): no `header` option is passed, so if the CSV starts with a
    # header line it is read as a data row -- confirm against the file.
    schema = "Business_id STRING, Latitude FLOAT, Longitude FLOAT, Stars FLOAT"
    df = spark.read.csv(CSV_PATH, schema=schema).repartition(200, "Business_id")

    # Combination of all business pairs (columns suffixed 1 / 2 per side)
    left = df.toDF("Business_id1", "Latitude1", "Longitude1", "Stars1")
    right = df.toDF("Business_id2", "Latitude2", "Longitude2", "Stars2")
    df_pairs = left.crossJoin(right)

    # Flag pairs within DISTANCE km; a null distance falls through to 0.
    within_range = haversine_km(
        F.col("Latitude1"), F.col("Longitude1"), F.col("Latitude2"), F.col("Longitude2")
    ) <= DISTANCE
    df_pairs = df_pairs.withColumn("Proximity", F.when(within_range, 1).otherwise(0))
    df_proximity = df_pairs.groupBy("Business_id1").agg(
        F.sum("Proximity").alias("ProximityCount")
    )

    # Attach the count to the original business rows
    df = df.join(
        df_proximity, on=df.Business_id == df_proximity.Business_id1, how="inner"
    ).drop("Business_id1")

    # If no delta table exists, save and exit
    if not DeltaTable.isDeltaTable(spark, TABLE_PATH):
        df.write.format("delta").save(TABLE_PATH)
        return

    # Upsert delta table: matched rows are overwritten, new ids are inserted.
    # The same column mapping serves both the update and the insert clause.
    column_mapping = {
        "Business_id": "new.Business_id",
        "Latitude": "new.Latitude",
        "Longitude": "new.Longitude",
        "Stars": "new.Stars",
        "ProximityCount": "new.ProximityCount",
    }
    delta_table = DeltaTable.forPath(spark, TABLE_PATH)
    (
        delta_table.alias("old")
        .merge(df.alias("new"), "old.Business_id = new.Business_id")
        .whenMatchedUpdate(set=column_mapping)
        .whenNotMatchedInsert(values=column_mapping)
        .execute()
    )


# Run the job only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

:: loading settings :: url = jar:file:/home/ubuntu/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5d528ba5-31af-40fe-b35e-b1dd6839a020;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 231ms :: artifacts dl 9ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0

23/05/04 15:48:53 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/05/04 15:49:02 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/io.delta_delta-core_2.12-2.3.0.jar added multiple times to distributed cache.
23/05/04 15:49:02 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/io.delta_delta-storage-2.3.0.jar added multiple times to distributed cache.
23/05/04 15:49:02 WARN Client: Same path resource file:///home/ubuntu/.ivy2/jars/org.antlr_antlr4-runtime-4.8.jar added multiple times to distributed cache.


ERROR:root:Exception while sending command.                      (15 + 8) / 200]
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
RuntimeError: reentrant call inside <_io.BufferedReader name=65>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart

Py4JError: An error occurred while calling o133.execute

[Stage 9:====>                                                   (16 + 8) / 200]