In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType
from sedona.spark import *

# Initialize Spark session with the Sedona SQL extensions and Kryo serialization.
# NOTE(review): getOrCreate() returns an already-running session when one exists
# (common on managed notebook kernels) — confirm these settings actually take effect.
spark = (
    SparkSession.builder
    .appName("IncomeCriminalityPerCommunity")
    .master("yarn")
    .config("spark.executor.instances", "4")
    .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator")
    .getOrCreate()
)

# Create sedona context. SedonaContext.create() registers Sedona's types and SQL
# functions on the session, so the deprecated SedonaRegistrator.registerAll(spark)
# call that used to precede it is not needed.
sedona = SedonaContext.create(spark)

# Read the census blocks GeoJSON: explode the file's top-level "features" array so
# each feature becomes one row, then expand the feature struct into columns.
census_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
census_raw = (
    sedona.read.format("geojson")
    .option("multiLine", "true")
    .load(census_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Promote every field of the nested "properties" struct to a top-level column of the
# same name and keep the geometry alongside them.
property_names = census_raw.schema["properties"].dataType.fieldNames()
promoted_columns = [col(f"properties.{name}").alias(name) for name in property_names]
census_data = census_raw.select(promoted_columns + ["geometry"]).drop("properties", "type")

# Income table: a CSV whose first row holds the column names.
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
income_data = spark.read.option("header", True).csv(income_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
from pyspark.sql.functions import col, regexp_replace, expr

# Transform census_data: keep only the attributes used downstream, under readable names.
census_transformed = census_data.select(
    col("COMM").alias("community"),
    col("POP_2010").alias("population"),
    col("ZCTA10").alias("zipcode"),
    col("geometry"),
    col("HOUSING10").alias("households"),
    col("CITY").alias("city"),
)

# Transform income_data: strip the "$" and thousands separators from the income text
# and cast it to double, so later arithmetic works on a numeric column instead of
# relying on Spark's implicit string-to-number coercion.
income_transformed = income_data.select(
    regexp_replace(col("Estimated Median Income"), r"[$,]", "")
    .cast("double")
    .alias("household_income"),
    col("Zip Code").alias("zipcode"),
    col("Community").alias("community"),
)
    

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Restrict both tables to the city of Los Angeles.
c = census_transformed.filter(col("city") == "Los Angeles")
i = income_transformed.filter(col("community").startswith("Los Angeles"))

# Count unique zip codes and communities in Census Data
unique_zipcodes_census = c.select("zipcode").distinct().count()
unique_communities_census = c.select("community").distinct().count()
blocks_census = c.count()

# Count unique zip codes and communities in Income Data
unique_zipcodes_income = i.select("zipcode").distinct().count()
unique_communities_income = i.select("community").distinct().count()
rows_income = i.count()

# Print the results (label previously misspelled "Densus")
print(f"Rows (Blocks) in Census Data (LA City): {blocks_census}")
print(f"Unique Zip Codes in Census Data (LA City): {unique_zipcodes_census}")
print(f"Unique Communities in Census Data (LA City): {unique_communities_census}")
print(f"Rows in Income Data (LA City): {rows_income}")
print(f"Unique Zip Codes in Income Data (LA City): {unique_zipcodes_income}")
print(f"Unique Communities in Income Data (LA City): {unique_communities_income}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rows (Blocks) in Densus Data (LA City): 30637
Unique Zip Codes in Census Data (LA City): 135
Unique Communities in Census Data (LA City): 139
Rows in Income Data (LA City): 98
Unique Zip Codes in Income Data (LA City): 98
Unique Communities in Income Data (LA City): 89

In [15]:
# Load Crime CSV Files
crime_raw_1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True
)
crime_raw_2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True
)

# DataFrame.union matches columns by position, not by name. Project the two coordinate
# columns out of each file first so the union cannot silently misalign if the files'
# column layouts differ. The CSVs are read without schema inference, so the
# coordinates are strings and are cast to double explicitly before building the point.
crimes = (
    crime_raw_1.select("LON", "LAT")
    .union(crime_raw_2.select("LON", "LAT"))
    .select(ST_Point(col("LON").cast("double"), col("LAT").cast("double")).alias("point"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Perform the join, aggregation, and calculation in three named stages.

# Stage 1: LA-city census blocks joined to the LA income rows on zip code.
la_blocks = census_transformed.filter(col("city") == "Los Angeles")
la_income = income_transformed.filter(col("community").startswith("Los Angeles"))
blocks_with_income = la_blocks.join(la_income, on="zipcode", how="inner").select(
    census_transformed["zipcode"],
    census_transformed["community"],
    census_transformed["geometry"],
    census_transformed["population"],
    census_transformed["households"],
    income_transformed["household_income"],
)

# Stage 2: one row per community — total population, income per person
# (households * household income, divided by population) and the merged geometry.
community_stats = blocks_with_income.groupBy("community").agg(
    sum("population").alias("total_population"),
    (sum(col("households") * col("household_income")) / sum("population")).alias("average_income"),
    ST_Union_Aggr("geometry").alias("geometry"),
)

# Stage 3: match each crime point to the community geometry containing it, then
# count the matches per community and divide by that community's population.
crimes_in_community = community_stats.join(crimes, ST_Within("point", "geometry"), "inner")
results = crimes_in_community.groupBy("community").agg(
    first("average_income").alias("Average Income"),
    (count("*") / first("total_population")).alias("Average Crimes Per Person"),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Print the formatted physical plan of the income/crime pipeline without executing it.
results.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (28)
+- HashAggregate (27)
   +- Exchange (26)
      +- HashAggregate (25)
         +- Project (24)
            +- RangeJoin (23)
               :- Filter (15)
               :  +- ObjectHashAggregate (14)
               :     +- Exchange (13)
               :        +- ObjectHashAggregate (12)
               :           +- Project (11)
               :              +- BroadcastHashJoin Inner BuildRight (10)
               :                 :- Project (5)
               :                 :  +- Filter (4)
               :                 :     +- Generate (3)
               :                 :        +- Filter (2)
               :                 :           +- Scan geojson  (1)
               :                 +- BroadcastExchange (9)
               :                    +- Project (8)
               :                       +- Filter (7)
               :                          +- Scan csv  (6)
               +- Union (22)
                  :- Proj

In [18]:
# Execute the pipeline and print a preview of the per-community result rows.
results.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------------------+-------------------------+
|         community|    Average Income|Average Crimes Per Person|
+------------------+------------------+-------------------------+
|    Toluca Terrace|20167.531898539586|      0.23059185242121444|
|      Elysian Park|13871.322764382001|       0.8173533320675906|
|          Longwood|13420.052256532066|        0.730166270783848|
|     Green Meadows| 8027.096412895414|       1.2095757025377125|
|  Cadillac-Corning|19572.784696174043|       0.6669167291822956|
|          Mid-city| 21734.64899923286|       0.8071692586651789|
|   Lincoln Heights|10902.134793218598|       0.6226239404058567|
|          Van Nuys|14488.189551145677|       0.9170415838361292|
|    Gramercy Place|14936.698098639128|       1.0759579191197761|
| Faircrest Heights|20908.511762997387|       1.2802788266047052|
|     Boyle Heights|7451.4033262510075|       0.7583217329718905|
|  Lafayette Square|16700.833409821018|       0.9137218907755851|
|     Gran

In [19]:
from pyspark.sql.functions import round
df = results
# Round the columns for presentation. The frame is cached because it is evaluated
# twice below (count, then show); without the cache each action would re-run the
# whole join/aggregation pipeline behind `results`.
rounded_df = (
    df.withColumn("Average Income", round(df["Average Income"], 0))
    .withColumn("Average Crimes Per Person", round(df["Average Crimes Per Person"], 3))
    .cache()
)

# Show all rows with rounded values; count() materializes the cache that show() reuses.
row_count = rounded_df.count()
rounded_df.show(n=row_count, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+--------------+-------------------------+
|community              |Average Income|Average Crimes Per Person|
+-----------------------+--------------+-------------------------+
|Toluca Terrace         |20168.0       |0.231                    |
|Elysian Park           |13871.0       |0.817                    |
|Longwood               |13420.0       |0.73                     |
|Green Meadows          |8027.0        |1.21                     |
|Cadillac-Corning       |19573.0       |0.667                    |
|Mid-city               |21735.0       |0.807                    |
|Lincoln Heights        |10902.0       |0.623                    |
|Van Nuys               |14488.0       |0.917                    |
|Gramercy Place         |14937.0       |1.076                    |
|Faircrest Heights      |20909.0       |1.28                     |
|Boyle Heights          |7451.0        |0.758                    |
|Lafayette Square       |16701.0       |0.914                 

In [20]:
import time
# Benchmark inputs: census blocks and income rows limited to the city of Los Angeles.
c = census_transformed.where(col("city") == "Los Angeles")
i = income_transformed.where(col("community").startswith("Los Angeles"))

def test(df, name, iterations=10):
    t = 0
    for iter in range(iterations):
        # Introduce a dummy transformation to force recomputation
        temp_df = df.withColumn(f"dummy{iter}", df["zipcode"] + iter)
        t0 = time.time()
        temp_df.collect()
        t = t + time.time() - t0
    t = t / iterations
    print(f"{name}: {t:.2f} seconds")

    
# One zipcode join per strategy hint. The "_i" variants place the hint on the income
# table, the "_c" variants on the census blocks.
broadcast_i = c.join(i.hint("BROADCAST"), "zipcode", "inner")
broadcast_c = c.hint("BROADCAST").join(i, "zipcode", "inner")
merge_i = c.join(i.hint("MERGE"), "zipcode", "inner")
merge_c = c.hint("MERGE").join(i, "zipcode", "inner")
shuffle_hash_i = c.join(i.hint("SHUFFLE_HASH"), "zipcode", "inner")
shuffle_hash_c = c.hint("SHUFFLE_HASH").join(i, "zipcode", "inner")
shuffle_replicate_nl_i = c.join(i.hint("SHUFFLE_REPLICATE_NL"), "zipcode", "inner")
shuffle_replicate_nl_c = c.hint("SHUFFLE_REPLICATE_NL").join(i, "zipcode", "inner")

# Time every variant, in the same order as they were defined.
benchmarks = [
    (broadcast_i, "Broadcast Hash Join (Income Table)"),
    (broadcast_c, "Broadcast Hash Join (Census Blocks)"),
    (merge_i, "Sort Merge Join (Income Table)"),
    (merge_c, "Sort Merge Join (Census Blocks)"),
    (shuffle_hash_i, "Shuffle Hash Join (Income Table)"),
    (shuffle_hash_c, "Shuffle Hash Join (Census Blocks)"),
    (shuffle_replicate_nl_i, "Cartesian Product Join (Income Table)"),
    (shuffle_replicate_nl_c, "Cartesian Product Join (Census Blocks)"),
]
for joined, label in benchmarks:
    test(joined, label)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Broadcast Hash Join (Income Table): 12.19 seconds
Broadcast Hash Join (Census Blocks): 11.12 seconds
Sort Merge Join (Income Table): 10.53 seconds
Sort Merge Join (Census Blocks): 11.28 seconds
Shuffle Hash Join (Income Table): 10.37 seconds
Shuffle Hash Join (Census Blocks): 11.51 seconds
Cartesian Product Join (Income Table): 11.03 seconds
Cartesian Product Join (Census Blocks): 10.69 seconds

In [21]:
# Print the formatted physical plan of the join whose census side carries the
# SHUFFLE_REPLICATE_NL hint, to see which join operator was actually chosen.
shuffle_replicate_nl_c.explain("formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
* Project (10)
+- CartesianProduct Inner (9)
   :- * Project (5)
   :  +- * Filter (4)
   :     +- * Generate (3)
   :        +- * Filter (2)
   :           +- Scan geojson  (1)
   +- * Project (8)
      +- * Filter (7)
         +- Scan csv  (6)


(1) Scan geojson 
Output [1]: [features#412]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_2012:bigint,HD_NAME:string,HOUSING10:bigint,LA_FIP10:string,OBJECTID:bigint,POP_2010:bigint,PUMA10:string,SPA_2012:bigint,SPA_NAME:string,SUP_DIST:string,SUP_LABEL:string,ShapeSTArea:double,ShapeSTLength:double,ZCTA10:string>,type:string>>>

(2) Filter [codegen id : 1]
Input [1]: [features#412]
Cond