In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2944,application_1732639283265_2903,pyspark,idle,Link,Link,,
2971,application_1732639283265_2929,pyspark,idle,Link,Link,,
2975,application_1732639283265_2933,pyspark,idle,Link,Link,,
2976,application_1732639283265_2934,pyspark,idle,Link,Link,,
3000,application_1732639283265_2958,pyspark,idle,Link,Link,,
3010,application_1732639283265_2968,pyspark,idle,Link,Link,,
3017,application_1732639283265_2975,pyspark,idle,Link,Link,,
3021,application_1732639283265_2979,pyspark,idle,Link,Link,,
3028,application_1732639283265_2986,pyspark,idle,Link,Link,,
3029,application_1732639283265_2987,pyspark,idle,Link,Link,,


In [2]:
from sedona.spark import *
from pyspark.sql.functions import col, sum, avg, regexp_replace, broadcast, trim, format_number, concat, lit, round
from pyspark.sql import SparkSession
import time
from pyspark.sql.types import FloatType,IntegerType
import pandas as pd
import csv



# Obtain the Spark session for this notebook under the application name "Q4".
spark = SparkSession.builder.appName("Q4").getOrCreate()

# Access configuration
conf = spark.sparkContext.getConf()

# Echo the executor sizing this session actually received, so the timing
# printed at the end of the notebook can be tied to a resource configuration.
for setting_label, setting_key in (
    ("Executor Instances", "spark.executor.instances"),
    ("Executor Memory", "spark.executor.memory"),
    ("Executor Cores", "spark.executor.cores"),
):
    print(f"{setting_label}:", conf.get(setting_key))




Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3102,application_1732639283265_3058,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: 2
Executor Memory: 2g
Executor Cores: 1

In [3]:

# Create the Sedona context on top of the existing Spark session.
sedona = SedonaContext.create(spark)

# Raise the automatic broadcast-join size threshold to 50MB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")

geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

# Read the multi-line GeoJSON document, then emit one row per entry of its
# `features` array and expand that struct's fields into columns.
geojson_reader = sedona.read.format("geojson").option("multiLine", "true")
blocks_df = (
    geojson_reader.load(geojson_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Promote every field of the nested `properties` struct to a top-level column
# (same name), keeping `geometry` alongside.
property_names = blocks_df.schema["properties"].dataType.fieldNames()
property_columns = [col(f"properties.{name}").alias(name) for name in property_names]
flattened_df = (
    blocks_df.select(property_columns + ["geometry"])
    .drop("properties")
    .drop("type")
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:

# 2015 income table keyed by ZIP code. "Zip Code" is renamed to ZCTA10 to match
# the census-block column name, and "$" / "," are stripped from the income
# string so it can be cast to a double.
income_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv",
    header=True
).withColumnRenamed("Zip Code", "ZCTA10").withColumn(
    "Estimated_Median_Income",
    regexp_replace(col("Estimated Median Income"), "[$,]", "").cast("double")
    ).drop("Community","Estimated Median Income")

# Both crime extracts (2010-2019 and 2020-present) are read as one DataFrame.
# Rows without coordinates are dropped because they cannot be turned into points.
crime_df = spark.read.csv(
    [
        "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
        "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
    ],
    header=True
).filter((col("LAT").isNotNull()) & (col("LON").isNotNull()))

# Populated, named Los Angeles census blocks, aggregated per (ZCTA10, COMM).
# The filter runs BEFORE the projection: it references CITY, which is not among
# the projected columns, so filtering after the select would only resolve through
# Spark's implicit re-addition of missing attributes.
valid_population_df = (
    flattened_df
    .filter(
        (col("CITY") == "Los Angeles") &
        (col("ZCTA10") > 0) &
        ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) &
        (trim(col("COMM")) != "")
    )
    .select("ZCTA10", "COMM", "POP_2010", "HOUSING10", "geometry")
    .groupBy("ZCTA10", "COMM")
    .agg(
        sum("POP_2010").alias("Total Population"),
        sum("HOUSING10").alias("Total Housing"),
        ST_Union_Aggr("geometry").alias("geometry"),
    )
)

# Point geometry per crime (x = LON, y = LAT). "Vict Descent" is needed for
# Query 4; "DATE OCC" is kept because a later cell filters this DataFrame by
# occurrence year and the column must still be present for that filter.
crime_with_geometry = (
    crime_df
    .withColumn("geometry", ST_Point("LON", "LAT"))
    .select("geometry", "Vict Descent", "DATE OCC")
)

# Timer start for the "Time taken" figure printed in the final cell.
start_time = time.time()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Previously saved per-community table; later cells read its COMM and
# "Median Income Per Person" columns.
parquet_s3_path = "s3://groups-bucket-dblab-905418150721/group17/query4_data.parquet"

final_calculations_df = spark.read.format("parquet").load(parquet_s3_path)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:

# Keep only crimes whose occurrence-date string mentions 2015.
crime_2015_with_geometry = crime_with_geometry.filter(col("DATE OCC").contains("2015"))

# Spatial join: attach to each 2015 crime the community (COMM) of the census
# block whose geometry contains the crime's point.
crime_with_comm_2015 = flattened_df.select("COMM", "geometry").join(
    crime_2015_with_geometry,
    ST_Within(crime_2015_with_geometry["geometry"], flattened_df["geometry"]),
    "inner"
).select("COMM", "Vict Descent")

# Strip "$" and "," from the income string so it can be sorted numerically.
final_calculations_df_numeric = final_calculations_df.withColumn(
    "Median Income Per Person Numeric",
    regexp_replace(col("Median Income Per Person"), "[$,]", "").cast("double")
)

# Spark sorts NULLs FIRST in ascending order, so a community whose income is
# missing or failed the cast would otherwise be picked as "lowest income".
# Rank only rows that have a numeric income.
income_numeric = col("Median Income Per Person Numeric")
rankable_income_df = final_calculations_df_numeric.filter(income_numeric.isNotNull())

# COMM is a secondary sort key so ties on income resolve the same way on every run.
top_3_comm = rankable_income_df.orderBy(income_numeric.desc(), col("COMM").asc()).select("COMM").limit(3)
bottom_3_comm = rankable_income_df.orderBy(income_numeric.asc(), col("COMM").asc()).select("COMM").limit(3)

# NOTE(review): if the parquet table can hold more than one row per COMM, the
# same community could appear twice above and its crimes would be counted twice
# by these joins - confirm the table is unique on COMM.
crime_top_3 = crime_with_comm_2015.join(top_3_comm, on="COMM", how="inner")
crime_bottom_3 = crime_with_comm_2015.join(bottom_3_comm, on="COMM", how="inner")

# Lookup table mapping descent codes to their full descriptions; columns are
# renamed to space-free names.
race_codes_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv",
    header=True
).withColumnRenamed("Vict Descent", "Vict_Descent") \
 .withColumnRenamed("Vict Descent Full", "Vict_Descent_Full")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
race_codes_df = broadcast(race_codes_df)
# Broadcast hint: the race-code lookup is small, so broadcasting it reduces shuffle in the joins below.
def get_victim_profile(crime_df, race_codes_df):
    """Count crimes per victim descent and label each count with its description.

    Args:
        crime_df: DataFrame with a "Vict Descent" code column, one row per crime.
        race_codes_df: Lookup DataFrame with "Vict_Descent" (code) and
            "Vict_Descent_Full" (description) columns.

    Returns:
        DataFrame with columns "Vict_Descent_Full" and "count", sorted by
        "count" descending. Codes without a description in the lookup are
        left out.
    """
    counts_by_code = crime_df.groupBy("Vict Descent").count()

    # Left join, then discard rows that found no description.
    code_matches = col("Vict Descent") == race_codes_df["Vict_Descent"]
    labelled_counts = counts_by_code.join(race_codes_df, code_matches, "left")
    described_counts = labelled_counts.filter(col("Vict_Descent_Full").isNotNull())

    profile = described_counts.select("Vict_Descent_Full", "count")
    return profile.orderBy(col("count").desc())


top_3_victim_profile = get_victim_profile(crime_top_3, race_codes_df)
bottom_3_victim_profile = get_victim_profile(crime_bottom_3, race_codes_df)

# Display results: the three most frequent victim descents for each group.
for heading, victim_profile in (
    ("Top 3 Income Areas Victim Profile for 2015:", top_3_victim_profile),
    ("Bottom 3 Income Areas Victim Profile for 2015:", bottom_3_victim_profile),
):
    print(heading)
    victim_profile.show(3)

# Elapsed wall-clock time since `start_time`, which was set in an earlier cell.
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Top 3 Income Areas Victim Profile for 2015:
+--------------------+-----+
|   Vict_Descent_Full|count|
+--------------------+-----+
|               White|  695|
|               Other|   86|
|Hispanic/Latin/Me...|   77|
+--------------------+-----+
only showing top 3 rows

Bottom 3 Income Areas Victim Profile for 2015:
+--------------------+-----+
|   Vict_Descent_Full|count|
+--------------------+-----+
|Hispanic/Latin/Me...| 3191|
|               Black|  872|
|               White|  430|
+--------------------+-----+
only showing top 3 rows

Time taken: 81.70 seconds

In [None]:
# Time taken (seconds) per resource configuration:
# 1 core, 2 GB: 81.70
# 2 cores, 4 GB: 64.49
# 4 cores, 8 GB: 58.90