# Query 3

In [1]:
from sedona.spark import *
from pyspark.sql.functions import col, sum, collect_list, regexp_replace, first
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, count, avg
import time

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2455,application_1732639283265_2414,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def measure_join_performance(df, description):
    """Time one full evaluation of ``df`` and print the result.

    Args:
        df: Object exposing a ``count()`` method (in this notebook, a Spark
            DataFrame whose plan contains the join under test). ``count()``
            is the action used to force the plan to execute.
        description: Label printed in front of the measured duration.

    Returns:
        The elapsed wall-clock time in seconds, so callers can compare
        strategies programmatically instead of reading the printed line.
    """
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # if the system clock is adjusted while the job is running.
    start_time = time.perf_counter()
    df.count()  # Trigger the join
    elapsed = time.perf_counter() - start_time
    print(f"{description} took {elapsed:.2f} seconds")
    return elapsed

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Get (or create) the Spark session for this query and wrap it in a Sedona context.
spark = SparkSession.builder.appName("Query 3").getOrCreate()
sedona = SedonaContext.create(spark)

# Crime data comes as two CSV files (2010-2019 and 2020-present); both are
# read with a header row and an inferred schema, then stacked with union().
crime_data_1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True,
)
crime_data_2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True,
)
# NOTE(review): union() matches columns by position, not by name — this
# assumes both files list their columns in the same order; confirm.
crime_data = crime_data_1.union(crime_data_2)

# Supporting tables: income per zip code, police stations, census field descriptions.
la_income = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv",
    header=True,
    inferSchema=True,
)
police_stations = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv",
    header=True,
    inferSchema=True,
)
explanation = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks_fields.csv",
    header=True,
    inferSchema=True,
)

# Census blocks: read the GeoJSON as one multi-line document and turn every
# entry of its "features" array into its own row.
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = (
    sedona.read.format("geojson")
    .option("multiLine", "true")
    .load(geojson_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Promote every field of the nested "properties" struct to a top-level column
# and keep the geometry next to them.
flattened_blocks_df = (
    blocks_df.select(
        *(
            col(f"properties.{name}").alias(name)
            for name in blocks_df.schema["properties"].dataType.fieldNames()
        ),
        "geometry",
    )
    .drop("properties")
    .drop("type")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Drop census blocks whose COMM value is a single space, i.e. blocks without
# a community name. (No population threshold is applied here.)
populated_blocks = flattened_blocks_df.filter(flattened_blocks_df["COMM"] != " ")

# One row per (community, zip code) pair: summed population, summed housing
# units, and the union of all block geometries belonging to that pair.
comm_pop = (
    populated_blocks
    .groupBy("COMM", "ZCTA10")
    .agg(
        sum("POP_2010").alias("total_population"),
        sum("HOUSING10").alias("total_households"),
        ST_Union_Aggr("geometry").alias("geometry_array"),
    )
)

# Strip every non-digit character (e.g. "$" and ",") from the income column
# and store the remaining digits as a double.
la_income_clean = la_income.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[^0-9]", "").cast("double"),
)

# Attach the income rows to the (community, zip code) pairs. A pair matches
# when the zip codes are equal AND the income table's "Community" text
# contains the block's COMM value as a substring.
community_matches = la_income_clean["Community"].contains(comm_pop["COMM"])
zip_code_matches = comm_pop["ZCTA10"] == la_income_clean["Zip Code"]
comm_income = comm_pop.join(
    la_income_clean,
    on=community_matches & zip_code_matches,
    how="inner",
).drop(la_income_clean["Community"])

# Keep only the columns of interest, under friendlier names.
comm_income_first = comm_income.select(
    col("COMM").alias("region"),
    col("ZCTA10").alias("Zip Code"),
    col("total_population"),
    col("Estimated Median Income").alias("median_income"),
    col("total_households"),
)

# Roll the pairs up to region level: total population, and total household
# income estimated as households * median income summed over the pairs.
comm_income_final = (
    comm_income_first
    .groupBy("region")
    .agg(
        sum("total_population").alias("region_total_population_income"),
        sum(col("total_households") * col("median_income")).alias("region_total_household_income"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Crimes Per Person

In [11]:
# Discard records located at exactly (0, 0): a row is kept when at least one
# of LON / LAT is non-zero.
crime_data = crime_data.where((col("LON") != 0) | (col("LAT") != 0))

# Build a point geometry (x = LON, y = LAT) for every remaining crime and
# assign it to the (community, zip code) area that contains it.
crime_data_geom = crime_data.withColumn("geom", ST_Point(col("LON"), col("LAT")))
crime_data_location = crime_data_geom.join(
    comm_pop,
    on=ST_Within(crime_data_geom["geom"], comm_pop["geometry_array"]),
    how="inner",
)

# Count crimes per (community, zip code) pair. Population and households are
# constant within a pair, so first() simply carries them through.
# NOTE(review): because the join above is an inner join, pairs with no matched
# crime do not appear here, so their population is not part of the region
# totals computed below — confirm this is intended.
crime_num = (
    crime_data_location
    .groupBy("COMM", "ZCTA10")
    .agg(
        first("total_population").alias("total_population"),
        first("total_households").alias("total_households"),
        count("*").alias("total_crime_number"),
    )
)

# Roll the pairs up to community level.
region_totals = (
    crime_num
    .groupBy("COMM")
    .agg(
        sum("total_population").alias("region_total_population_crime"),
        sum("total_crime_number").alias("region_total_crime_number"),
    )
)

# Per-person ratios for each side.
income_result = comm_income_final.withColumn(
    "income_per_person",
    col("region_total_household_income") / col("region_total_population_income"),
)
crime_result = region_totals.withColumn(
    "crimes_per_person",
    col("region_total_crime_number") / col("region_total_population_crime"),
)

# Final table: one row per region present on both sides.
result = (
    income_result
    .join(crime_result, on=crime_result["COMM"] == income_result["region"], how="inner")
    .select("region", "income_per_person", "crimes_per_person")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Display the regions from highest to lowest income per person.
result.sort(col("income_per_person").desc()).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Testing 
 For every join in the query, use `.explain()` to find the join strategy chosen by the Catalyst optimizer, then use `.hint()` to force each alternative strategy and compare their run times.

In [12]:
# Each measurement below calls count() on an uncached plan, so it re-executes
# everything upstream of the join as well; the timings are end-to-end.

######################################## 1st Join ###############################

# Use the "explain" function to find out the optimizer's choice on join strategy
comm_income.explain(True)

# Same predicate as the comm_income join, built once so that the variants
# below differ only in the hint they carry.
comm_income_condition = (
    la_income_clean["Community"].contains(comm_pop["COMM"])
    & (comm_pop["ZCTA10"] == la_income_clean["Zip Code"])
)

# Experiment with join strategies
comm_income_broadcast = comm_pop.join(
    la_income_clean.hint("broadcast"), comm_income_condition, "inner"
)
comm_income_merge = comm_pop.join(
    la_income_clean.hint("merge"), comm_income_condition, "inner"
)
comm_income_shuffle_hash = comm_pop.join(
    la_income_clean.hint("shuffle_hash"), comm_income_condition, "inner"
)
comm_income_shuffle_replicate_nl = comm_pop.join(
    la_income_clean.hint("shuffle_replicate_nl"), comm_income_condition, "inner"
)

# Measure their performances
measure_join_performance(comm_income_broadcast, "Broadcast Join")
measure_join_performance(comm_income_merge, "Sort-Merge Join")
measure_join_performance(comm_income_shuffle_hash, "Shuffle Hash Join")
measure_join_performance(comm_income_shuffle_replicate_nl, "Replicated Nested Loop Join")

######################################## 2nd Join ###############################

crime_data_location.explain(True)

# Mirror the join that produces crime_data_location: crimes against the
# per-(community, zip code) geometries in comm_pop. The previous version of
# these experiments joined against the raw census blocks instead, so it timed
# a different join than the one the query actually runs.
# NOTE(review): this is a spatial (non-equi) predicate; the equi-join hints
# ("merge", "shuffle_hash") may not change the physical plan — check the
# explain() output printed for each variant.
crime_location_condition = ST_Within(crime_data_geom["geom"], comm_pop["geometry_array"])

crime_data_location_broadcast = crime_data_geom.join(
    comm_pop.hint("broadcast"), crime_location_condition, "inner"
)
crime_data_location_broadcast.explain(True)

crime_data_location_shuffle_hash = crime_data_geom.join(
    comm_pop.hint("shuffle_hash"), crime_location_condition, "inner"
)
crime_data_location_shuffle_hash.explain(True)

crime_data_location_merge = crime_data_geom.join(
    comm_pop.hint("merge"), crime_location_condition, "inner"
)
crime_data_location_merge.explain(True)

crime_data_location_shuffle_replicate_nl = crime_data_geom.join(
    comm_pop.hint("shuffle_replicate_nl"), crime_location_condition, "inner"
)
crime_data_location_shuffle_replicate_nl.explain(True)

# Measure Performances (each label now names the hint its DataFrame carries;
# the merge / shuffle-hash labels were previously swapped)
measure_join_performance(crime_data_location_broadcast, "Broadcast Join - crime_data_location")
measure_join_performance(crime_data_location_merge, "Sort-Merge Join - crime_data_location")
measure_join_performance(crime_data_location_shuffle_hash, "Shuffle Hash Join - crime_data_location")
measure_join_performance(crime_data_location_shuffle_replicate_nl, "Replicated Nested Loop Join - crime_data_location")


######################################## 3rd Join ###############################
result.explain()

# Same predicate as the join that produces `result`.
result_condition = crime_result["COMM"] == income_result["region"]

# Experiment with join strategies
result_broadcast = income_result.join(
    crime_result.hint("broadcast"), result_condition, "inner"
)
result_shuffle_hash = income_result.join(
    crime_result.hint("shuffle_hash"), result_condition, "inner"
)
result_merge = income_result.join(
    crime_result.hint("merge"), result_condition, "inner"
)
result_shuffle_replicate_nl = income_result.join(
    crime_result.hint("shuffle_replicate_nl"), result_condition, "inner"
)

# Measure Performances (the shuffle-hash line previously re-timed result_merge)
measure_join_performance(result_broadcast, "Broadcast Join - result")
measure_join_performance(result_merge, "Sort-Merge Join - result")
measure_join_performance(result_shuffle_hash, "Shuffle Hash Join - result")
measure_join_performance(result_shuffle_replicate_nl, "Replicated Nested Loop Join - result")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'DataFrameDropColumns [Community#604]
+- Join Inner, (Contains(Community#604, COMM#716) AND (cast(ZCTA10#733 as int) = Zip Code#603))
   :- Aggregate [COMM#716, ZCTA10#733], [COMM#716, ZCTA10#733, sum(POP_2010#725L) AS total_population#1755L, sum(HOUSING10#722L) AS total_households#1757L, st_union_aggr(geometry#703, org.apache.spark.sql.sedona_sql.expressions.ST_Union_Aggr@54c5b8bd, class[value[0]: geometry], class[value[0]: array<geometry>], true, true, 0, 0, None) AS geometry_array#1762]
   :  +- Filter NOT (COMM#716 =  )
   :     +- Project [properties#704.BG10 AS BG10#709, properties#704.BG10FIP10 AS BG10FIP10#710, properties#704.BG12 AS BG12#711, properties#704.CB10 AS CB10#712, properties#704.CEN_FIP13 AS CEN_FIP13#713, properties#704.CITY AS CITY#714, properties#704.CITYCOM AS CITYCOM#715, properties#704.COMM AS COMM#716, properties#704.CT10 AS CT10#717, properties#704.CT12 AS CT12#718, properties#704.CTCB10 AS CTCB10#719, properties#704.HD_2012 AS HD_2

In [51]:
# count() is a Spark action: it executes the plan behind `result` and returns
# the number of rows, which is then reported.
row_count = result.count()
print(f"The number of rows in result is: {row_count}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The number of rows in result is: 163