In [1]:
%%configure -f
{
  "conf": {
    "spark.executor.instances": "2",
    "spark.executor.cores": "4",
    "spark.executor.memory": "8g"
  }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1720,application_1765289937462_1704,pyspark,idle,Link,Link,,
1751,application_1765289937462_1735,pyspark,idle,Link,Link,,
1753,application_1765289937462_1737,pyspark,idle,Link,Link,,
1756,application_1765289937462_1740,pyspark,idle,Link,Link,,
1758,application_1765289937462_1742,pyspark,idle,Link,Link,,
1759,application_1765289937462_1743,pyspark,idle,Link,Link,,


In [1]:
%%configure -f
{
  "conf": {
    "spark.executor.instances": "4",
    "spark.executor.cores": "2",
    "spark.executor.memory": "4g"
  }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1266,application_1765289937462_1256,pyspark,idle,Link,Link,,
1268,application_1765289937462_1258,pyspark,idle,Link,Link,,
1269,application_1765289937462_1259,pyspark,idle,Link,Link,,
1273,application_1765289937462_1263,pyspark,busy,Link,Link,,


In [1]:
%%configure -f
{
  "conf": {
    "spark.executor.instances": "8",
    "spark.executor.cores": "1",
    "spark.executor.memory": "2g"
  }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1266,application_1765289937462_1256,pyspark,idle,Link,Link,,
1268,application_1765289937462_1258,pyspark,idle,Link,Link,,
1269,application_1765289937462_1259,pyspark,idle,Link,Link,,
1273,application_1765289937462_1263,pyspark,idle,Link,Link,,


In [2]:
# Configuration switch check: echo the executor settings reported by the live
# session, falling back to "n/a" when a key is not set.
for _label, _conf_key in (
    ("executors =", "spark.executor.instances"),
    ("cores/exec =", "spark.executor.cores"),
    ("mem/exec   =", "spark.executor.memory"),
):
    print(_label, spark.conf.get(_conf_key, "n/a"))

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1276,application_1765289937462_1266,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

executors = 8
cores/exec = 1
mem/exec   = 2g

In [2]:
import json
import math
import time

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from sedona.register import SedonaRegistrator

# --- Timing helpers ---
SHOW_STEP_TIMES = True   # set to False to silence the per-step timing lines
_T0 = time.time()        # wall-clock mark taken when this cell starts
_tprev = _T0             # wall-clock mark of the most recent lap


def _lap(label):
    """Report the time spent since the previous lap and start a new one.

    When ``SHOW_STEP_TIMES`` is truthy, prints one line of the form
    ``>>> [timing] <label> took <seconds>s``. The module-level ``_tprev``
    mark is reset on every call, whether or not anything was printed.

    Args:
        label: Text identifying the step that just finished.
    """
    global _tprev
    if SHOW_STEP_TIMES:
        elapsed = time.time() - _tprev
        print(f">>> [timing] {label} took {elapsed:.2f}s")
    _tprev = time.time()

# Register Sedona on this Spark session before any of the ST_* SQL
# expressions further down in this cell are used.
SedonaRegistrator.registerAll(spark)
print(">>> Sedona registered OK")


# Analysis switches.
POINT_ORDER = "lonlat"       # "lonlat" (x=lon, y=lat) or "latlon"
SPATIAL_JOIN = "contains"    # 'contains' / 'covers' / 'intersects'

# Input locations; every dataset lives under the same S3 prefix.
_DATA_ROOT = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data"
_CRIME_DIR = _DATA_ROOT + "/LA_Crime_Data"

crime_2010_2019_path = _CRIME_DIR + "/LA_Crime_Data_2010_2019.csv"
crime_2020_plus_path = _CRIME_DIR + "/LA_Crime_Data_2020_2025.csv"
census_geojson_path = _DATA_ROOT + "/LA_Census_Blocks_2020.geojson"
income_path = _DATA_ROOT + "/LA_income_2021.csv"

# Read both crime CSV extracts (first row holds the column names) and stack
# them, matching columns by name.
crimes_2010_2019 = spark.read.csv(crime_2010_2019_path, header=True)
crimes_2020_plus = spark.read.csv(crime_2020_plus_path, header=True)
crimes = crimes_2010_2019.unionByName(crimes_2020_plus)

_crimes_total = crimes.count()
print(">>> crimes total rows:", _crimes_total)
_lap("Load & union crimes (count)")
print(">>> crimes schema:")
crimes.printSchema()

# Keep only the identifier, the occurrence date and the coordinates.
crimes = crimes.select("DR_NO", "DATE OCC", "LAT", "LON")
print(">>> crimes selected cols schema:")
crimes.printSchema()

# Parse the occurrence date string and derive the occurrence year from it.
_date_occ_ts = F.to_timestamp("DATE OCC", "yyyy MMM dd hh:mm:ss a")
crimes = (
    crimes
    .withColumn("DATE_OCC_TS", _date_occ_ts)
    .withColumn("YEAR_OCC", F.year("DATE_OCC_TS"))
)

print(">>> crimes with YEAR_OCC sample:")
crimes.select("DR_NO", "DATE OCC", "DATE_OCC_TS", "YEAR_OCC").show(10, truncate=False)
_lap("Derive YEAR_OCC (show)")

# 2020–2021 (both bounds inclusive)
crimes_2020_2021 = crimes.where(F.col("YEAR_OCC").between(2020, 2021))
_rows_after_year = crimes_2020_2021.count()
print(">>> crimes_2020_2021 rows after year filter:", _rows_after_year)
_lap("Filter 2020–2021 (count)")

# LAT/LON as double; drop rows with a null coordinate and rows where both
# coordinates are exactly 0.0.
_lat_d = F.col("LAT_D")
_lon_d = F.col("LON_D")
crimes_2020_2021 = (
    crimes_2020_2021
    .withColumn("LAT_D", F.col("LAT").cast("double"))
    .withColumn("LON_D", F.col("LON").cast("double"))
    .where(_lat_d.isNotNull() & _lon_d.isNotNull())
    .where(~((_lat_d == 0.0) & (_lon_d == 0.0)))
)

_rows_after_coords = crimes_2020_2021.count()
print(">>> crimes_2020_2021 rows after coord filters:", _rows_after_coords)
crimes_2020_2021.select("DR_NO", "YEAR_OCC", "LAT_D", "LON_D").show(10, truncate=False)
_lap("Clean coords (count+show)")

# Crime geometries (points); the ST_Point argument order follows POINT_ORDER.
_point_args = "LAT_D, LON_D" if POINT_ORDER == "latlon" else "LON_D, LAT_D"
crimes_geom = crimes_2020_2021.selectExpr(
    "DR_NO as crime_id",
    "YEAR_OCC",
    f"ST_Point({_point_args}) as crime_geom",
)

_geom_rows = crimes_geom.count()
print(">>> crimes_geom rows:", _geom_rows)
crimes_geom.show(5, truncate=False)
_lap("Build crimes_geom (count+show)")
crimes_geom.createOrReplaceTempView("crimes_geom")

# Create Blocks: the GeoJSON file is read as one multi-line JSON document.
blocks_raw = spark.read.json(census_geojson_path, multiLine=True)
print(">>> blocks_raw schema:")
blocks_raw.printSchema()
blocks_raw.select("crs", "features", "name", "type").show(1, truncate=True)

# One output row per element of the `features` array.
blocks_features = blocks_raw.select(F.explode("features").alias("feature"))
_n_features = blocks_features.count()
print(">>> blocks_features rows:", _n_features)
_lap("Read census geojson (count)")

# GeoJson Cleaning for Sedona

# Coordinate Cleaning - Target [x,y]
def _parse_pos(v):
    """Parse one coordinate position into ``[x, y]`` floats.

    Accepted inputs:
      * a string such as ``"[x,y]"`` or ``"x,y"`` (surrounding brackets optional),
      * a list/tuple whose first two items are numbers or numeric strings.

    Only the first two components are used; any further ones are ignored.

    Returns:
        ``[x, y]`` as a list of two finite floats, or ``None`` when the input
        is ``None``, has fewer than two components, or either component is
        not a finite number. Booleans are rejected rather than read as 0/1.
    """
    if v is None:
        return None

    if isinstance(v, str):
        text = v.strip()
        if text.startswith("[") and text.endswith("]"):
            text = text[1:-1]
        parts = text.split(",")
        if len(parts) < 2:
            return None
        raw_pair = (parts[0], parts[1])
    elif isinstance(v, (list, tuple)) and len(v) >= 2:
        raw_pair = (v[0], v[1])
    else:
        return None

    xy = []
    for raw in raw_pair:
        # bool is a subclass of int; True/False are not coordinates.
        if isinstance(raw, bool) or not isinstance(raw, (int, float, str)):
            return None
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            return None
        # NaN/Infinity would be written by json.dumps as bare NaN/Infinity
        # tokens, which are not valid JSON.
        if not math.isfinite(value):
            return None
        xy.append(value)
    return xy

# In case of incomplete ring -> First point is also last
def _close_ring_if_needed(r):
    """Return ``r`` closed, i.e. ending on the same point it starts with.

    A ring with at least three points whose last point differs from its first
    gets a copy of the first point appended (as a new list; ``r`` itself is
    not modified). Rings that are already closed, or that have fewer than
    three points, are returned unchanged. In particular an already-closed
    3-point ring such as ``[A, B, A]`` is NOT padded to four points, so
    callers that require ``len(ring) >= 4`` can still reject it.
    """
    if len(r) < 3 or r[0] == r[-1]:
        return r
    return r + [r[0]]

# Keep valid(parsable) points per ring
def _clean_polygon_coords(rings):
    """Clean the ring list of a single GeoJSON Polygon.

    Each ring keeps only the positions that ``_parse_pos`` can parse, is
    closed via ``_close_ring_if_needed``, and is kept only if it ends up with
    at least four positions.

    The first ring of a polygon is its exterior boundary and any later rings
    are holes. If the first ring is unusable the whole polygon is rejected,
    because otherwise the first surviving hole would be promoted to exterior.
    Unusable hole rings are simply skipped.

    Args:
        rings: list/tuple of rings, each a list/tuple of positions.

    Returns:
        A list of cleaned rings, or ``None`` when ``rings`` is not a
        list/tuple, is empty, or its exterior ring is unusable.
    """
    if not isinstance(rings, (list, tuple)):
        return None
    cleaned_rings = []
    for idx, ring in enumerate(rings):
        usable = None
        if isinstance(ring, (list, tuple)):
            parsed = [p for p in (_parse_pos(pos) for pos in ring) if p is not None]
            if len(parsed) >= 3:
                closed = _close_ring_if_needed(parsed)
                if len(closed) >= 4:
                    usable = closed
        if usable is not None:
            cleaned_rings.append(usable)
        elif idx == 0:
            # Exterior ring lost: do not let a hole take its place.
            return None
    return cleaned_rings if cleaned_rings else None

def _clean_coords(gtype, coords):
    """Clean the ``coordinates`` member of a Polygon or MultiPolygon.

    Args:
        gtype: GeoJSON geometry type name.
        coords: the geometry's raw coordinates.

    Returns:
        * ``"Polygon"``: whatever ``_clean_polygon_coords(coords)`` returns.
        * ``"MultiPolygon"``: the list of member polygons that survived
          cleaning, or ``None`` if ``coords`` is not a list/tuple or no
          member survived.
        * any other type: ``None``.
    """
    if gtype == "Polygon":
        return _clean_polygon_coords(coords)
    if gtype != "MultiPolygon" or not isinstance(coords, (list, tuple)):
        return None
    # A MultiPolygon is a list of polygons, each itself a list of rings.
    cleaned_members = (_clean_polygon_coords(member) for member in coords)
    kept = [member for member in cleaned_members if member is not None]
    return kept or None

def fix_geom(geom):
    """Return a cleaned GeoJSON geometry as a JSON string, or ``None``.

    Args:
        geom: a geometry given either as a ``dict`` (read with ``.get``) or
            as any object supporting ``geom["type"]`` /
            ``geom["coordinates"]`` item access.

    Returns:
        ``json.dumps`` of ``{"type": ..., "coordinates": ...}`` holding the
        coordinates produced by ``_clean_coords``, or ``None`` when ``geom``
        is ``None`` or cleaning yields nothing.
    """
    if geom is None:
        return None
    if isinstance(geom, dict):
        gtype, coords = geom.get("type"), geom.get("coordinates")
    else:
        gtype, coords = geom["type"], geom["coordinates"]
    cleaned = _clean_coords(gtype, coords)
    return None if cleaned is None else json.dumps({"type": gtype, "coordinates": cleaned})

# Wrap fix_geom as a string-returning UDF and keep only the features for
# which it produced a geometry string.
fix_geom_udf = F.udf(fix_geom, returnType=StringType())

_geom_json_col = fix_geom_udf(F.col("feature.geometry"))
blocks_with_geom_json = (
    blocks_features
    .withColumn("geom_json_str", _geom_json_col)
    .where(F.col("geom_json_str").isNotNull())
)

print(">>> sample geom_json_str (fixed):")
blocks_with_geom_json.select("geom_json_str").show(3, truncate=True)

# Project the block attributes used downstream and build the geometry column
# from the cleaned GeoJSON string.
_block_columns = (
    F.col("feature.properties.COMM").alias("COMM"),
    F.col("feature.properties.ZCTA20").cast("string").alias("ZCTA20"),
    F.col("feature.properties.POP20").cast("double").alias("POP20"),
    F.col("feature.properties.HOUSING20").cast("double").alias("HOUSING20"),
    F.expr("ST_MakeValid(ST_GeomFromGeoJSON(geom_json_str))").alias("geom"),
)
_blocks_projected = (
    blocks_with_geom_json
    .select(*_block_columns)
    .withColumn("COMM", F.trim(F.col("COMM")))
)

# Keep blocks with a non-blank COMM, both counts present, and a non-null,
# non-empty geometry; cache because the result is reused several times below.
blocks = (
    _blocks_projected
    .where(F.col("COMM").isNotNull() & (F.col("COMM") != ""))
    .where(F.col("POP20").isNotNull() & F.col("HOUSING20").isNotNull())
    .where(F.col("geom").isNotNull())
    .where(~F.expr("ST_IsEmpty(geom)"))
    .cache()
)

_n_blocks = blocks.count()
print(">>> blocks rows after filters:", _n_blocks)
_lap("Build blocks (count)")
blocks.printSchema()
blocks.createOrReplaceTempView("blocks_geom")

# QC: count blocks by geometry validity / emptiness.
_blocks_qc = blocks.selectExpr("ST_IsValid(geom) AS is_valid", "ST_IsEmpty(geom) AS is_empty")
_blocks_qc.groupBy("is_valid", "is_empty").count().show()
_lap("QC blocks (show)")

# Per ZIP Income: semicolon-delimited CSV with a header row.
income_raw = spark.read.csv(income_path, header=True, sep=";")

print(">>> income schema (raw):")
income_raw.printSchema()
income_raw.show(10, truncate=False)

# Strip "$" and "," from the income text before casting it to double; rows
# whose income does not cast are dropped. ZIP_norm keeps only the digits.
_income_as_double = F.regexp_replace("EstimatedMedianIncome_raw", "[$,]", "").cast("double")
income = (
    income_raw
    .select(
        F.trim(F.col("Zip Code")).alias("ZIP"),
        F.col("Community").alias("Community_name"),
        F.col("Estimated Median Income").alias("EstimatedMedianIncome_raw"),
    )
    .withColumn("median_income_household", _income_as_double)
    .where(F.col("median_income_household").isNotNull())
    .withColumn("ZIP_norm", F.regexp_replace(F.col("ZIP"), "[^0-9]", ""))
)

print(">>> income cleaned schema:")
income.printSchema()
income.select("ZIP", "ZIP_norm", "median_income_household").show(10, truncate=False)
_lap("Prepare income (show)")


# Income per Community(Blocks) - Join

# Reduce each block's ZCTA to digits only and drop blocks left without one.
blocks_norm = (blocks
    .withColumn("ZCTA20_norm", F.regexp_replace(F.trim(F.col("ZCTA20")), "[^0-9]", ""))
    .filter(F.col("ZCTA20_norm").isNotNull() & (F.col("ZCTA20_norm") != ""))
)

income_for_join = income.select("ZIP_norm", "median_income_household")

# Left join: every block is kept; blocks whose ZCTA has no income row get a
# NULL median_income_household.
blocks_income = blocks_norm.join(
    income_for_join,
    blocks_norm["ZCTA20_norm"] == income_for_join["ZIP_norm"],
    "left"
)

# NOTE(review): if the second count exceeds the first, some ZIP_norm value
# occurs more than once in the income table and blocks are being duplicated.
print(">>> blocks rows (with ZCTA20_norm):", blocks_norm.count())
print(">>> blocks_income rows (after left join):", blocks_income.count()); _lap("Join ZCTA ⟷ ZIP (counts)")
blocks_income.select("COMM","ZCTA20","ZCTA20_norm","median_income_household").show(10, truncate=False)

# (A stray "-" line stood here and made the cell a SyntaxError.)
print("\n== Spark plan: ZCTA ⟷ ZIP (blocks_norm ⋈ income_for_join) ==")
blocks_income.explain(mode="formatted"); _lap("Explain ZCTA ⟷ ZIP")


# Aggregation per COMM
#
# blocks_income comes from a LEFT join, so median_income_household is NULL for
# blocks whose ZCTA has no income row. SUM() skips NULLs, so the income total
# only covers blocks with a known income. The income ratios therefore use
# denominators restricted to those same blocks; dividing by the totals over
# all blocks would bias both figures downward. total_pop / total_households
# still cover every block and are what the crime-rate KPIs divide by.
_median_income = F.col("median_income_household")
_income_known = _median_income.isNotNull()
_block_income_total = F.col("HOUSING20") * _median_income

blocks_by_comm = (blocks_income
    .groupBy("COMM")
    .agg(
        F.sum("POP20").alias("total_pop"),
        F.sum("HOUSING20").alias("total_households"),
        F.sum(_block_income_total).alias("total_household_income"),
        F.sum(F.when(_income_known, F.col("HOUSING20"))).alias("_households_with_income"),
        F.sum(F.when(_income_known, F.col("POP20"))).alias("_pop_with_income"),
    )
    # Guard the divisions so a zero/NULL denominator yields NULL.
    .withColumn(
        "mean_household_income_w",
        F.when(F.col("_households_with_income") > 0,
               F.col("total_household_income") / F.col("_households_with_income"))
    )
    .withColumn(
        "per_capita_income",
        F.when(F.col("_pop_with_income") > 0,
               F.col("total_household_income") / F.col("_pop_with_income"))
    )
    # Same output columns, in the same order, as before.
    .select(
        "COMM", "total_pop", "total_households",
        "mean_household_income_w", "total_household_income", "per_capita_income",
    )
)

print(">>> blocks_by_comm rows:", blocks_by_comm.count()); _lap("Aggregate by COMM (count)")
blocks_by_comm.select("COMM","total_pop","total_households","mean_household_income_w","per_capita_income") \
              .orderBy(F.col("per_capita_income").desc_nulls_last()) \
              .show(20, truncate=False)

# Crimes Per Community - Join
print(">>> Running spatial join ...", SPATIAL_JOIN)

# Map the configured mode to its predicate; any value other than
# 'covers' / 'contains' falls back to ST_Intersects.
_join_predicate = {
    "covers": "ST_Covers",
    "contains": "ST_Contains",
}.get(SPATIAL_JOIN.lower(), "ST_Intersects")

join_sql = f"""
        SELECT c.crime_id, c.YEAR_OCC, b.COMM
        FROM crimes_geom c
        JOIN blocks_geom b ON {_join_predicate}(b.geom, c.crime_geom)
    """

crimes_with_comm_raw = spark.sql(join_sql)


print("\n== Spark plan: Spatial join (crimes_geom ⋈ blocks_geom) ==")
crimes_with_comm_raw.explain(mode="formatted")
_lap("Explain spatial join")

# Tie-break (one COMM per crime): when a crime matched several blocks, keep
# the row whose COMM sorts first.
w = Window.partitionBy("crime_id").orderBy(F.col("COMM").asc())
_ranked_matches = crimes_with_comm_raw.withColumn("rk", F.row_number().over(w))
crimes_with_comm = _ranked_matches.where(F.col("rk") == 1).drop("rk")

_n_assigned = crimes_with_comm.count()
print(">>> crimes_with_comm rows:", _n_assigned)
_lap("Tie-break (count)")
crimes_with_comm.show(20, truncate=False)


print("\n== Spark plan: After tie-break (row_number / filter rk==1) ==")
crimes_with_comm.explain(mode="formatted")
_lap("Explain tie-break")

# Crimes Per Community
crime_counts_by_comm = (
    crimes_with_comm
    .groupBy("COMM")
    .agg(F.count("*").alias("crime_count_2020_2021"))
)
_n_comm_with_crimes = crime_counts_by_comm.count()
print(">>> crime_counts_by_comm rows:", _n_comm_with_crimes)
_lap("Crime counts per COMM (count)")
crime_counts_by_comm.orderBy(F.col("crime_count_2020_2021").desc()).show(20, truncate=False)

# Final Stat Blocks: every COMM from the income aggregation is kept; a COMM
# with no matched crime gets a count of 0.
_stats_joined = blocks_by_comm.join(crime_counts_by_comm, on="COMM", how="left")
area_stats = _stats_joined.fillna({"crime_count_2020_2021": 0})

_n_area_stats = area_stats.count()
print(">>> area_stats rows:", _n_area_stats)
_lap("Join area_stats (count)")
(
    area_stats
    .select("COMM", "per_capita_income", "total_pop", "crime_count_2020_2021")
    .orderBy(F.col("per_capita_income").desc_nulls_last())
    .show(30, truncate=False)
)

# KPI: crimes per person and per 1k people, per year. The count is divided by
# 2.0 * total_pop (the filter above keeps the two years 2020 and 2021); both
# KPIs are NULL when total_pop is not positive.
_has_population = F.col("total_pop") > 0
_null_double = F.lit(None).cast("double")
_crimes = F.col("crime_count_2020_2021")
_two_year_pop = 2.0 * F.col("total_pop")

area_stats = (
    area_stats
    .withColumn(
        "avg_annual_crime_per_person",
        F.when(_has_population, _crimes / _two_year_pop).otherwise(_null_double),
    )
    .withColumn(
        "crime_per_1k_per_year",
        F.when(_has_population, 1000.0 * _crimes / _two_year_pop).otherwise(_null_double),
    )
    .cache()
)

print(">>> area_stats with avg_annual_crime_per_person (sample):")
(
    area_stats
    .select("COMM", "per_capita_income", "crime_per_1k_per_year",
            "avg_annual_crime_per_person", "total_pop", "crime_count_2020_2021")
    .orderBy("COMM")
    .show(30, truncate=False)
)
_lap("Compute KPIs (show)")

# ----- Pearson (overall) -----
# Rows with a NULL in either column are dropped before correlating.
_overall_pairs = area_stats.select("per_capita_income", "avg_annual_crime_per_person").dropna()
overall_corr = _overall_pairs.stat.corr("per_capita_income", "avg_annual_crime_per_person")
print("Overall correlation (all COMM):", overall_corr)
_lap("Overall correlation")

# ----- Spearman (overall) -----
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# Correlation.corr works on a single vector column, so the two KPIs are
# assembled into "features" first; entry [0, 1] of the resulting matrix is
# the coefficient between them.
_vec = VectorAssembler(
    inputCols=["per_capita_income", "avg_annual_crime_per_person"],
    outputCol="features",
)
_overall_vec = _vec.transform(_overall_pairs).select("features")
_overall_matrix = Correlation.corr(_overall_vec, "features", method="spearman").head()[0]
_overall_spearman = _overall_matrix.toArray()[0, 1]
print("Overall correlation (all COMM) [Spearman]:", _overall_spearman)

# Ten richest / ten poorest COMM by per-capita income (NULL incomes sort last
# in both orderings).
_KPI_PAIR = ("per_capita_income", "avg_annual_crime_per_person")
_REPORT_COLS = (
    "COMM", "per_capita_income", "crime_per_1k_per_year",
    "avg_annual_crime_per_person", "total_pop", "crime_count_2020_2021",
)

_income_col = F.col("per_capita_income")
top10 = area_stats.orderBy(_income_col.desc_nulls_last()).limit(10)
bottom10 = area_stats.orderBy(_income_col.asc_nulls_last()).limit(10)

print(">>> top10 rows:", top10.count())
print(">>> bottom10 rows:", bottom10.count())
_lap("Top/Bottom selection (counts)")

# ----- Pearson (Top-10 / Bottom-10) -----
top10_corr = top10.select(*_KPI_PAIR).dropna().stat.corr(*_KPI_PAIR)
bottom10_corr = bottom10.select(*_KPI_PAIR).dropna().stat.corr(*_KPI_PAIR)

print("Correlation (Top-10 richest COMM):", top10_corr)
print("Correlation (Bottom-10 poorest COMM):", bottom10_corr)
_lap("Top/Bottom correlations")

# ----- Spearman (Top-10 / Bottom-10) -----
_top10_vec = _vec.transform(top10.select(*_KPI_PAIR).dropna()).select("features")
_bottom10_vec = _vec.transform(bottom10.select(*_KPI_PAIR).dropna()).select("features")

_top10_matrix = Correlation.corr(_top10_vec, "features", method="spearman").head()[0]
_top10_spearman = _top10_matrix.toArray()[0, 1]
_bottom10_matrix = Correlation.corr(_bottom10_vec, "features", method="spearman").head()[0]
_bottom10_spearman = _bottom10_matrix.toArray()[0, 1]

print("Correlation (Top-10 richest COMM) [Spearman]:", _top10_spearman)
print("Correlation (Bottom-10 poorest COMM) [Spearman]:", _bottom10_spearman)

print("\nTop-10 COMM by per_capita_income:")
top10.select(*_REPORT_COLS).show(truncate=False)
print("\nBottom-10 COMM by per_capita_income:")
bottom10.select(*_REPORT_COLS).show(truncate=False)
_lap("Final shows")


# Wall-clock time since the timing helpers were initialised.
TOTAL_SEC = time.time() - _T0
print(f"\n>>> [timing] TOTAL EXECUTION TIME: {TOTAL_SEC:.2f}s")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1760,application_1765289937462_1744,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

>>> Sedona registered OK
>>> crimes total rows: 3138128
>>> [timing] Load & union crimes (count) took 14.03s
>>> crimes schema:
root
 |-- DR_NO: string (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: string (nullable = true)
 |-- AREA: string (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: string (nullable = true)
 |-- Part 1-2: string (nullable = true)
 |-- Crm Cd: string (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: string (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: string (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: string (nullable = true)
 |-- Crm Cd