# Load Datasets and Check Schema

In [1]:
from pyspark.sql import SparkSession
from sedona.spark import *
from pyspark.sql.functions import col, sum as spark_sum, regexp_replace, round, count, broadcast, trim
import time

# Build (or attach to) the SparkSession used by every query in this notebook.
# NOTE(review): the cell output shows a Livy-managed session where `spark` already
# exists; getOrCreate() then returns that session, so some settings below may not apply — confirm.
session_settings = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.maxExecutors": "10",
    "spark.executor.memory": "4g",
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "s3://groups-bucket-dblab-905418150721/group7/logs/",
}
session_builder = SparkSession.builder.appName("Query3")
for setting_key, setting_value in session_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Register Sedona's spatial types and functions on the session
sedona = SedonaContext.create(spark)

# Start the timer that all later cells report against
start_time = time.time()

# 2010 Census Blocks: read the whole FeatureCollection as one JSON document,
# then emit one row per feature
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
raw_geojson_df = sedona.read.format("geojson").option("multiLine", "true").load(geojson_path)
blocks_df = raw_geojson_df.selectExpr("explode(features) as features").select("features.*")

# Show the inferred GeoJSON schema
print("Schema of GeoJSON data:")
blocks_df.printSchema()

# Promote every field of the `properties` struct to a top-level column and keep the geometry.
# NOTE(review): the select keeps only property fields and geometry, so the two drops
# look like no-ops — confirm before removing them.
property_names = blocks_df.schema["properties"].dataType.fieldNames()
property_columns = [col(f"properties.{name}").alias(name) for name in property_names]
flattened_df = blocks_df.select(property_columns + ["geometry"]).drop("properties").drop("type")

# Median household income per ZIP code (only the two columns the query needs)
income_data_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
income_df = (
    spark.read.csv(income_data_path, header=True, inferSchema=True)
    .select("Zip Code", "Estimated Median Income")
)

# Crime records: 2010-2019 and 2020-present, read with identical options
crime_data_path_1 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_path_2 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
data_1, data_2 = [
    spark.read.csv(crime_path, header=True, inferSchema=True)
    for crime_path in (crime_data_path_1, crime_data_path_2)
]

# Report preparation time. inferSchema=True makes Spark scan the CSVs to infer column types,
# so this includes more than metadata reads.
elapsed_time = time.time() - start_time
print(f"Dataset loading time: {elapsed_time:.4f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3302,application_1732639283265_3258,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Schema of GeoJSON data:
root
 |-- geometry: geometry (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- BG10: string (nullable = true)
 |    |-- BG10FIP10: string (nullable = true)
 |    |-- BG12: string (nullable = true)
 |    |-- CB10: string (nullable = true)
 |    |-- CEN_FIP13: string (nullable = true)
 |    |-- CITY: string (nullable = true)
 |    |-- CITYCOM: string (nullable = true)
 |    |-- COMM: string (nullable = true)
 |    |-- CT10: string (nullable = true)
 |    |-- CT12: string (nullable = true)
 |    |-- CTCB10: string (nullable = true)
 |    |-- HD_2012: long (nullable = true)
 |    |-- HD_NAME: string (nullable = true)
 |    |-- HOUSING10: long (nullable = true)
 |    |-- LA_FIP10: string (nullable = true)
 |    |-- OBJECTID: long (nullable = true)
 |    |-- POP_2010: long (nullable = true)
 |    |-- PUMA10: string (nullable = true)
 |    |-- SPA_2012: long (nullable = true)
 |    |-- SPA_NAME: string (nullable = true)
 |    |-- SUP_DIST: string (n

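# Calculate Median Income per Person

For each community (COMM): Σ(estimated median household income of the ZIP code × housing units) ÷ total 2010 population.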

In [2]:
# Clean the census data: keep Los Angeles city blocks that have a usable ZIP code (ZCTA10)
# and a community name (COMM).
la_census_df = flattened_df.filter(col("CITY") == "Los Angeles").select("ZCTA10", "HOUSING10", "POP_2010", "COMM")
# trim() rejects empty and whitespace-only ZCTA10 values, not only a single " ".
# A null ZCTA10 makes the comparison null, so those rows are dropped too.
la_census_cleaned = la_census_df.filter(trim(col("ZCTA10")) != "")
la_census_cleaned = la_census_cleaned.filter(col("COMM").isNotNull())

# Group by ZCTA10 and COMM and aggregate Housing and Population
merged_df = la_census_cleaned.groupBy("ZCTA10", "COMM").agg(
    spark_sum("HOUSING10").alias("Total Housing"),
    spark_sum("POP_2010").alias("Total Population"))

# In a LEFT OUTER join only the right side can be the broadcast build side, so the
# broadcast hint goes on the small income table. It is shipped whole to every executor,
# so repartitioning it first would only add a shuffle.
income_for_join = broadcast(income_df)

# Join census aggregates with income. ZCTA10 is a string and "Zip Code" is inferred as
# int; Spark casts ZCTA10 to int for the comparison (see the plan below).
merged_df_with_income = merged_df.join(
    income_for_join,
    merged_df["ZCTA10"] == income_for_join["Zip Code"],
    "left").drop("Zip Code")

# Show the execution plan
print("Execution plan for Census - Income Join:")
merged_df_with_income.explain()

# Cleanup Estimated Median Income: remove $ and commas, convert to double, and weight
# by the number of housing units in the ZIP code / COMM group.
merged_df_with_income = merged_df_with_income.withColumn(
    "Total Income",
    regexp_replace(col("Estimated Median Income"), r"[$,]", "").cast("double") * col("Total Housing"))

# Group by COMM and calculate total income and population.
# NOTE(review): a ZIP code with no match in the income table yields a null Total Income,
# which sum() skips, while its population still counts toward Total Population — confirm
# this is intended.
final_result = merged_df_with_income.groupBy("COMM").agg(
    spark_sum("Total Income").alias("Total Income"),
    spark_sum("Total Population").alias("Total Population"))

# Income per person for each COMM (null when Total Population is 0)
final_result = final_result.withColumn("Median Income Per Person", col("Total Income") / col("Total Population"))

# Round the values to 5 decimals
final_result = final_result.withColumn(
    "Total Income", round(col("Total Income"), 5)
    ).withColumn("Median Income Per Person", round(col("Median Income Per Person"), 5))

# Show results (this action triggers the computation being timed)
final_result.show(truncate=False)

# Print half query timer
elapsed_time = time.time() - start_time
print(f"Median Income per person time: {elapsed_time:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution plan for Census - Income Join:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [ZCTA10#66, COMM#49, Total Housing#354L, Total Population#356L, Estimated Median Income#190]
   +- BroadcastHashJoin [cast(ZCTA10#66 as int)], [Zip Code#188], LeftOuter, BuildRight, false
      :- HashAggregate(keys=[ZCTA10#66, COMM#49], functions=[sum(HOUSING10#55L), sum(POP_2010#58L)], schema specialized)
      :  +- Exchange hashpartitioning(ZCTA10#66, COMM#49, 200), ENSURE_REQUIREMENTS, [plan_id=149]
      :     +- HashAggregate(keys=[ZCTA10#66, COMM#49], functions=[partial_sum(HOUSING10#55L), partial_sum(POP_2010#58L)], schema specialized)
      :        +- Project [features#33.properties.ZCTA10 AS ZCTA10#66, features#33.properties.HOUSING10 AS HOUSING10#55L, features#33.properties.POP_2010 AS POP_2010#58L, features#33.properties.COMM AS COMM#49]
      :           +- Filter ((isnotnull(features#33.properties.CITY) AND isnotnull(features#33.properties.ZCTA10)) AND ((features#

## Save median income results

In [3]:
# Inspect the result's column types before writing it out
print("Schema of Final Result:")
final_result.printSchema()

# Save the results as CSV files (header row included, any previous output replaced)
output_path = "s3://groups-bucket-dblab-905418150721/group7/q3_results"  # base S3 prefix for Query 3 outputs
(
    final_result.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{output_path}/median_income_per_person")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Schema of Final Result:
root
 |-- COMM: string (nullable = true)
 |-- Total Income: double (nullable = true)
 |-- Total Population: long (nullable = true)
 |-- Median Income Per Person: double (nullable = true)

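# Calculate Crimes per Person

Each crime is matched to the community polygon that contains it (`ST_Within`). Crimes are then counted per COMM, and each count is divided by that community's 2010 population.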

In [4]:
# Restrict the census blocks to the city of Los Angeles under a new name, so `flattened_df`
# keeps its original meaning and this cell can be re-run safely. Rows without a community
# name are dropped, matching the cleaning in the median-income cell.
la_blocks_df = flattened_df.filter(col("CITY") == "Los Angeles").filter(col("COMM").isNotNull())

# One row per COMM: total population and the union of its block geometries
agg_population_df = la_blocks_df.groupBy("COMM").agg(
    spark_sum("POP_2010").alias("total_population"),  # Sum population for each COMM
    ST_Union_Aggr("geometry").alias("geometry"))  # Combine geometries for each COMM

# Standardize column names (trim surrounding spaces, e.g. the "AREA " header)
data_1 = data_1.select([col(c).alias(c.strip()) for c in data_1.columns])
data_2 = data_2.select([col(c).alias(c.strip()) for c in data_2.columns])

# Combine crime datasets into one
combined_crime_df = data_1.unionByName(data_2)

# Drop rows with missing coordinates, then build one point geometry per crime
combined_crime_df = combined_crime_df.dropna(subset=["LON", "LAT"]).withColumn(
    "geom", ST_Point(col("LON"), col("LAT")))

# Spatial join: associate each crime with the COMM polygon that contains it.
# NOTE(review): the plan below shows Sedona's RangeJoin; the MERGE hint is presumably
# ignored for this ST_Within predicate — confirm.
joined_df = combined_crime_df.hint("MERGE").join(
    agg_population_df,
    ST_Within(combined_crime_df.geom, agg_population_df.geometry),
    "inner")

# Show the execution plan
print("Execution plan for Crimes - Census Join:")
joined_df.explain()

# Count crimes per COMM. agg_population_df has one row per COMM, so total_population is
# constant within each group. Grouping by it carries the population along without joining
# agg_population_df a second time, which would recompute ST_Union_Aggr.
pop_with_crime = joined_df.groupBy("COMM", "total_population").agg(
    count("DR_NO").alias("crime_count"))
pop_with_crime = pop_with_crime.withColumn(
    "crimes_per_person", col("crime_count") / col("total_population"))

# Keep communities with a positive population; the geometry is not carried into the result
final_result2 = pop_with_crime.filter(col("total_population") > 0) \
    .select("COMM", "crime_count", "total_population", "crimes_per_person")

# Show results (this action triggers the computation being timed)
final_result2.show(truncate=False)

# Print Total Query timer
elapsed_time = time.time() - start_time
print(f"Both Queries time: {elapsed_time:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution plan for Crimes - Census Join:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- RangeJoin geom#698: geometry, geometry#540: geometry, WITHIN
   :- Union
   :  :- Project [DR_NO#215 AS DR_NO#556, Date Rptd#216 AS Date Rptd#557, DATE OCC#217 AS DATE OCC#558, TIME OCC#218 AS TIME OCC#559, AREA #219 AS AREA#560, AREA NAME#220 AS AREA NAME#561, Rpt Dist No#221 AS Rpt Dist No#562, Part 1-2#222 AS Part 1-2#563, Crm Cd#223 AS Crm Cd#564, Crm Cd Desc#224 AS Crm Cd Desc#565, Mocodes#225 AS Mocodes#566, Vict Age#226 AS Vict Age#567, Vict Sex#227 AS Vict Sex#568, Vict Descent#228 AS Vict Descent#569, Premis Cd#229 AS Premis Cd#570, Premis Desc#230 AS Premis Desc#571, Weapon Used Cd#231 AS Weapon Used Cd#572, Weapon Desc#232 AS Weapon Desc#573, Status#233 AS Status#574, Status Desc#234 AS Status Desc#575, Crm Cd 1#235 AS Crm Cd 1#576, Crm Cd 2#236 AS Crm Cd 2#577, Crm Cd 3#237 AS Crm Cd 3#578, Crm Cd 4#238 AS Crm Cd 4#579, ... 5 more fields]
   :  :  +- Filter (atleastnnonnulls(

## Save crimes per person results

In [5]:
# Write the crimes-per-person table (no geometry column) as CSV with a header row,
# replacing any earlier output at the same location
output_path = "s3://groups-bucket-dblab-905418150721/group7/q3_results"
(
    final_result2.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{output_path}/crimes_per_person")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Join Results into One Table

In [6]:
# Join the income and crime results on COMM (sort-merge join requested via hint)
final_combined_result = final_result.hint("MERGE").join(final_result2, on="COMM", how="inner")

# Show the execution plan
print("Execution plan for Final Result:")
final_combined_result.explain()

# Select the reporting columns, rounded to 5 decimals. crimes_per_person already holds
# crime_count / total_population, so it is reused rather than recomputed.
final_combined_result = final_combined_result.select(
    "COMM",
    round(col("Total Income"), 5).alias("Total Income"),
    round(col("Median Income Per Person"), 5).alias("Median Income Per Person"),
    "crime_count",
    round(col("crimes_per_person"), 5).alias("Crimes Per Person Ratio"))

# Run the action before reading the timer. Spark is lazy, so timing before show()
# left the work for this join out of the reported total.
final_combined_result.show(truncate=False)

# Print Total Time
elapsed_time = time.time() - start_time
print(f"Total time: {elapsed_time:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution plan for Final Result:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [COMM#49, Total Income#413, Total Population#404L, Median Income Per Person#418, crime_count#865L, total_population#535L, crimes_per_person#914]
   +- SortMergeJoin [COMM#49], [COMM#1024], Inner
      :- Sort [COMM#49 ASC NULLS FIRST], false, 0
      :  +- Project [COMM#49, round(Total Income#402, 5) AS Total Income#413, Total Population#404L, round((Total Income#402 / cast(Total Population#404L as double)), 5) AS Median Income Per Person#418]
      :     +- HashAggregate(keys=[COMM#49], functions=[sum(Total Income#388), sum(Total Population#356L)], schema specialized)
      :        +- Exchange hashpartitioning(COMM#49, 200), ENSURE_REQUIREMENTS, [plan_id=2679]
      :           +- HashAggregate(keys=[COMM#49], functions=[partial_sum(Total Income#388), partial_sum(Total Population#356L)], schema specialized)
      :              +- Project [COMM#49, Total Population#356L, (cast(regexp_r

## Save final results

In [7]:
# Optional: persist the combined table as CSV (header row, overwrite) for further analysis
output_path = "s3://groups-bucket-dblab-905418150721/group7/q3_results"
(
    final_combined_result.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{output_path}/final_combined_results")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…