# Advanced DB Project 2024

## Query 1

In [3]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
34,application_1738075734771_0035,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
11,application_1738075734771_0012,pyspark,idle,Link,Link,,
15,application_1738075734771_0016,pyspark,idle,Link,Link,,
18,application_1738075734771_0019,pyspark,idle,Link,Link,,
21,application_1738075734771_0022,pyspark,idle,Link,Link,,
22,application_1738075734771_0023,pyspark,idle,Link,Link,,
26,application_1738075734771_0027,pyspark,idle,Link,Link,,
30,application_1738075734771_0031,pyspark,idle,Link,Link,,
32,application_1738075734771_0033,pyspark,idle,Link,Link,,
34,application_1738075734771_0035,pyspark,idle,Link,Link,,✔


In [4]:
# Spark RDD code
from pyspark.sql import SparkSession
import time

#sc = SparkSession \
#    .builder \
#    .appName("RDD query 1 execution") \
#    .getOrCreate() \
#    .sparkContext
# Access configuration
conf = spark.sparkContext.getConf()
sc = spark.sparkContext

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))

start_time = time.time()

# Loading crime data from the 2 available datasets (2010-2019, 2020-present)
crime_data_2010_2019 = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv") \
                .map(lambda x: (x.split(",")))
crime_data_2020_present = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv") \
                .map(lambda x: (x.split(",")))

# Extract the header row
header = crime_data_2010_2019.first()

# Get the indices of the columns to keep
dr_no_index = header.index("DR_NO")
vict_age_index = header.index("Vict Age")
crm_cd_desc_index = header.index("Crm Cd Desc")

# Select only the required columns for each dataset
selected_crime_data_2010_2019 = crime_data_2010_2019.map(
    lambda row: (row[dr_no_index], row[vict_age_index], row[crm_cd_desc_index])
)
selected_crime_data_2020_present = crime_data_2020_present.map(
    lambda row: (row[dr_no_index], row[vict_age_index], row[crm_cd_desc_index])
)

# Keep the rows which contain the term "aggravated assault" on the Crm Cd Desc column
filtered_crime_data_2010_2019 = crime_data_2010_2019.filter(lambda row: "AGGRAVATED ASSAULT" in row[crm_cd_desc_index])
filtered_crime_data_2020_present = crime_data_2020_present.filter(lambda row: "AGGRAVATED ASSAULT" in row[crm_cd_desc_index])

# Merge the filtered data
merged_data = filtered_crime_data_2010_2019.union(filtered_crime_data_2020_present)

# Split into age groups then sort by descending age 
children = merged_data.filter(lambda row: int(row[vict_age_index]) < 18) \
                .sortBy(lambda x: x[1], ascending=False)
young_adults = merged_data.filter(lambda row: int(row[vict_age_index]) >= 18 and int(row[vict_age_index]) <= 24) \
                .sortBy(lambda x: x[1], ascending=False)
adults = merged_data.filter(lambda row: int(row[vict_age_index]) >= 25 and int(row[vict_age_index]) <= 64) \
                .sortBy(lambda x: x[1], ascending=False)
elders = merged_data.filter(lambda row: int(row[vict_age_index]) > 64) \
                .sortBy(lambda x: x[1], ascending=False)

# Test results
print(children.take(1))
print(young_adults.take(1))
print(adults.take(1))
print(elders.take(1))

elapsed_time = time.time()-start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: 4
[['191522756', '12/31/2019 12:00:00 AM', '10/19/2019 12:00:00 AM', '1500', '15', 'N Hollywood', '1526', '1', '235', 'CHILD ABUSE (PHYSICAL) - AGGRAVATED ASSAULT', '0913 0334 0449 0416 0419 0552 0400 0408', '16', 'F', 'W', '502', '"MULTI-UNIT DWELLING (APARTMENT', ' DUPLEX', ' ETC)"', '205', 'KITCHEN KNIFE', 'IC', 'Invest Cont', '235', '', '', '', '6000    HAZELHURST                   PL', '', '34.1794', '-118.3856']]
[['240404002', '12/31/2023 12:00:00 AM', '12/31/2023 12:00:00 AM', '1700', '04', 'Hollenbeck', '0477', '1', '236', 'INTIMATE PARTNER - AGGRAVATED ASSAULT', '2000 0913 1813 0400 0411', '21', 'M', 'H', '501', 'SINGLE FAMILY DWELLING', '208', 'RAZOR', 'IC', 'Invest Cont', '236', '', '', '', 'WHITTIER', 'SOTO', '34.0344', '-118.2157']]
[['240204018', '12/31/2023 12:00:00 AM', '12/31/2023 12:00:00 AM', '1600', '02', 'Rampart', '0233', '1', '236', 'INTIMATE PARTNER - AGGRAVATED ASSAULT', '2000 0416 1813 0913 2002', '26', 'F', 'H', '502', '"MULTI-UNIT DWELLI

In [5]:
# Spark Dataframe Code 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql.functions import col, desc
import time

#spark = SparkSession \
#    .builder \
#    .appName("DF query 1 execution") \
#    .getOrCreate() \
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))

start_time_df = time.time()

# Loading crime data from the 2 available datasets (2010-2019, 2020-present)
# We select only the columns needed for this query which are DR_NO (id), Vict Age, Crm Cd Desc 
crime_data_2010_2019_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv") \
                .select("DR_NO", "Crm Cd Desc", "Vict Age")
crime_data_2020_present_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv") \
                .select("DR_NO", "Crm Cd Desc", "Vict Age")

# Filter rows where Crime Cd Desc contains the term "AGGRAVATED ASSAULT"
filtered_crime_data_2010_2019_df = crime_data_2010_2019_df.filter(col("Crm Cd Desc").rlike("AGGRAVATED ASSAULT"))
filtered_crime_data_2020_present_df = crime_data_2020_present_df.filter(col("Crm Cd Desc").rlike("AGGRAVATED ASSAULT"))

# Merge Results
merged_df = filtered_crime_data_2010_2019_df.union(filtered_crime_data_2020_present_df)

# Filtering and Sorting
children = merged_df \
            .filter(col("Vict Age").cast('int') < 18) \
            .sort(desc("Vict Age"))
young_adults = merged_df \
            .filter((col("Vict Age").cast('int') >= 18) & (col("Vict Age").cast('int') <= 24)) \
            .sort(desc("Vict Age"))
adults = merged_df \
            .filter((col("Vict Age").cast('int') > 24) & (col("Vict Age").cast('int') <= 64)) \
            .sort(desc("Vict Age"))
elders = merged_df \
            .filter(col("Vict Age").cast('int') > 64) \
            .sort(desc("Vict Age"))


# Test results
children.show(5)
young_adults.show(5)
adults.show(5)
elders.show(5)

end_time_df = time.time()
elapsed_time = end_time_df - start_time_df
print(f"Time taken: {elapsed_time:.2f} seconds")

children.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: 4
+---------+--------------------+--------+
|    DR_NO|         Crm Cd Desc|Vict Age|
+---------+--------------------+--------+
|121800853|CHILD ABUSE (PHYS...|       9|
|231314578|INTIMATE PARTNER ...|       9|
|220114625|CHILD ABUSE (PHYS...|       9|
|230321104|ASSAULT WITH DEAD...|       9|
|231917406|CHILD ABUSE (PHYS...|       9|
+---------+--------------------+--------+
only showing top 5 rows

+---------+--------------------+--------+
|    DR_NO|         Crm Cd Desc|Vict Age|
+---------+--------------------+--------+
|190207473|ASSAULT WITH DEAD...|      24|
|220616673|ASSAULT WITH DEAD...|      24|
|231408338|INTIMATE PARTNER ...|      24|
|220610361|ASSAULT WITH DEAD...|      24|
|240705661|ASSAULT WITH DEAD...|      24|
+---------+--------------------+--------+
only showing top 5 rows

+---------+--------------------+--------+
|    DR_NO|         Crm Cd Desc|Vict Age|
+---------+--------------------+--------+
|191810607|ASSAULT WITH DEAD...|      64|
|231

## Query 2

In [24]:
# Question 2, part a) (δηλαδή implementation του 2ου query)
# DataFrame
# part 1 πρώτα, μην ξεχασω τη when κατω todo: remove this
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, desc, rank, udf
from pyspark.sql.window import Window
from pyspark.sql.types import BooleanType
import time

# Initialize SparkSession
spark = SparkSession.builder.appName("DF query 2 execution").getOrCreate()

# Define paths to your dataset files
csv_path_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
csv_path_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"

# Load the datasets and combine them
crime_data_df = spark.read.format("csv").options(header="true").load(csv_path_2010_2019).union(
    spark.read.format("csv").options(header="true").load(csv_path_2020_present)
)

# Start overall execution timer
start_time_df = time.time()

# Define mapping for CaseStatus codes
status_mapping = {
    "IC": False,  # Investigation Continued (not closed)
    "UNK": False,  # Unknown (not closed)
}

# Define UDF to determine if a case is closed
is_closed_udf = udf(lambda status: status_mapping.get(status, True), BooleanType())

#print(f"Step 1 - Filtering rows: {end_step1 - start_step1:.2f} seconds")

# Extract Year and keep relevant columns
start_step2 = time.time()

#select ωστε το output να ειναι ομοιο με τον Πινακα 2: Υποδειγμα αποτελεσματος Query 2
crime_data_filtered = crime_data_df.select(
    col("DATE OCC").substr(7, 4).alias("Year"),
    col("AREA NAME").alias("PoliceStation"),
    col("Status").alias("CaseStatus")
).filter(
    col("Year").isNotNull() & col("PoliceStation").isNotNull() & col("CaseStatus").isNotNull()
).withColumn(
    "IsClosed", is_closed_udf(col("CaseStatus"))  # Add IsClosed column
)
end_step2 = time.time()
#print(f"Step 2 - Extracting and filtering columns: {end_step2 - start_step2:.2f} seconds")

# Calculate closed case rate
start_step3 = time.time()
closed_case_rate = crime_data_filtered.groupBy("Year", "PoliceStation").agg(
    count(when(col("IsClosed") == True, True)).alias("ClosedCases"),
    count("*").alias("TotalCases")
).withColumn(
    "closedCaseRate", (col("ClosedCases") / col("TotalCases")) * 100
)
end_step3 = time.time()
#print(f"Step 3 - Calculating closed case rate: {end_step3 - start_step3:.2f} seconds")

# Rank police stations by closed case rate
start_step4 = time.time()
window_spec = Window.partitionBy("Year").orderBy(desc("ClosedCaseRate"))
ranked_stations = closed_case_rate.withColumn("Rank", rank().over(window_spec))
end_step4 = time.time()
#print(f"Step 4 - Ranking police stations: {end_step4 - start_step4:.2f} seconds")

# Filter top 3 stations per year
start_step5 = time.time()
top_3_stations_df = ranked_stations.filter(col("Rank") <= 3).orderBy("Year", "Rank")
end_step5 = time.time()
#print(f"Step 5 - Filtering top 3 stations: {end_step5 - start_step5:.2f} seconds")

# Show results
top_3_stations_df.select(
    col("Year").alias("year"),
    col("PoliceStation").alias("precinct"),
    col("ClosedCaseRate").alias("closed_case_rate"),
    col("Rank").alias("#")
).show()

# End overall execution timer
end_time_df = time.time()
print(f"Total DataFrame API Execution Time: {end_time_df - start_time_df:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+---+
|year|   precinct|  closed_case_rate|  #|
+----+-----------+------------------+---+
|2010|    Rampart| 32.84713448949121|  1|
|2010|    Olympic|31.515289821999087|  2|
|2010|     Harbor| 29.36028339237341|  3|
|2011|    Olympic|35.040060090135206|  1|
|2011|    Rampart|  32.4964471814306|  2|
|2011|     Harbor| 28.51336246316431|  3|
|2012|    Olympic| 34.29708533302119|  1|
|2012|    Rampart| 32.46000463714352|  2|
|2012|     Harbor|29.509585848956675|  3|
|2013|    Olympic| 33.58217940999398|  1|
|2013|    Rampart|  32.1060382916053|  2|
|2013|     Harbor|29.727164887307232|  3|
|2014|   Van Nuys|  32.0215235281705|  1|
|2014|West Valley| 31.49754809505847|  2|
|2014|    Mission|31.224939855653567|  3|
|2015|   Van Nuys|32.265140677157845|  1|
|2015|    Mission|30.463762673676303|  2|
|2015|   Foothill|30.353001803658852|  3|
|2016|   Van Nuys|32.194518462124094|  1|
|2016|West Valley| 31.40146437042384|  2|
+----+-----------+----------------

In [25]:
#same, just SQL API implementation
from pyspark.sql.functions import col
import time

# Start time for SQL API
start_time_sql = time.time()

# Register the DataFrame as a temporary SQL table
crime_data_df.createOrReplaceTempView("crime_data")

# SQL query for Query 2
sql_query = """
    WITH ClosedCases AS (
        SELECT 
            SUBSTRING(`DATE OCC`, 7, 4) AS Year,
            `AREA NAME` AS PoliceStation,
            COUNT(CASE WHEN `Status` NOT IN ('IC', 'UNK') THEN 1 END) AS ClosedCases,
            COUNT(*) AS TotalCases,
            (COUNT(CASE WHEN `Status` NOT IN ('IC', 'UNK') THEN 1 END) * 100.0 / COUNT(*)) AS ClosedCaseRate
        FROM crime_data
        WHERE `DATE OCC` IS NOT NULL
          AND `AREA NAME` IS NOT NULL
          AND `Status` IS NOT NULL
        GROUP BY SUBSTRING(`DATE OCC`, 7, 4), `AREA NAME`
    ),
    RankedStations AS (
        SELECT 
            Year,
            PoliceStation,
            ClosedCaseRate,
            RANK() OVER (PARTITION BY Year ORDER BY ClosedCaseRate DESC) AS Rank
        FROM ClosedCases
    )
    SELECT 
        Year AS year,
        PoliceStation AS precinct,
        ClosedCaseRate AS closed_case_rate,
        Rank AS `#`
    FROM RankedStations
    WHERE Rank <= 3
    ORDER BY Year, Rank
"""

# Execute SQL query
top_3_stations_sql = spark.sql(sql_query)

# Show results
top_3_stations_sql.show()

# End time for SQL API
end_time_sql = time.time()

# Print execution time
print(f"SQL API Execution Time: {end_time_sql - start_time_sql:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----------------+---+
|year|   precinct| closed_case_rate|  #|
+----+-----------+-----------------+---+
|2010|    Rampart|32.84713448949121|  1|
|2010|    Olympic|31.51528982199909|  2|
|2010|     Harbor|29.36028339237341|  3|
|2011|    Olympic|35.04006009013520|  1|
|2011|    Rampart|32.49644718143060|  2|
|2011|     Harbor|28.51336246316431|  3|
|2012|    Olympic|34.29708533302119|  1|
|2012|    Rampart|32.46000463714352|  2|
|2012|     Harbor|29.50958584895668|  3|
|2013|    Olympic|33.58217940999398|  1|
|2013|    Rampart|32.10603829160530|  2|
|2013|     Harbor|29.72716488730724|  3|
|2014|   Van Nuys|32.02152352817050|  1|
|2014|West Valley|31.49754809505847|  2|
|2014|    Mission|31.22493985565357|  3|
|2015|   Van Nuys|32.26514067715784|  1|
|2015|    Mission|30.46376267367630|  2|
|2015|   Foothill|30.35300180365885|  3|
|2016|   Van Nuys|32.19451846212410|  1|
|2016|West Valley|31.40146437042384|  2|
+----+-----------+-----------------+---+
only showing top

In [None]:
#part b of question 2 (2b)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, desc, rank, udf
from pyspark.sql.window import Window
from pyspark.sql.types import BooleanType
import time

# Initialize SparkSession
spark = SparkSession.builder.appName("Query2 Parquet vs CSV Comparison").getOrCreate()

# Define paths
csv_path_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
csv_path_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
parquet_path = "s3://groups-bucket-dblab-905418150721/group31/main_dataset_unique.parquet"

# Step 1: Save dataset as a single Parquet file
#print("Saving dataset as a unique Parquet file...")
start_parquet_save = time.time()

# Combine datasets and save as a single Parquet file
crime_data_df = spark.read.format("csv").options(header="true").load(csv_path_2010_2019).union(
    spark.read.format("csv").options(header="true").load(csv_path_2020_present)
)

crime_data_df.coalesce(1).write.mode("overwrite").parquet(parquet_path)
end_parquet_save = time.time()
#print(f"Parquet Save Time: {end_parquet_save - start_parquet_save:.2f} seconds")
#todo : uncomment εαν χρειαζονται αναλυτικοι χρονοι (ζηταται μονο το execution time οποτε μαλλον οχι)

# Step 2: Query 2 on CSV
#print("Executing Query 2 on CSV format...")
start_csv_query = time.time()

# Load CSV files and apply Query 2 logic
crime_data_csv = spark.read.format("csv") \
    .options(header="true") \
    .load(csv_path_2010_2019) \
    .union(
        spark.read.format("csv")
        .options(header="true")
        .load(csv_path_2020_present)
    )

# Define the status mapping for closed cases
status_mapping = {"IC": False, "UNK": False}
is_closed_udf = udf(lambda status: status_mapping.get(status, True), BooleanType())

crime_data_csv_filtered = crime_data_csv.select(
    col("DATE OCC").substr(7, 4).alias("Year"),
    col("AREA NAME").alias("PoliceStation"),
    col("Status").alias("CaseStatus")
).filter(
    col("Year").isNotNull() & col("PoliceStation").isNotNull() & col("CaseStatus").isNotNull()
).withColumn(
    "IsClosed", is_closed_udf(col("CaseStatus"))
)

closed_case_rate = crime_data_csv_filtered.groupBy("Year", "PoliceStation").agg(
    count(when(col("IsClosed") == True, True)).alias("ClosedCases"),
    count("*").alias("TotalCases")
).withColumn(
    "ClosedCaseRate", (col("ClosedCases") / col("TotalCases")) * 100
)

window_spec = Window.partitionBy("Year").orderBy(desc("ClosedCaseRate"))
top_3_stations_csv = closed_case_rate.withColumn("Rank", rank().over(window_spec)) \
    .filter(col("Rank") <= 3).orderBy("Year", "Rank")

# Select only the desired columns
top_3_stations_csv.select(
    col("Year").alias("year"),
    col("PoliceStation").alias("precinct"),
    col("ClosedCaseRate").alias("closed_case_rate"),
    col("Rank").alias("#")
).show()

end_csv_query = time.time()
print(f"CSV Query Execution Time: {end_csv_query - start_csv_query:.2f} seconds")

# Step 3: Query 2 on Parquet
#print("Executing Query 2 on Parquet format...")
start_parquet_query = time.time()

# Load Parquet file and apply Query 2 logic
crime_data_parquet = spark.read.format("parquet").load(parquet_path)

crime_data_parquet_filtered = crime_data_parquet.select(
    col("DATE OCC").substr(7, 4).alias("Year"),
    col("AREA NAME").alias("PoliceStation"),
    col("Status").alias("CaseStatus")
).filter(
    col("Year").isNotNull() & col("PoliceStation").isNotNull() & col("CaseStatus").isNotNull()
).withColumn(
    "IsClosed", is_closed_udf(col("CaseStatus"))
)

closed_case_rate_parquet = crime_data_parquet_filtered.groupBy("Year", "PoliceStation").agg(
    count(when(col("IsClosed") == True, True)).alias("ClosedCases"),
    count("*").alias("TotalCases")
).withColumn(
    "ClosedCaseRate", (col("ClosedCases") / col("TotalCases")) * 100
)

top_3_stations_parquet = closed_case_rate_parquet.withColumn("Rank", rank().over(window_spec)) \
    .filter(col("Rank") <= 3).orderBy("Year", "Rank")

# Select only the desired columns
top_3_stations_parquet.select(
    col("Year").alias("year"),
    col("PoliceStation").alias("precinct"),
    col("ClosedCaseRate").alias("closed_case_rate"),
    col("Rank").alias("#")
).show()

end_parquet_query = time.time()
print(f"Parquet Query Execution Time: {end_parquet_query - start_parquet_query:.2f} seconds")

print('\n'*3)
#λιγος κενος χωρος να φαινεται η συγκριση

# Step 4: Summary of results
print("CSV Vs Parquet Execution Times:")
print(f"CSV Query Execution Time: {end_csv_query - start_csv_query:.2f} seconds")
print(f"Parquet Query Execution Time: {end_parquet_query - start_parquet_query:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Query 3

In [9]:
from sedona.spark import *
from pyspark.sql.functions import col, sum, avg, countDistinct, regexp_replace, count, desc, round
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import time

# Create spark Session
spark = SparkSession.builder \
    .appName("Query 3 implementation") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)

# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

# Load la income per house dataframe
la_income_2015_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv") \

# We will join the 2010 data with the 2015 data based on the zip code of the areas
# To do so the zipcode columns must have the same data type
# So we change the datatype of flattened_df accordingly
# We also filter out null values in the columns of interest and keep the rows corresponding to LA city areas
flattened_df = flattened_df \
            .withColumn("ZCTA10", flattened_df["ZCTA10"].cast("int")) \
            .filter(col("COMM").isNotNull() & col("POP_2010").isNotNull() & col("HOUSING10").isNotNull()) \
            .filter(col("CITY") == "Los Angeles")

# Join on Zip Code
joined_df = flattened_df.join(la_income_2015_df.hint("broadcast"), flattened_df["ZCTA10"] == la_income_2015_df["Zip Code"], how="inner")

start_time = time.time()
joined_df.collect()
print(f"Zip Codes join broadcast: {time.time()-start_time}")

# Convert the Estimated Median Income column to integer
# By removing "$" and ","
joined_df = joined_df.withColumn(
    "Median Income",
    regexp_replace(col("Estimated Median Income"), r"[\$,]", "").cast("int")
)

# Average Income per Person = (Estimated Median Income)*(Number of Houses)/(Community Population)
average_income_per_person_df = joined_df \
        .groupBy("COMM") \
        .agg(
            sum("HOUSING10").alias("Comm Houses"),
            sum("POP_2010").alias("Comm Population"),
            avg("Median Income").alias("Comm Median Income per Household")
        ) \
        .withColumn("Avg Income per Person", col("Comm Median Income per Household")*col("Comm Houses")/col("Comm Population")) \

average_income_per_person_df.show()

# Load and join crime datasets
crime_data_2010_2019_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv") \
                .select("DR_NO", "LAT", "LON")   # No other columns are needed
crime_data_2020_present_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv") \
                .select("DR_NO", "LAT", "LON")

crime_data_df = crime_data_2010_2019_df.union(crime_data_2020_present_df)

# Create geometry column for the crime data 
crime_data_df = crime_data_df.withColumn("geometry", ST_Point("LON", "LAT"))

# We will pre-calculate the community population that we need for this query 
# Because it will be harder after the join
# Define a window partitioned by "COMM"
community_window = Window.partitionBy("COMM")

# Calculate the total population per community and add it as a new column
df_with_comm_population = flattened_df.withColumn(
    "Comm Population",
    sum("POP_2010").over(community_window)
)

# Join based on geometry
joined_df_1 = crime_data_df \
        .join(df_with_comm_population, ST_Within(crime_data_df.geometry, df_with_comm_population.geometry), "inner")

#start_time = time.time()
#joined_df_1.collect()
#print(f"geometry join broadcast:{time.time()-start_time}")

# Crimes per person = Total Crimes/Population
crimes_per_person_df = joined_df_1 \
            .groupBy("COMM", "Comm Population") \
            .agg(
                count("DR_NO").alias("Total Number of Crimes in Comm"),
            ) \
            .withColumn(
                "Crimes per Person", 
                round(col("Total Number of Crimes in Comm")/col("Comm Population"), 10)
            )

crimes_per_person_df.show()

info_per_person_df = crimes_per_person_df.hint("shuffle_replicate_nl").join(average_income_per_person_df, on=["COMM", "Comm Population"], how="inner").orderBy(desc("Avg Income per Person"))

# Force the execution of the join
# And measure performance
start_time = time.time()
info_per_person_df.collect()
print(f"Shuffle Replicate NL: {time.time()-start_time}")

info_per_person_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Zip Codes join broadcast: 13.92885160446167
+------------------+-----------+---------------+--------------------------------+---------------------+
|              COMM|Comm Houses|Comm Population|Comm Median Income per Household|Avg Income per Person|
+------------------+-----------+---------------+--------------------------------+---------------------+
|    Toluca Terrace|        541|           1301|                         48499.0|   20167.531898539586|
|      Elysian Park|       1993|           5267|                35151.9801980198|   13301.290399592457|
|          Longwood|       1474|           4210|                         38330.0|   13420.052256532066|
|     Green Meadows|       5204|          19821|              30573.460674157304|   8027.0566242023415|
|  Cadillac-Corning|       2215|           6665|                         62425.2|    20745.95918979745|
|          Mid-city|       6692|          14339|                         46571.0|    21734.64899923286|
|   Lincoln Heights|

In [10]:
joined_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (11)
+- Project (10)
   +- BroadcastHashJoin Inner BuildRight (9)
      :- Project (5)
      :  +- Filter (4)
      :     +- Generate (3)
      :        +- Filter (2)
      :           +- Scan geojson  (1)
      +- BroadcastExchange (8)
         +- Filter (7)
            +- Scan csv  (6)


(1) Scan geojson 
Output [1]: [features#6814]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_2012:bigint,HD_NAME:string,HOUSING10:bigint,LA_FIP10:string,OBJECTID:bigint,POP_2010:bigint,PUMA10:string,SPA_2012:bigint,SPA_NAME:string,SUP_DIST:string,SUP_LABEL:string,ShapeSTArea:double,ShapeSTLength:double,ZCTA10:string>,type:string>

## Query 4

In [11]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
35,application_1738075734771_0036,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
11,application_1738075734771_0012,pyspark,idle,Link,Link,,
15,application_1738075734771_0016,pyspark,idle,Link,Link,,
18,application_1738075734771_0019,pyspark,idle,Link,Link,,
21,application_1738075734771_0022,pyspark,idle,Link,Link,,
22,application_1738075734771_0023,pyspark,idle,Link,Link,,
26,application_1738075734771_0027,pyspark,idle,Link,Link,,
30,application_1738075734771_0031,pyspark,idle,Link,Link,,
35,application_1738075734771_0036,pyspark,idle,Link,Link,,✔
36,application_1738075734771_0037,pyspark,starting,,,,


In [12]:
from sedona.spark import *
from pyspark.sql.functions import col, sum, avg, countDistinct, regexp_replace, count, desc, round
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import time

# Create spark Session
spark = SparkSession.builder \
    .appName("Query 4 implementation") \
    .getOrCreate()

conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

# Create sedona context
sedona = SedonaContext.create(spark)

# We now follow the same steps as query 3 to recalculate the average income per person

start_time = time.time()

# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

# Load la income per house dataframe
la_income_2015_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv") \

# We will join the 2010 data with the 2015 data based on the zip code of the areas
# To do so the zipcode columns must have the same data type
# So we change the datatype of flattened_df accordingly
flattened_df = flattened_df \
        .withColumn("ZCTA10", flattened_df["ZCTA10"].cast("int")) \
        .filter(col("COMM").isNotNull() & col("POP_2010").isNotNull() & col("HOUSING10").isNotNull()) \
        .filter(col("CITY") == "Los Angeles")

# Join on Zip Code
joined_df = flattened_df.join(la_income_2015_df, flattened_df["ZCTA10"] == la_income_2015_df["Zip Code"], how="inner")

# Convert the Estimated Median Income column to integer
joined_df = joined_df.withColumn(
    "Median Income",
    regexp_replace(col("Estimated Median Income"), r"[\$,]", "").cast("int")
)

# Average Income per Person = (Estimated Median Income)*(Number of Houses)/(Community Population)
average_income_per_person_df = joined_df \
        .groupBy("COMM") \
        .agg(
            sum("HOUSING10").alias("Comm Houses"),
            sum("POP_2010").alias("Comm Population"),
            avg("Median Income").alias("Comm Median Income per Household")
        ) \
        .withColumn("Avg Income per Person", col("Comm Median Income per Household")*col("Comm Houses")/col("Comm Population")) \
        .orderBy(desc("Avg Income per Person"))

# End of query 3 recalculations

# Load 2010-2019 crime dataset 
crime_data_2010_2019_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv") \
                .select("DR_NO", "DATE OCC", "Vict Descent", "LON", "LAT")  # No other columns are needed

# Keep 2015 data
crime_data_filtered_df = crime_data_2010_2019_df.filter(col("DATE OCC").substr(7, 4) == "2015")

# Create geometry column for the crime data 
crime_data_filtered_df = crime_data_filtered_df.withColumn("geometry", ST_Point("LON", "LAT"))

# Find top 3 richest and top 3 poorest areas
top_3_richest_areas = average_income_per_person_df.orderBy(desc("Avg Income per Person")).limit(3)
top_3_poorest_areas = average_income_per_person_df.orderBy(col("Avg Income per Person")).limit(3)

# We need every block from each area to do the geometry join
# So we join with the 2010 census blocks dataframe
# This is like filtering the census dataframe by only choosing the desired areas
richest_areas_blocks_df = flattened_df.join(top_3_richest_areas, on="COMM", how="inner")
poorest_areas_blocks_df = flattened_df.join(top_3_poorest_areas, on="COMM", how="inner")

# Join based on geometry to find the crimes in the 3 areas of each dataset
joined_df_richest = crime_data_filtered_df \
    .join(richest_areas_blocks_df, ST_Within(crime_data_filtered_df.geometry, richest_areas_blocks_df.geometry), "inner")
joined_df_poorest = crime_data_filtered_df \
    .join(poorest_areas_blocks_df, ST_Within(crime_data_filtered_df.geometry, poorest_areas_blocks_df.geometry), "inner")

# Load race codes dataframe
race_eth_codes_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv") \
                .withColumnRenamed("Vict Descent Full", "Victim Descent")

# Count the victims per race, then join with the ethnicity data frame to get the full ethnicity names
# Select the desired columns and sort in descending order
richest_vict_descent_df = joined_df_richest.groupBy("Vict Descent").agg(count("*").alias("#")) \
    .join(race_eth_codes_df, on="Vict Descent", how="inner") \
    .select("Victim Descent", "#") \
    .orderBy(desc("#"))
poorest_vict_descent_df = joined_df_poorest.groupBy("Vict Descent").agg(count("*").alias("#")) \
    .join(race_eth_codes_df, on="Vict Descent", how="inner") \
    .select("Victim Descent", "#") \
    .orderBy(desc("#"))

# NOTE: Some Vict Descent values are null
richest_vict_descent_df.show()
poorest_vict_descent_df.show()

print(f"Elapsed time: {time.time()-start_time}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: 2
Executor Memory: 2g
Executor Cores: 1
+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White|894|
|               Other|156|
|Hispanic/Latin/Me...| 97|
|               Black| 55|
|             Unknown| 50|
|         Other Asian| 31|
|American Indian/A...|  1|
|             Chinese|  1|
+--------------------+---+

+--------------------+----+
|      Victim Descent|   #|
+--------------------+----+
|Hispanic/Latin/Me...|3191|
|               Black| 872|
|               White| 430|
|               Other| 265|
|         Other Asian| 140|
|             Unknown|  26|
|American Indian/A...|  24|
|              Korean|   5|
|             Chinese|   3|
|            Filipino|   2|
|         AsianIndian|   1|
+--------------------+----+

Elapsed time: 94.40970206260681

## Query 5

In [22]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
44,application_1738075734771_0045,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
11,application_1738075734771_0012,pyspark,idle,Link,Link,,
15,application_1738075734771_0016,pyspark,idle,Link,Link,,
18,application_1738075734771_0019,pyspark,idle,Link,Link,,
21,application_1738075734771_0022,pyspark,idle,Link,Link,,
22,application_1738075734771_0023,pyspark,idle,Link,Link,,
26,application_1738075734771_0027,pyspark,idle,Link,Link,,
30,application_1738075734771_0031,pyspark,idle,Link,Link,,
43,application_1738075734771_0044,pyspark,busy,Link,Link,,
44,application_1738075734771_0045,pyspark,idle,Link,Link,,✔


In [23]:
from sedona.spark import *
from pyspark.sql.functions import col, sum, avg, countDistinct, regexp_replace, count, desc, round
from pyspark.sql import SparkSession
import time

# Create spark Session
# spark = SparkSession.builder \
#    .appName("Query 3 implementation") \
#    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)

conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

start_time = time.time()

# Load and join crime datasets
# For some reason AREA is written with a whitespace in 2010-2019 dataset
# So we renamed it
crime_data_2010_2019_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv") \
                .select("DR_NO", "AREA ", "LAT", "LON") \
                .withColumnRenamed("AREA ", "AREA")
crime_data_2020_present_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv") \
                .select("DR_NO", "AREA", "LAT", "LON")

crime_data_df = crime_data_2010_2019_df.union(crime_data_2020_present_df)

# Create geometry column for crime data after filtering out NULL values
crime_data_df = crime_data_df \
                .filter((crime_data_df["LON"] != 0) & (crime_data_df["LAT"] != 0)) \
                .withColumn("cd_geometry", ST_Point("LON", "LAT")) \
                .withColumn("AREA", crime_data_df["AREA"].cast("int"))

# Load police stations data
la_police_stations_df = spark.read.format('csv') \
                .options(header='true') \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv") \

# Create geomery column for police stations data
la_police_stations_df = la_police_stations_df \
                .withColumn("pd_geometry", ST_Point("x", "y")) \
                .withColumn("PREC", la_police_stations_df["PREC"].cast("int"))

# We needed to cast AREA and PREC to int because single digit numbers have value "0x" in crimes dataset
# And value "x" in police stations dataset

# Join the 2 dataframes on the LAPD number attribute  (1-21)
# This number is found in AREA attribute for the crime data and in PREC attribute for Police Stations data
joined_df = crime_data_df.join(la_police_stations_df, crime_data_df["AREA"] == la_police_stations_df["PREC"], how="inner")

# Calculate the distance of the crime from the police department responsible for the specific area
# The distance is calculated in km
distance_from_pd_df = joined_df.withColumn("distance_km", ST_DistanceSphere("cd_geometry", "pd_geometry")/1000) \
                .groupBy("DIVISION") \
                .agg(round(avg("distance_km"), 1).alias("average distance"), count("*").alias("#")) \
                .orderBy(desc("#"))

distance_from_pd_df.show()

print(f"Elapsed time: {time.time()-start_time}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: 4
Executor Memory: 4g
Executor Cores: 2
+----------------+----------------+------+
|        DIVISION|average distance|     #|
+----------------+----------------+------+
|     77TH STREET|             2.7|206784|
|       SOUTHWEST|             2.7|192226|
|         PACIFIC|             3.9|170903|
|         CENTRAL|             1.1|166698|
| NORTH HOLLYWOOD|             2.6|164532|
|       SOUTHEAST|             2.1|161051|
|       HOLLYWOOD|             1.6|150663|
|          NEWTON|             2.1|148757|
|         OLYMPIC|             1.8|144962|
|         MISSION|             4.7|143600|
|       NORTHEAST|             3.8|142732|
|        VAN NUYS|             2.4|142194|
|         TOPANGA|             3.8|138642|
|      DEVONSHIRE|             3.9|137881|
|        WILSHIRE|             2.6|136199|
|         RAMPART|             1.6|135948|
|WEST LOS ANGELES|             3.6|134259|
|          HARBOR|             4.1|132911|
|     WEST VALLEY|             3.6|13