In [1]:
from sedona.spark import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session and register Sedona's spatial support on it.
spark = SparkSession.builder.appName("GeoJSON read").getOrCreate()
sedona = SedonaContext.create(spark)

# 2010 census blocks, stored on S3 as one multi-line GeoJSON document.
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

# One row per GeoJSON feature: explode the top-level `features` array and
# promote each feature's fields (e.g. `properties`, `geometry`) to columns.
blocks_df = (
    sedona.read.format("geojson")
    .option("multiLine", "true")
    .load(geojson_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Flatten the `properties` struct into top-level columns (keeping `geometry`),
# then keep only the blocks whose CITY is "Los Angeles".
flattened_df = (
    blocks_df.select(
        *[
            col(f"properties.{field_name}").alias(field_name)
            for field_name in blocks_df.schema["properties"].dataType.fieldNames()
        ],
        "geometry",
    )
    .drop("properties")
    .drop("type")
    .filter(col("CITY") == "Los Angeles")
)



Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3454,application_1732639283265_3410,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
## Solution using Dataframe
from sedona.spark import *
from pyspark.sql.functions import (
    date_format, to_timestamp, round, format_number, concat, lit, when,
    col, count, sum, regexp_replace, trim,
)
from pyspark.sql.types import FloatType, IntegerType
import time

# NOTE: `round` and `sum` imported above are the PySpark column functions; they shadow
# the Python built-ins for the rest of the notebook session.

spark = SparkSession.builder.appName("Query 2 DF").getOrCreate()
spark.catalog.clearCache()
conf = spark.sparkContext.getConf()

# Record the executor configuration the timing below was measured with.
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

csv_files = [
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
]

crime_data_df = spark.read.csv(csv_files, header=True, inferSchema=True)
income_data = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv",
    header=True,
    inferSchema=True,
)

# Timing starts here, i.e. it excludes the two CSV reads above.
start_time = time.time()

# Strip every non-digit character from the income text and cast it to int;
# only "Zip Code" and the numeric "Estimated_Median_Income" are kept.
income_data_cleaned = (
    income_data
    .withColumn(
        "Estimated_Median_Income",
        regexp_replace(trim(col("Estimated Median Income")), "[^0-9]", "").cast(IntegerType()),
    )
    .drop("Community", "Estimated Median Income")
)

# Census blocks with a positive zip code, population and housing count and a non-blank
# community, aggregated per (zip, community): summed population and housing units plus
# the union of the block geometries.
zip_comm_population = (
    flattened_df
    .select("ZCTA10", "COMM", "POP_2010", "HOUSING10", "geometry")
    .filter(
        (col("ZCTA10") > 0)
        & ((col("POP_2010") > 0) & (col("HOUSING10") > 0))
        & (trim(col("COMM")) != "")
    )
    .groupBy("ZCTA10", "COMM")
    .agg(
        sum("POP_2010").alias("Total_Population_Zip_COMM"),
        sum("HOUSING10").alias("Total_Housing"),
        ST_Union_Aggr("geometry").alias("geometry"),
    )
)

# Attach the zip-level median income, approximate each (zip, community) income as
# housing units * median income, then roll everything up per community.
joined_income_dataset = (
    zip_comm_population
    .join(
        income_data_cleaned,
        zip_comm_population["ZCTA10"] == income_data_cleaned["Zip Code"],
        "inner",
    )
    .drop("ZCTA10", "Zip Code")
    .withColumn("Total income", col("Total_Housing") * col("Estimated_Median_Income"))
    .drop("Estimated_Median_Income")
    .groupBy("COMM")
    .agg(
        sum("Total_Population_Zip_COMM").alias("Total_Population"),
        sum("Total income").alias("Total Income"),
        ST_Union_Aggr("geometry").alias("geometry"),
    )
)

# One point per crime record, built from its LON/LAT columns.
crime_data_with_geometry = crime_data_df.withColumn("geom", ST_Point("LON", "LAT")).select("geom")

# Count the crimes whose point lies within each community's geometry. Crime rate is crimes
# per resident; "Estimated_Median_Income" is total estimated income divided by population.
final_dataset = (
    joined_income_dataset
    .join(
        crime_data_with_geometry,
        ST_Within(crime_data_with_geometry.geom, joined_income_dataset.geometry),
        "inner",
    )
    .groupBy("COMM", "Total_Population", "Total Income")
    .agg(count(col("*")).alias("Total crimes"))
    .withColumn("Crime rate", format_number(col("Total crimes") / col("Total_Population"), 4))
    .withColumn(
        "Estimated_Median_Income",
        concat(lit("$"), format_number(round(col("Total Income") / col("Total_Population")), 0)),
    )
    .drop("Total Income", "Total_Population", "Total crimes")
    .orderBy("COMM")
)
final_dataset.show()

# Stop the clock once show() has executed the query.
end_time = time.time()
print(f"Execution Time : {end_time - start_time} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: None
Executor Memory: 4743M
Executor Cores: 2
+--------------------+----------+-----------------------+
|                COMM|Crime rate|Estimated_Median_Income|
+--------------------+----------+-----------------------+
|     Adams-Normandie|    0.7149|                 $8,791|
|              Alsace|    0.5416|                $11,240|
|Angeles National ...|    0.4118|                $33,080|
|    Angelino Heights|    0.5733|                $18,427|
|              Arleta|    0.4265|                $12,111|
|     Atwater Village|    0.5288|                $28,481|
|       Baldwin Hills|    0.9950|                $17,304|
|             Bel Air|    0.3992|                $63,041|
|       Beverly Crest|    0.3690|                $60,947|
|         Beverlywood|    0.5085|                $29,268|
|       Boyle Heights|    0.6172|                 $8,494|
|           Brentwood|    0.4059|                $60,847|
|           Brookside|    0.8857|                $18,139|
|    C

In [3]:
# Print the physical plan the Catalyst optimizer chose for `final_dataset`
# (a string first argument is treated by PySpark as the explain mode).
final_dataset.explain("formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (29)
+- Sort (28)
   +- Exchange (27)
      +- HashAggregate (26)
         +- Exchange (25)
            +- HashAggregate (24)
               +- Project (23)
                  +- RangeJoin (22)
                     :- Filter (18)
                     :  +- ObjectHashAggregate (17)
                     :     +- Exchange (16)
                     :        +- ObjectHashAggregate (15)
                     :           +- Project (14)
                     :              +- BroadcastHashJoin Inner BuildRight (13)
                     :                 :- ObjectHashAggregate (8)
                     :                 :  +- Exchange (7)
                     :                 :     +- ObjectHashAggregate (6)
                     :                 :        +- Project (5)
                     :                 :           +- Filter (4)
                     :                 :              +- Generate (3)
                     :                 :                 

In [4]:
# Size of the cleaned income table, quoted in the report to explain why the optimizer
# broadcast it (BroadcastHashJoin ... BuildRight in the plan). Spark's automatic broadcast
# decision uses the estimated size in bytes, so these counts are only a proxy.
num_columns = len(income_data_cleaned.columns)
num_rows = income_data_cleaned.count()
print(
    f"Income_data_cleaned number of columns: {num_columns}",
    f"Income_data_cleaned number of entries: {num_rows}",
    sep="\n",
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Income_data_cleaned number of columns: 2
Income_data_cleaned number of entries: 284

In [5]:
from pyspark.sql import Row
from pyspark.sql.functions import col, concat, count, format_number, lit, round, sum
import itertools
import time

# Join-strategy hints to benchmark. Every ordered pair (first join, second join) is timed
# once; trim this list to re-run only a subset.
join_strategies = ["SHUFFLE_REPLICATE_NL", "MERGE", "BROADCAST", "SHUFFLE_HASH"]

# All pairs are written in a single overwrite, so re-running the cell never duplicates rows
# and needs no manual overwrite/append switching. This is the path the results cell reads.
results_path = "s3://groups-bucket-dblab-905418150721/group45/q3/results.json"


def run_query(first_join_strategy, second_join_strategy):
    """Run the Query 3 pipeline with the given join hints and return its wall-clock seconds.

    Rebuilds the DataFrame solution's query: zip-code join with the income table, then a
    spatial ST_Within join with the crime points. ``first_join_strategy`` is applied as a hint
    on both sides of the zip-code join, and ``second_join_strategy`` on both sides of the
    spatial join. Full execution is forced with ``collect()``. Intermediate DataFrames are
    local, so the notebook's ``joined_income_dataset`` / ``final_dataset`` are not overwritten.

    NOTE(review): MERGE and SHUFFLE_HASH require equi-join keys. For the ST_Within join,
    Sedona plans a RangeJoin (see the explain output), so Spark may ignore those hints there.
    Check the plan per pair before interpreting second-join timings.
    """
    spark.catalog.clearCache()
    start = time.time()

    joined_income = (
        zip_comm_population.hint(first_join_strategy)
        .join(
            income_data_cleaned.hint(first_join_strategy),
            zip_comm_population["ZCTA10"] == income_data_cleaned["Zip Code"],
            "inner",
        )
        .drop("ZCTA10", "Zip Code")
        .withColumn("Total income", col("Total_Housing") * col("Estimated_Median_Income"))
        .drop("Estimated_Median_Income")
        .groupBy("COMM")
        .agg(
            sum("Total_Population_Zip_COMM").alias("Total_Population"),
            sum("Total income").alias("Total Income"),
            ST_Union_Aggr("geometry").alias("geometry"),
        )
    )

    crime_points = crime_data_df.withColumn("geom", ST_Point("LON", "LAT")).select("geom")

    result = (
        joined_income.hint(second_join_strategy)
        .join(
            crime_points.hint(second_join_strategy),
            ST_Within(crime_points.geom, joined_income.geometry),
            "inner",
        )
        .groupBy("COMM", "Total_Population", "Total Income")
        .agg(count(col("*")).alias("Total crimes"))
        .withColumn("Crime rate", format_number(col("Total crimes") / col("Total_Population"), 4))
        .withColumn(
            "Estimated_Median_Income",
            concat(lit("$"), format_number(round(col("Total Income") / col("Total_Population")), 0)),
        )
        # Drop the same helper columns as the reference query, so both produce the same output.
        .drop("Total Income", "Total_Population", "Total crimes")
        .orderBy("COMM")
    )

    result.collect()
    return time.time() - start


# Pairs run back to back in one session, so the first pair may absorb warm-up costs
# (e.g. first reads from S3). NOTE(review): consider a discarded warm-up run or repetitions.
results = [
    Row(
        First_Join_Strategy=first,
        Second_Join_Strategy=second,
        Execution_Time=run_query(first, second),
    )
    for first, second in itertools.product(join_strategies, repeat=2)
]

results_df = spark.createDataFrame(results)
results_df.write.mode("overwrite").json(results_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.sql.functions import col, format_number

# Load the benchmark results (one row per pair of join strategies).
results_df = spark.read.json("s3://groups-bucket-dblab-905418150721/group45/q3/results.json")

# Sort on the numeric Execution_Time BEFORE formatting it. format_number returns a string,
# so sorting the formatted column would be lexicographic ("100.000" < "18.962", and
# "9.500" sorts after "26.330"). The projection after the sort keeps the row order.
(
    results_df
    .orderBy(col("Execution_Time"))
    .select(
        col("First_Join_Strategy"),
        col("Second_Join_Strategy"),
        format_number(col("Execution_Time"), 3).alias("Execution_Time"),
    )
    .show(truncate=False)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------+
|First_Join_Strategy |Second_Join_Strategy|Execution_Time|
+--------------------+--------------------+--------------+
|BROADCAST           |SHUFFLE_HASH        |18.962        |
|SHUFFLE_HASH        |MERGE               |20.181        |
|SHUFFLE_REPLICATE_NL|BROADCAST           |20.278        |
|BROADCAST           |MERGE               |20.745        |
|SHUFFLE_REPLICATE_NL|SHUFFLE_REPLICATE_NL|21.029        |
|MERGE               |SHUFFLE_HASH        |21.050        |
|SHUFFLE_HASH        |SHUFFLE_HASH        |21.169        |
|BROADCAST           |SHUFFLE_REPLICATE_NL|21.249        |
|SHUFFLE_REPLICATE_NL|MERGE               |21.499        |
|SHUFFLE_HASH        |BROADCAST           |21.720        |
|SHUFFLE_HASH        |SHUFFLE_REPLICATE_NL|22.807        |
|MERGE               |SHUFFLE_REPLICATE_NL|25.419        |
|BROADCAST           |BROADCAST           |25.811        |
|MERGE               |BROADCAST           |26.330       