In [1]:
# SHUFFLE_HASH Implemantasion for Joins
from sedona.spark import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType, TimestampType
from pyspark.sql.functions import col, broadcast, date_format,  to_timestamp, round, format_number, concat, lit, when, count, sum, regexp_replace, trim
from pyspark.sql import Row
import time
import csv

hint_results = []
join_strategy = "SHUFFLE_HASH"
# Create spark Session
spark = SparkSession.builder \
    .appName("SHUFFLE_HASH Hint") \
    .getOrCreate()
spark.catalog.clearCache()

# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type") \
            .filter(col("CITY") == "Los Angeles")

#Query 3 Initialisation with Dataframe
start_time = time.time()

# Crime dataframe with parquet for quicker esults
crime_df = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group12/main_dataset_parquet")\
                .filter((col("LON") != 0) | (col("LAT") != 0))  \
                .withColumn("point",ST_Point("LON", "LAT")) \
                .select("point")

# Income dataframe with median income
income_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv",
    header=True,
    inferSchema=True
)

res_income = income_df.withColumn("Estimated_Median_Income",
    regexp_replace(trim(col("Estimated Median Income")), "[^0-9]", "").cast(IntegerType())
    ).drop("Community","Estimated Median Income")

# Filter population for total population anf houses
zip_comm_population = flattened_df.select("ZCTA10", "COMM", "POP_2010", "HOUSING10", "geometry") \
    .filter((col("ZCTA10") > 0) & (col("POP_2010") > 0) & (col("HOUSING10") > 0) & (trim(col("COMM")) != "")) \
    .groupBy("ZCTA10", "COMM") \
    .agg(
        sum("POP_2010").alias("Total_Population_Zip_COMM"),
        sum("HOUSING10").alias("Total_Housing"),
        ST_Union_Aggr("geometry").alias("geometry")
    )
    
# Join incoma and cansus using zip coded

joined_income_df = zip_comm_population.hint(join_strategy).join(
   res_income,      
    zip_comm_population["ZCTA10"] == res_income["Zip Code"],
    "inner"  
    ) \
    .drop("ZCTA10","Zip Code") \
    .withColumn("Total income",col("Total_Housing") * col("Estimated_Median_Income")) \
    .drop("Estimated_Median_Income") \
    .groupBy("COMM") \
    .agg(sum("Total_Population_Zip_COMM")
    .alias("Total_Population"),sum("Total income")
    .alias("Total Income"),ST_Union_Aggr("geometry")
    .alias("geometry")) 


results = joined_income_df.hint(join_strategy).join(
                crime_df,    
                ST_Within(crime_df.point, 
                joined_income_df.geometry), "inner") \
                .groupBy("COMM","Total_Population","Total Income") \
                .agg(count(col("*")).alias("Total crimes")) \
                .withColumn("crime_rate",format_number((col("Total crimes")/col("Total_Population")),4)) \
                .withColumn("estimated_median_income",concat(lit("$"),format_number(round(col("Total Income") / col("Total_Population")), 0))) \
                .drop("Total Income","Total_Population","Total crimes") \
                .withColumnRenamed("COMM", "Location") \
                .orderBy(col("crime_rate").desc())
end_time = time.time()
print(f"Execution Time : {end_time - start_time} seconds")
# Store Results
hint_results.append(Row(
        Join_Strategy = join_strategy,
        Execution_Time = end_time - start_time)
                   )

hint_df = spark.createDataFrame(hint_results)

hint_df.write.mode("append").json("s3://groups-bucket-dblab-905418150721/group12/Q3/hint_res.json")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4118,application_1732639283265_4058,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution Time : 6.636886835098267 seconds

In [2]:
# Explain for Shuffle_Hash
results.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (30)
+- Sort (29)
   +- Exchange (28)
      +- HashAggregate (27)
         +- Exchange (26)
            +- HashAggregate (25)
               +- Project (24)
                  +- RangeJoin (23)
                     :- Filter (19)
                     :  +- ObjectHashAggregate (18)
                     :     +- Exchange (17)
                     :        +- ObjectHashAggregate (16)
                     :           +- Project (15)
                     :              +- ShuffledHashJoin Inner BuildLeft (14)
                     :                 :- Exchange (9)
                     :                 :  +- ObjectHashAggregate (8)
                     :                 :     +- Exchange (7)
                     :                 :        +- ObjectHashAggregate (6)
                     :                 :           +- Project (5)
                     :                 :              +- Filter (4)
                     :                 :                 +-