In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, desc, rank, sum as _sum
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Start Spark Session
spark = SparkSession.builder \
    .appName("Query 3") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2568,application_1732639283265_2527,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [1]:
# paths
crime_data_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_2020 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
income_data = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
census_blocks = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

# Load datasets
crime_data = spark.read.csv(crime_data_2010_2019, header=True, inferSchema=True).union(
    spark.read.csv(crime_data_2020, header=True, inferSchema=True))
income_data = spark.read.csv(income_data, header=True, inferSchema=True)
census_data = spark.read.json(census_blocks)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2649,application_1732639283265_2608,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
from sedona.spark import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# # Create spark Session
# spark = SparkSession.builder \
#     .appName("GeoJSON read") \
#     .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)
blocks_data = sedona.read.format("geojson") \
            .option("multiLine", "true").load(census_blocks) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
# Formatting magic
flattened_data = blocks_data.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_data.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")
# Print schema
flattened_data.printSchema()

#flattened_data.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BG10: string (nullable = true)
 |-- BG10FIP10: string (nullable = true)
 |-- BG12: string (nullable = true)
 |-- CB10: string (nullable = true)
 |-- CEN_FIP13: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOM: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- CT10: string (nullable = true)
 |-- CT12: string (nullable = true)
 |-- CTCB10: string (nullable = true)
 |-- HD_2012: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING10: long (nullable = true)
 |-- LA_FIP10: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP_2010: long (nullable = true)
 |-- PUMA10: string (nullable = true)
 |-- SPA_2012: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP_DIST: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: double (nullable = true)
 |-- ShapeSTLength: double (nullable = true)
 |-- ZCTA10: string (nullable = true)
 |-- geometry: geometry (nulla

In [18]:
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.functions import col, sum as _sum, expr
from pyspark.sql.functions import regexp_replace, col

# 1. Normalize and Aggregate Population Data
# Aggregate population data by ZCTA10
population_data = flattened_data.select(
    col("ZCTA10").alias("Zip_Code"),
    col("POP_2010").alias("Population")
).groupBy("Zip_Code").agg(_sum("Population").alias("Total_Population"))

# Show the result to verify
population_data.show(5, truncate=False)

income_data = income_data.withColumnRenamed("Estimated Median Income", "Median_Income") \
                     .withColumnRenamed("Zip Code", "Zip_Code") \
                     .withColumn("Median_Income", regexp_replace(col("Median_Income"), "[$,]", "").cast("double"))

# Show the result to verify
income_data.show(5, truncate=False)

# Aggregate crimes by area name
crime_areas_data = crime_data.groupBy("AREA NAME").agg({"DR_NO": "count"}).withColumnRenamed("count(DR_NO)", "Total_Crimes")

# Show the result to verify
crime_areas_data.show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------------+
|Zip_Code|Total_Population|
+--------+----------------+
|90807   |31481           |
|91326   |33708           |
|90094   |5464            |
|91766   |66902           |
|90026   |67869           |
+--------+----------------+
only showing top 5 rows

+--------+--------------------------------------------------------------------------------------------+-------------+
|Zip_Code|Community                                                                                   |Median_Income|
+--------+--------------------------------------------------------------------------------------------+-------------+
|90001   |Los Angeles (South Los Angeles), Florence-Graham                                            |33887.0      |
|90002   |Los Angeles (Southeast Los Angeles, Watts)                                                  |30413.0      |
|90003   |Los Angeles (South Los Angeles, Southeast Los Angeles)                                      |30805.0      |
|90004   |Los A

In [19]:
#create geometry points for crimes using LAT and LON
crime_data = crime_data.withColumn("geom", ST_Point("LON", "LAT"))

# Aggregate geometries by ZIP code
zip_geometries = flattened_data.groupBy("ZCTA10").agg(
    ST_Union_Aggr("geometry").alias("geometry")
).withColumnRenamed("ZCTA10", "Zip_Code")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# # Join crimes with ZIP code geometries using ST_Within
# crime_with_zip = crime_data.join(
#     zip_geometries,
#     ST_Within(crime_data["geom"], zip_geometries["geometry"]),
#     "inner"
# )

# # Aggregate total crimes by ZIP code
# crime_by_zip = crime_with_zip.groupBy("Zip_Code").agg( count("DR_NO").alias("Total_Crimes") )

# # Join population and income data
# pop_income_data = population_data.join(income_data, on="Zip_Code", how="inner")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Geospatial join: Assign ZIP codes to AREA NAMEs using crime data
zip_to_area = crime_data.select("AREA NAME", "geom").join(
    zip_geometries,
    ST_Within(crime_data["geom"], zip_geometries["geometry"]),
    "inner"
).select("Zip_Code", "AREA NAME").distinct()

# Show the mapping
zip_to_area.show()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+
|Zip_Code| AREA NAME|
+--------+----------+
|   90266|Hollenbeck|
|   90008|  Wilshire|
|   90045|    Harbor|
|   90304|   Pacific|
|   90024|   Pacific|
|   90404|    Newton|
|   90232|  Wilshire|
|   90073|   West LA|
|   90403|  Wilshire|
|   90245|   Olympic|
|   90401|Hollenbeck|
|   90049|   West LA|
|   90302|   Pacific|
|   90405|   Pacific|
|   90405|   Central|
|   90043|   Pacific|
|   90245|   Pacific|
|   90018| Southwest|
|   90291|   Olympic|
|   90005|  Wilshire|
+--------+----------+
only showing top 20 rows

In [24]:
# Combine geospatial join and crime aggregation
zip_to_area_crime = crime_data.join(
    zip_geometries,
    ST_Within(crime_data["geom"], zip_geometries["geometry"]),
    "inner"
).groupBy("Zip_Code", "AREA NAME").agg(
    count("DR_NO").alias("Total_Crimes")
)

# Show results for verification
# zip_to_area_crime.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Join ZIP code-based population and income with crime data
area_population_income = zip_to_area_crime.join(population_data, on="Zip_Code", how="inner") \
    .join(income_data, on="Zip_Code", how="inner")

# Aggregate total population, average income, and crimes by AREA NAME
final_area_data = area_population_income.groupBy("AREA NAME").agg(
    _sum("Total_Population").alias("Total_Population"),
    (_sum("Median_Income") / _sum("Total_Population")).alias("Avg_Income_Per_Person"),
    _sum("Total_Crimes").alias("Total_Crimes")
)

# Show results for verification
# final_area_data.show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Calculate metrics: Average income per person and crimes per person
metrics_by_area = final_area_data.select(
    col("AREA NAME"),
    col("Avg_Income_Per_Person"),
    (col("Total_Crimes") / col("Total_Population")).alias("Crimes_Per_Person")
)

# Show the final result
metrics_by_area.show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------------------+-------------------+
|AREA NAME  |Avg_Income_Per_Person|Crimes_Per_Person  |
+-----------+---------------------+-------------------+
|Harbor     |1.4934860695460446   |0.0566400113534792 |
|N Hollywood|1.3563236944348895   |0.11537073481563351|
|Van Nuys   |1.912627858121802    |0.12540546833246022|
|West Valley|1.7451880929912311   |0.10015834679547672|
|Wilshire   |1.5595552388254463   |0.0876788768698825 |
|Topanga    |1.8691518914045722   |0.11288243232127776|
|Foothill   |1.6931473127605596   |0.09648812181490662|
|Devonshire |1.4373115668557885   |0.10632421404220281|
|Pacific    |1.5193721551315027   |0.0457701780115032 |
|Central    |1.2982976174407537   |0.07162145829528843|
|West LA    |2.518749265028811    |0.10631103260752178|
|Northeast  |1.4655195533922605   |0.08425209474024864|
|Mission    |1.457785754352768    |0.09859339630219838|
|Southeast  |0.978406514446836    |0.07837444443200792|
|Olympic    |1.6105674771799032   |0.10683180560

In [28]:
# Explain the query plan
metrics_by_area.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (44)
+- HashAggregate (43)
   +- Exchange (42)
      +- HashAggregate (41)
         +- Project (40)
            +- BroadcastHashJoin Inner BuildRight (39)
               :- Project (34)
               :  +- SortMergeJoin Inner (33)
               :     :- Sort (23)
               :     :  +- Exchange (22)
               :     :     +- HashAggregate (21)
               :     :        +- Exchange (20)
               :     :           +- HashAggregate (19)
               :     :              +- Project (18)
               :     :                 +- RangeJoin (17)
               :     :                    :- Union (7)
               :     :                    :  :- Project (3)
               :     :                    :  :  +- Filter (2)
               :     :                    :  :     +- Scan csv  (1)
               :     :                    :  +- Project (6)
               :     :                    :     +- Filter (5)
               :     :      

In [29]:
import time

start_time = time.time()

# Broadcast join
broadcast_join = zip_to_area_crime.hint("BROADCAST").join(
    population_data.hint("BROADCAST"),
    on="Zip_Code",
    how="inner"
).join(
    income_data.hint("BROADCAST"),
    on="Zip_Code",
    how="inner"
)

broadcast_join.show(5, truncate=False)
broadcast_join.explain(True)

# End timing
end_time = time.time()
elapsed_time_broadcast = end_time - start_time
print(f"Broadcast Join Time: {elapsed_time_broadcast:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----------+------------+----------------+-------------------------------------------------------------------------------------------------------+-------------+
|Zip_Code|AREA NAME |Total_Crimes|Total_Population|Community                                                                                              |Median_Income|
+--------+----------+------------+----------------+-------------------------------------------------------------------------------------------------------+-------------+
|90723   |Hollenbeck|2           |54099           |Paramount                                                                                              |45792.0      |
|90806   |Harbor    |1           |42399           |Long Beach                                                                                             |44144.0      |
|91105   |Pacific   |4           |11254           |Pasadena                                                                                           

In [30]:
import time

start_time = time.time()

# Merge join
merge_join = zip_to_area_crime.hint("MERGE").join(
    population_data.hint("MERGE"),
    on="Zip_Code",
    how="inner"
).join(
    income_data.hint("MERGE"),
    on="Zip_Code",
    how="inner"
)

merge_join.show(5, truncate=False)
merge_join.explain(True)

# End timing
end_time = time.time()
elapsed_time_merge = end_time - start_time
print(f"Merge Join Time: {elapsed_time_merge:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----------+------------+----------------+------------------------------------------------+-------------+
|Zip_Code|AREA NAME  |Total_Crimes|Total_Population|Community                                       |Median_Income|
+--------+-----------+------------+----------------+------------------------------------------------+-------------+
|90001   |77th Street|9872        |57110           |Los Angeles (South Los Angeles), Florence-Graham|33887.0      |
|90001   |Southwest  |2           |57110           |Los Angeles (South Los Angeles), Florence-Graham|33887.0      |
|90001   |Harbor     |1           |57110           |Los Angeles (South Los Angeles), Florence-Graham|33887.0      |
|90001   |Newton     |1759        |57110           |Los Angeles (South Los Angeles), Florence-Graham|33887.0      |
|90001   |Southeast  |882         |57110           |Los Angeles (South Los Angeles), Florence-Graham|33887.0      |
+--------+-----------+------------+----------------+--------------------

In [31]:
import time

start_time = time.time()

# Shuffle hash join
shuffle_hash_join = zip_to_area_crime.hint("SHUFFLE_HASH").join(
    population_data.hint("SHUFFLE_HASH"),
    on="Zip_Code",
    how="inner"
).join(
    income_data.hint("SHUFFLE_HASH"),
    on="Zip_Code",
    how="inner"
)

shuffle_hash_join.show(5, truncate=False)
shuffle_hash_join.explain(True)

# End timing
end_time = time.time()
elapsed_time_shuffle_hash = end_time - start_time
print(f"Shuffle Hash Join Time: {elapsed_time_shuffle_hash:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------+------------+----------------+------------------------------------------------+-------------+
|Zip_Code|AREA NAME|Total_Crimes|Total_Population|Community                                       |Median_Income|
+--------+---------+------------+----------------+------------------------------------------------+-------------+
|90069   |Hollywood|2371        |20483           |Los Angeles (Hollywood, Melrose), West Hollywood|78979.0      |
|90069   |Olympic  |2           |20483           |Los Angeles (Hollywood, Melrose), West Hollywood|78979.0      |
|90069   |Wilshire |2356        |20483           |Los Angeles (Hollywood, Melrose), West Hollywood|78979.0      |
|90094   |Pacific  |3599        |5464            |Los Angeles (Playa Vista)                       |104367.0     |
|90304   |Pacific  |129         |28210           |Inglewood, Lennox                               |36412.0      |
+--------+---------+------------+----------------+--------------------------------------

In [32]:
import time

start_time = time.time()

# Shuffle replicate nested loop join
shuffle_replicate_join = zip_to_area_crime.hint("SHUFFLE_REPLICATE_NL").join(
    population_data.hint("SHUFFLE_REPLICATE_NL"),
    on="Zip_Code",
    how="inner"
).join(
    income_data.hint("SHUFFLE_REPLICATE_NL"),
    on="Zip_Code",
    how="inner"
)

shuffle_replicate_join.show(5, truncate=False)
shuffle_replicate_join.explain(True)

# End timing
end_time = time.time()
elapsed_time_shuffle_replicate = end_time - start_time
print(f"Shuffle Replicate NL Join Time: {elapsed_time_shuffle_replicate:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----------+------------+----------------+----------------------------------------------------------------+-------------+
|Zip_Code|AREA NAME  |Total_Crimes|Total_Population|Community                                                       |Median_Income|
+--------+-----------+------------+----------------+----------------------------------------------------------------+-------------+
|90003   |77th Street|29994       |66266           |Los Angeles (South Los Angeles, Southeast Los Angeles)          |30805.0      |
|90044   |Southeast  |24003       |89779           |Athens, Los Angeles (South Los Angeles)                         |29206.0      |
|90275   |Hollenbeck |1           |41804           |Palos Verdes Estates, Rancho Palos Verdes, Rolling Hills Estates|118790.0     |
|90044   |77th Street|51293       |89779           |Athens, Los Angeles (South Los Angeles)                         |29206.0      |
|90044   |Newton     |2           |89779           |Athens, Los Angeles (Sou

In [34]:
print(f"Broadcast Join Time: {elapsed_time_broadcast:.2f} seconds")
print(f"Merge Join Time: {elapsed_time_merge:.2f} seconds")
print(f"Suffle Hash Join Time: {elapsed_time_shuffle_hash:.2f} seconds")
print(f"Shuffle Replicate NL Join Time: {elapsed_time_shuffle_replicate:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Broadcast Join Time: 635.54 seconds
Merge Join Time: 433.70 seconds
Suffle Hash Join Time: 565.34 seconds
Shuffle Replicate NL Join Time: 521.14 seconds