In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1644,application_1765289937462_1629,pyspark,idle,Link,Link,,
1647,application_1765289937462_1632,pyspark,idle,Link,Link,,
1654,application_1765289937462_1638,pyspark,idle,Link,Link,,
1667,application_1765289937462_1651,pyspark,idle,Link,Link,,
1671,application_1765289937462_1655,pyspark,idle,Link,Link,,
1673,,pyspark,starting,,,,


In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1644,application_1765289937462_1629,pyspark,idle,Link,Link,,
1647,application_1765289937462_1632,pyspark,idle,Link,Link,,
1654,application_1765289937462_1638,pyspark,idle,Link,Link,,
1667,application_1765289937462_1651,pyspark,idle,Link,Link,,
1671,application_1765289937462_1655,pyspark,idle,Link,Link,,
1673,application_1765289937462_1657,pyspark,idle,Link,Link,,
1675,application_1765289937462_1659,pyspark,idle,Link,Link,,


In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "8",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1644,application_1765289937462_1629,pyspark,idle,Link,Link,,
1667,application_1765289937462_1651,pyspark,idle,Link,Link,,
1673,application_1765289937462_1657,pyspark,idle,Link,Link,,
1680,application_1765289937462_1664,pyspark,idle,Link,Link,,
1681,application_1765289937462_1665,pyspark,idle,Link,Link,,
1682,application_1765289937462_1666,pyspark,idle,Link,Link,,
1686,application_1765289937462_1670,pyspark,idle,Link,Link,,
1687,application_1765289937462_1671,pyspark,idle,Link,Link,,


In [2]:
import time
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from sedona.spark import *
import json
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, desc, split, avg, row_number, year, to_timestamp, expr, lit, to_date, regexp_replace

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1688,application_1765289937462_1672,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
CRIMES_PATH = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
CENSUS_PATH = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
INCOME_PATH = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
spark = SparkSession.builder.appName("Query 5").getOrCreate()
sedona = SedonaContext.create(spark)

def load_crime_data(path):
    df = spark.read.csv(path, header=True)
    df = df.select("LAT", "LON", "DATE OCC")
    df = df.withColumn("LAT", F.col("LAT").cast("float")) \
           .withColumn("LON", F.col("LON").cast("float")) \
           .filter((F.col("LAT") != 0) & (F.col("LON") != 0)) \
           .filter(F.col("LAT").isNotNull() & F.col("LON").isNotNull())
  
    df = df.withColumn("ts_occ", F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")) \
           .withColumn("year", F.year("ts_occ")) \
           .filter(F.col("year").isin([2020, 2021]))
    
    return df.withColumn("geom", F.expr("ST_Point(LON, LAT)"))

def load_census_full(path):

    df = sedona.read.format("geojson").option("multiLine", "true").load(path)
    
    df = df.selectExpr("explode(features) as feature") \
           .select(
               F.col("feature.properties.COMM").alias("COMM"),
               F.col("feature.properties.POP20").cast("long").alias("POP20"),
               F.col("feature.properties.HOUSING20").cast("long").alias("HOUSING20"),  # arithmos noikokyriwn
               F.col("feature.properties.ZCTA20").alias("ZCTA20"),
               F.col("feature.geometry").alias("geometry")
           )

    return df.filter(F.col("geometry").isNotNull())

def load_income(path):
    df = spark.read.option("delimiter", ";").option("header", "true").csv(path)
    df = df.select("Estimated Median Income", "Zip Code")
    return df.withColumn("income_cleaned", F.regexp_replace(F.col("Estimated Median Income"), "[$,]", "")) \
             .withColumn("median_income", F.col("income_cleaned").cast("double")) \
             .withColumn("Zip Code", F.trim(F.col("Zip Code")))

crimes = load_crime_data(CRIMES_PATH)
census = load_census_full(CENSUS_PATH)
income = load_income(INCOME_PATH)

start_time = time.time()
# spatial join crimes x census
# Se poia perioxh anhkei kathe egklhma
crimes_geo = crimes.alias("c").join(
    F.broadcast(census.alias("b")),
    F.expr("ST_Contains(b.geometry, c.geom)")
).select(F.col("b.COMM").alias("community"))

# count crimes per area
crime_counts = crimes_geo.groupBy("community").agg(F.count("*").alias("total_crimes"))

# join census x income (Zip Code)
# se kathe block antistoixei to median household income
census_prep = census.withColumn("ZCTA20", F.trim(F.col("ZCTA20")))
census_income = census_prep.join(income, census_prep["ZCTA20"] == income["Zip Code"])

# synoliko eisodhma block
# total block income = median household income * arithmos noikokyriwn 
census_income = census_income.withColumn(
    "block_total_income", 
    F.col("median_income") * F.col("HOUSING20")
)
# ana perioxh
# athroizw pop kai total block income ana perioxh
comm_stats = census_income.groupBy("COMM").agg(
    F.sum("POP20").alias("total_pop"),
    F.sum("block_total_income").alias("total_income_area")
)

# kata kefalhn eisodhma perioxhs = total income perioxhs / total pop perioxhs
comm_stats = comm_stats.withColumn(
    "per_capita_income", 
    F.col("total_income_area") / F.col("total_pop")
)

# join crime counts x comm stats
final_df = crime_counts.join(comm_stats, F.col("community") == F.col("COMM")) \
                       .drop("community")

# ethsia mesh analogia egklhmatwn ana atomo ana perioxh
final_df = final_df.withColumn("crime_rate", (F.col("total_crimes") / 2) / F.col("total_pop"))

final_df = final_df.filter(
    (F.col("total_pop") > 0) & 
    (F.col("per_capita_income").isNotNull()) & 
    (F.col("per_capita_income") > 0)
)

final_df.cache()
print(f"--> Final Rows: {final_df.count()}")

if final_df.count() > 0:
    print("\n--- Final Data Sample (Per Capita) ---")
    final_df.select("COMM", "per_capita_income", "crime_rate").show(5)

    # global corr
    print(f"Global Correlation (Per Capita vs Crime Rate): {final_df.stat.corr('per_capita_income', 'crime_rate')}")

    # top 10 income perioxes
    print("\n--- Top 10 Per Capita Income Areas ---")
    top10 = final_df.orderBy(F.col("per_capita_income").desc()).limit(10)
    top10.select("COMM", "per_capita_income", "crime_rate").show()
    print(f"Top 10 Corr: {top10.stat.corr('per_capita_income', 'crime_rate')}")

    # bottom 10 income perioxes
    print("\n--- Bottom 10 Per Capita Income Areas ---")
    bot10 = final_df.orderBy(F.col("per_capita_income").asc()).limit(10)
    bot10.select("COMM", "per_capita_income", "crime_rate").show()
    print(f"Bottom 10 Corr: {bot10.stat.corr('per_capita_income', 'crime_rate')}")
else:
    print("!!! ERROR: Table is empty. !!!")

end_time = time.time()  

execution_duration = end_time - start_time
print(f"Total Execution Time: {execution_duration} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--> Final Rows: 154

--- Final Data Sample (Per Capita) ---
+--------------------+------------------+--------------------+
|                COMM| per_capita_income|          crime_rate|
+--------------------+------------------+--------------------+
|Rosewood/East Gar...| 16560.82904884319|0.011568123393316195|
|      Toluca Terrace|25786.809278350516|0.017304860088365244|
|        Elysian Park|26141.354852410277| 0.07825440645572308|
|            Longwood|20725.211884284596|0.048605681522022416|
|       Green Meadows| 11848.86150789897| 0.07541859805127235|
+--------------------+------------------+--------------------+
only showing top 5 rows

Global Correlation (Per Capita vs Crime Rate): -0.1676344553502217

--- Top 10 Per Capita Income Areas ---
+-------------------+-----------------+--------------------+
|               COMM|per_capita_income|          crime_rate|
+-------------------+-----------------+--------------------+
|     Marina del Rey| 97983.0745900194|0.002953623699523..

| Category | Correlation Value |
| :--- | :--- |
| Global Correlation | -0.16763445535022178 |
| Top 10 Income Areas | -0.5005562422035678 |
| Bottom 10 Income Areas | 0.20103891349035236 |

| Configuration | Execution Time (s) |
| :--- | :--- |
| 4 cores, 8GB mem | 49.2229700088501 |
| 2 cores, 4GB mem | 54.48193287849426 |
| 1 cores, 2GB mem | 55.66944622993469 |

In [5]:
final_df.explain()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan [total_crimes#222L, COMM#140, total_pop#271L, total_income_area#273, per_capita_income#277, crime_rate#304]
      +- InMemoryRelation [total_crimes#222L, COMM#140, total_pop#271L, total_income_area#273, per_capita_income#277, crime_rate#304], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- AdaptiveSparkPlan isFinalPlan=true
               +- == Final Plan ==
                  *(6) Project [total_crimes#222L, COMM#140, total_pop#271L, total_income_area#273, per_capita_income#277, ((cast(total_crimes#222L as double) / 2.0) / cast(total_pop#271L as double)) AS crime_rate#304]
                  +- *(6) BroadcastHashJoin [community#218], [COMM#140], Inner, BuildRight, false
                     :- *(6) HashAggregate(keys=[community#218], functions=[count(1)], schema specialized)
                     :  +- ShuffleQueryStage 0
                     :     +- Exchange hashpartitioning(community#218, 