# Introduction

## Execution Instructions

- Execute the `Time Measurement` cell first so that the `@measure_time` decorator is defined.
- Each query can be executed independently of the others.
- For Query 2, make sure `s3_path` points to a directory where the Parquet file can be stored (otherwise change the path or create the directory).
- For Queries 4 and 5:
    1. select the `spark.executor` configuration by executing the corresponding cell from `Configurations`
    2. execute the `Time Measurement` cell
    3. execute the corresponding `PySpark & Sedona imports, Read Datasets, register functions` cells
    4. execute the cell defining the query execution function
    5. run a cell from the `Experiments` section
    6. repeat for each configuration to be tested

## Time Measurement

In [1]:
# Log the execution time of a function call:
import functools
import time

# Decorator that measures and prints the time taken by any function.
def measure_time(func):
    # *args and **kwargs let the wrapper accept whatever arguments func takes
    @functools.wraps(func)  # keep func.__name__ and its docstring on the wrapper
    def inner1(*args, **kwargs):

        # Start timing
        start_time = time.time()

        result = func(*args, **kwargs)

        # Stop timing and print out the execution duration
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"Time taken by {func.__name__}: {elapsed_time:.2f} seconds")

        # Pass the wrapped function's return value through to the caller
        return result

    return inner1

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3313,application_1732639283265_3269,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.


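When several configurations are benchmarked, it can help to keep every measured duration instead of only printing it. The following is a minimal sketch of such a variant, assuming plain Python only; `measure_time_recorded` and the `timings` dict are hypothetical names, not part of the notebook. It uses `time.perf_counter`, a monotonic high-resolution clock that is better suited to measuring intervals than `time.time`.

```python
import functools
import time

# Hypothetical registry of measured durations, keyed by function name.
timings = {}

def measure_time_recorded(func):
    """Time `func`, print the duration, store it in `timings`, and return func's result."""
    @functools.wraps(func)  # keep func.__name__ and docstring on the wrapper
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs even if func raises, so failed runs are timed too
            elapsed = time.perf_counter() - start
            timings.setdefault(func.__name__, []).append(elapsed)
            print(f"Time taken by {func.__name__}: {elapsed:.2f} seconds")
    return wrapper

@measure_time_recorded
def add(a, b):
    return a + b

print(add(2, 3))
```

Collecting the durations in one place makes it easy to compare runs afterwards, for example by taking the median per function.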

# Query 1

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Query 1") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3505,application_1732639283265_3461,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



In [2]:
crime_data_19 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
crime_data_20 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, inferSchema=True)
crime_data = crime_data_19.union(crime_data_20)


In [3]:
## DataFrame APIs ##

from pyspark.sql.functions import col, when
import time

start_time_df = time.time()
filtered_data = crime_data.filter(col("Crm Cd Desc").like("%AGGRAVATED ASSAULT%"))
updated_df = filtered_data.withColumn(
    "Age_Group",
    when(col("Vict Age") < 18, "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") < 25), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") < 65), "Adults")
    .otherwise("Seniors")
)

age_group_counts = updated_df.groupBy("Age_Group").count()
age_group_counts = age_group_counts.orderBy(col("count").desc())
age_group_counts.show()
end_time_df = time.time()
print("DataFrame API execution time: ", end_time_df - start_time_df)


+------------+------+
|   Age_Group| count|
+------------+------+
|      Adults|121093|
|Young Adults| 33605|
|    Children| 15928|
|     Seniors|  5985|
+------------+------+

DataFrame API execution time:  5.511902332305908
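The `when(...).otherwise(...)` chain is evaluated top to bottom like a SQL `CASE WHEN`: the first condition that is true wins, and a comparison against a null `Vict Age` is never true, so null ages fall through to `otherwise` and are counted as "Seniors". Likewise, if the data uses 0 or negative values for unknown ages, those rows would land in "Children". A plain-Python sketch of the same bucketing makes these edge cases explicit; `age_group` is a hypothetical helper and `None` stands in for a SQL NULL.

```python
from typing import Optional

def age_group(age: Optional[int]) -> str:
    """Mirror the when/otherwise chain; None models a SQL NULL."""
    if age is None:
        # Every `when` condition evaluates to NULL (not true) -> otherwise branch
        return "Seniors"
    if age < 18:
        return "Children"
    if age < 25:   # 18 <= age < 25, because the previous branch failed
        return "Young Adults"
    if age < 65:   # 25 <= age < 65
        return "Adults"
    return "Seniors"

for a in (0, 17, 18, 24, 25, 64, 65, None):
    print(a, "->", age_group(a))
```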

In [10]:
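import time

start_time_rdd = time.time()

crime_rdd = crime_data.rdd
# Skip rows with a null description; `in` on None would raise a TypeError
filtered_rdd = crime_rdd.filter(
    lambda row: row["Crm Cd Desc"] is not None and "AGGRAVATED ASSAULT" in row["Crm Cd Desc"]
)

def get_age_group(age):
    # A null age matches no `when` condition in the DataFrame version and falls
    # through to otherwise("Seniors"); do the same here
    if age is None:
        return "Seniors"
    elif age < 18:
        return "Children"
    elif 18 <= age < 25:
        return "Young Adults"
    elif 25 <= age < 65:
        return "Adults"
    else:
        return "Seniors"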

age_group_rdd = filtered_rdd.map(lambda x: (get_age_group(x['Vict Age']), 1))
age_group_count = age_group_rdd.reduceByKey(lambda a, b: a + b)

sorted_age_group_count = age_group_count.sortBy(lambda x: x[1], ascending=False)
for age_group, count in sorted_age_group_count.collect():
    print(age_group, "->", count)
end_time_rdd = time.time()
print("RDD API execution time: ", end_time_rdd - start_time_rdd)


Adults -> 121093
Young Adults -> 33605
Children -> 15928
Seniors -> 5985
RDD API execution time:  22.607872486114502
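The RDD version follows the classic map/reduce pattern: filter the rows, map each one to a `(key, 1)` pair, combine the values per key (`reduceByKey`), then sort by count. The sketch below replays those steps in plain Python on a few hypothetical rows to show what each transformation computes. It does not model partitioning or the serialization of rows between the JVM and Python workers, which likely contributes to the RDD version's longer run time here (about 22.6 s versus 5.5 s for the DataFrame API). `reduce_by_key` is a hypothetical helper.

```python
def get_age_group(age):
    if age is None or age >= 65:
        return "Seniors"
    if age < 18:
        return "Children"
    if age < 25:
        return "Young Adults"
    return "Adults"

def reduce_by_key(pairs, func):
    """Combine values that share a key with `func`, like RDD.reduceByKey."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return out

# Hypothetical sample rows standing in for crime records.
rows = [
    {"Crm Cd Desc": "ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT", "Vict Age": 30},
    {"Crm Cd Desc": "ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT", "Vict Age": 20},
    {"Crm Cd Desc": "BATTERY - SIMPLE ASSAULT", "Vict Age": 40},
    {"Crm Cd Desc": "ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT", "Vict Age": 45},
]

filtered = [r for r in rows if r["Crm Cd Desc"] and "AGGRAVATED ASSAULT" in r["Crm Cd Desc"]]  # filter
pairs = [(get_age_group(r["Vict Age"]), 1) for r in filtered]                                # map
counts = reduce_by_key(pairs, lambda a, b: a + b)                                            # reduceByKey
result = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)                          # sortBy
for age_group, count in result:
    print(age_group, "->", count)
```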

# Query 2

In [95]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, StringType
# Note: importing `sum` shadows Python's built-in sum in this notebook
from pyspark.sql.functions import col, udf, sum, count, dense_rank
from pyspark.sql.window import Window

spark = SparkSession \
    .builder \
    .appName("Query 2: 3 Police Stations for each year with biggest rate of closed cases") \
    .getOrCreate() 



In [96]:
crimes_2010_19_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
crimes_2020_24_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True)
crimes_df = crimes_2010_19_df.union(crimes_2020_24_df)
crimes_df.printSchema()
print("Number of Rows (Crime DataFrame)")
crimes_df.count()
# print('Crime data')
# crimes_df.show(3)


root
 |-- DR_NO: string (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: string (nullable = true)
 |-- AREA : string (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: string (nullable = true)
 |-- Part 1-2: string (nullable = true)
 |-- Crm Cd: string (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: string (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: string (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: string (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: string (nullable = true)
 |-- Crm Cd 2: string (nullable = true)
 |-- Crm Cd 3: string (nullable = true)
 |-- Crm Cd 4: string (nullable = true)
 |-- LOCATION: str

In [97]:
# User-defined functions (UDFs)

def extract_year(date_occ: str) -> str:
    '''Returns the year from a DATE OCC value.'''
    return date_occ.split("/")[2].split(" ")[0]

def is_closed_case(case: str) -> int:
    '''Returns 1 if the Status Desc marks the case as closed, i.e. anything other than
    'Invest Cont' (investigation continuing) or 'UNK' (unknown); otherwise returns 0.'''
    return 0 if (case=='Invest Cont' or case=='UNK') else 1

def percentage(closed: int, total: int) -> float:
    '''Returns closed/total as a percentage.'''
    return (closed/total)*100

# print(is_closed_case('c'))
# print(extract_year("01/01/2010 12:00:..."))
extract_year_udf = udf(extract_year, StringType())
is_closed_case_udf = udf(is_closed_case, IntegerType())
percentage_udf = udf(percentage, FloatType())

# Register the functions for Spark SQL. No return type is given, so Spark SQL
# treats their results as strings (StringType).
spark.udf.register("extract_year", extract_year)
spark.udf.register("is_closed_case", is_closed_case)
spark.udf.register("percentage", percentage)


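The `extract_year` UDF relies on string splitting, which silently returns the wrong token if a value has an unexpected shape. Assuming `DATE OCC` values follow a `MM/DD/YYYY hh:mm:ss AM` layout (an assumption; the notebook only shows a truncated example), parsing with an explicit format fails loudly instead. `extract_year_strict` and `DATE_FORMAT` are hypothetical names, and the sample value is made up.

```python
from datetime import datetime

DATE_FORMAT = "%m/%d/%Y %I:%M:%S %p"  # assumed layout of DATE OCC

def extract_year_split(date_occ: str) -> str:
    """Same logic as the notebook's extract_year UDF."""
    return date_occ.split("/")[2].split(" ")[0]

def extract_year_strict(date_occ: str) -> str:
    """Parse with an explicit format; raises ValueError on unexpected input."""
    return str(datetime.strptime(date_occ, DATE_FORMAT).year)

sample = "03/15/2012 12:00:00 AM"  # hypothetical value
print(extract_year_split(sample), extract_year_strict(sample))
```

Inside Spark, the built-ins `to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")` followed by `year(...)` would likely avoid the Python UDF round trip altogether, under the same format assumption.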

In [5]:
# s3://groups-bucket-dblab-905418150721/group46/query2/
# s3://groups-bucket-dblab-905418150721/group46/query2-single-parquet/
# create the output directory in your S3 bucket
group_number = "46"
s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/query2-single-parquet/"
s3_path_multiple_parquet = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/query2/"

# Repartition the DataFrame to a single partition so that it is written as a single Parquet file
single_partition_df = crimes_df.repartition(1)
single_partition_df.write.mode("overwrite").parquet(s3_path)

# Alternatively, write without repartitioning; Spark then typically writes one file per partition
crimes_df.write.mode("overwrite").parquet(s3_path_multiple_parquet)

# crimes_df_from_parquet = spark.read.parquet(s3_path)
# crimes_df_from_parquet.show()


## Query 2 - DataFrame API

In [98]:
@measure_time
def query2_dataframe(df, debug = False):
    '''Runs Query 2 with the DataFrame API and prints the top 3 ranked precincts per year.'''

    modified_df = df.select("DR_NO", "DATE OCC","AREA NAME", "Status Desc") \
        .withColumn("year", extract_year_udf(col("DATE OCC"))) \
        .withColumn("precinct", col("AREA NAME")) \
        .withColumn("is_closed_case", is_closed_case_udf(col("Status Desc")))
    if debug: modified_df.show(3)

    grouped_df = modified_df.groupBy("year", "precinct") \
        .agg( \
             count("*").alias("total_cases"), \
             sum("is_closed_case").alias("closed_cases"), \
             percentage_udf(col("closed_cases"), col("total_cases")).alias("closed_case_rate") \
            )
    if debug: grouped_df.show(3)

    # Window partitioned by year and ordered by closed_case_rate (descending), used to rank precincts within each year
    windowSpec = Window.partitionBy("year").orderBy(col("closed_case_rate").desc())
    ranked_df = grouped_df.withColumn("#", dense_rank().over(windowSpec))
    if debug: ranked_df.show(3)

    # Keep the top 3 ranks per year and sort by year, then by rank (#), both ascending
    result = ranked_df.select("year", "precinct","closed_case_rate", "#") \
        .filter(col("#") <= 3) \
        .orderBy(["year", "#"], ascending=[True,True])

    result.show(50)



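What the window step computes can be checked on a small scale. Within each year, `dense_rank()` over `closed_case_rate` descending gives rank 1 to the highest rate, assigns tied rates the same rank, and does not skip ranks after a tie; filtering on `# <= 3` therefore keeps the three highest distinct rates per year, which can mean more than three precincts when rates tie. The plain-Python sketch below reproduces this on hypothetical rows; `top_k_dense_rank` is an illustrative name.

```python
from collections import defaultdict

def top_k_dense_rank(rows, k=3):
    """rows: iterable of (year, precinct, rate). Returns [(year, precinct, rate, rank)]."""
    by_year = defaultdict(list)
    for year, precinct, rate in rows:
        by_year[year].append((precinct, rate))
    result = []
    for year in sorted(by_year):
        # Distinct rates in descending order define the dense ranks 1, 2, 3, ...
        distinct = sorted({rate for _, rate in by_year[year]}, reverse=True)
        rank_of = {rate: i + 1 for i, rate in enumerate(distinct)}
        ranked = sorted(by_year[year], key=lambda pr: rank_of[pr[1]])
        result.extend((year, p, r, rank_of[r]) for p, r in ranked if rank_of[r] <= k)
    return result

# Hypothetical rates; A and C tie in 2010.
rows = [("2010", "A", 30.0), ("2010", "B", 32.0), ("2010", "C", 30.0),
        ("2010", "D", 25.0), ("2010", "E", 20.0), ("2011", "A", 35.0)]
for row in top_k_dense_rank(rows):
    print(row)
```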

### Comparison of query execution time with input from CSV or Parquet

In [7]:
@measure_time
def query2_csv(execute_query = True):
    crimes_2010_19_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
    crimes_2020_24_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True)
    crimes_df = crimes_2010_19_df.union(crimes_2020_24_df)
    # crimes_df.show(3)
    if execute_query: query2_dataframe(crimes_df)

@measure_time
def query2_parquet(path: str, execute_query = True):
    crimes_df_from_parquet = spark.read.parquet(path)
    # crimes_df_from_parquet.show(3)
    if execute_query: query2_dataframe(crimes_df_from_parquet)



In [8]:
# Call the function that loads DF from CSV and then executes the query 
query2_csv()


+----+-----------+----------------+---+
|year|   precinct|closed_case_rate|  #|
+----+-----------+----------------+---+
|2010|    Rampart|       32.847134|  1|
|2010|    Olympic|        31.51529|  2|
|2010|     Harbor|       29.360283|  3|
|2011|    Olympic|       35.040062|  1|
|2011|    Rampart|        32.49645|  2|
|2011|     Harbor|       28.513363|  3|
|2012|    Olympic|       34.297085|  1|
|2012|    Rampart|       32.460003|  2|
|2012|     Harbor|       29.509586|  3|
|2013|    Olympic|        33.58218|  1|
|2013|    Rampart|       32.106037|  2|
|2013|     Harbor|       29.723639|  3|
|2014|   Van Nuys|       32.021523|  1|
|2014|West Valley|       31.497547|  2|
|2014|    Mission|        31.22494|  3|
|2015|   Van Nuys|        32.26514|  1|
|2015|    Mission|       30.463762|  2|
|2015|   Foothill|       30.353003|  3|
|2016|   Van Nuys|        32.19452|  1|
|2016|West Valley|       31.401464|  2|
|2016|   Foothill|       29.908648|  3|
|2017|   Van Nuys|       32.055428|  1|


In [8]:
# Call the function that loads DF from the single parquet (1 partition) and then executes the query 
query2_parquet(path = s3_path)


+----+-----------+----------------+---+
|year|   precinct|closed_case_rate|  #|
+----+-----------+----------------+---+
|2010|    Rampart|       32.847134|  1|
|2010|    Olympic|        31.51529|  2|
|2010|     Harbor|       29.360283|  3|
|2011|    Olympic|       35.040062|  1|
|2011|    Rampart|        32.49645|  2|
|2011|     Harbor|       28.513363|  3|
|2012|    Olympic|       34.297085|  1|
|2012|    Rampart|       32.460003|  2|
|2012|     Harbor|       29.509586|  3|
|2013|    Olympic|        33.58218|  1|
|2013|    Rampart|       32.106037|  2|
|2013|     Harbor|       29.723639|  3|
|2014|   Van Nuys|       32.021523|  1|
|2014|West Valley|       31.497547|  2|
|2014|    Mission|        31.22494|  3|
|2015|   Van Nuys|        32.26514|  1|
|2015|    Mission|       30.463762|  2|
|2015|   Foothill|       30.353003|  3|
|2016|   Van Nuys|        32.19452|  1|
|2016|West Valley|       31.401464|  2|
|2016|   Foothill|       29.908648|  3|
|2017|   Van Nuys|       32.055428|  1|


In [8]:
# Call the function that loads DF from multiple parquet files and then executes the query 
query2_parquet(path = s3_path_multiple_parquet)


+----+-----------+----------------+---+
|year|   precinct|closed_case_rate|  #|
+----+-----------+----------------+---+
|2010|    Rampart|       32.847134|  1|
|2010|    Olympic|        31.51529|  2|
|2010|     Harbor|       29.360283|  3|
|2011|    Olympic|       35.040062|  1|
|2011|    Rampart|        32.49645|  2|
|2011|     Harbor|       28.513363|  3|
|2012|    Olympic|       34.297085|  1|
|2012|    Rampart|       32.460003|  2|
|2012|     Harbor|       29.509586|  3|
|2013|    Olympic|        33.58218|  1|
|2013|    Rampart|       32.106037|  2|
|2013|     Harbor|       29.723639|  3|
|2014|   Van Nuys|       32.021523|  1|
|2014|West Valley|       31.497547|  2|
|2014|    Mission|        31.22494|  3|
|2015|   Van Nuys|        32.26514|  1|
|2015|    Mission|       30.463762|  2|
|2015|   Foothill|       30.353003|  3|
|2016|   Van Nuys|        32.19452|  1|
|2016|West Valley|       31.401464|  2|
|2016|   Foothill|       29.908648|  3|
|2017|   Van Nuys|       32.055428|  1|
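Each input variant above is timed with a single run, and one wall-clock measurement on a shared cluster can vary noticeably (S3 latency, caching, executor start-up). A small harness that repeats a call and reports the median is a somewhat more robust way to compare CSV and Parquet input. This is a sketch under the assumption that the timed callable can safely be run several times; `median_runtime` and `workload` are hypothetical names.

```python
import statistics
import time

def median_runtime(func, *args, repeats=5, **kwargs):
    """Call func(*args, **kwargs) `repeats` times; return (median, all durations)."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func(*args, **kwargs)
        durations.append(time.perf_counter() - start)
    return statistics.median(durations), durations

def workload():
    # Stand-in for a notebook call such as query2_parquet(path=s3_path)
    return sorted(range(100_000), reverse=True)

med, runs = median_runtime(workload, repeats=3)
print(f"median {med:.4f}s over {len(runs)} runs")
```

Because later runs may benefit from warm caches, discarding the first run before taking the median is another option worth considering.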


## Query 2 - SQL API

In [99]:
@measure_time
def query2_sql(df, debug = False):
    '''Runs Query 2 with the SQL API and prints the top 3 ranked precincts per year.'''

    df.createOrReplaceTempView("crimes")

    query = """
        SELECT `DR_NO`, `DATE OCC`, `AREA NAME`, `Status Desc`, extract_year(`DATE OCC`) as year, `AREA NAME` as precinct, is_closed_case(`Status Desc`) as is_closed_case 
        FROM crimes
    """

    modified_crimes = spark.sql(query)
    if debug: modified_crimes.show(3)
    modified_crimes.createOrReplaceTempView("modified_crimes")

    query = """
        SELECT year, precinct, count(*) as total_cases, CAST(sum(is_closed_case) AS INT) as closed_cases, CAST(percentage(`closed_cases`,`total_cases`) AS DECIMAL(10,6)) as closed_case_rate
        FROM modified_crimes
        GROUP BY year, precinct
    """

    grouped_crimes = spark.sql(query)
    if debug: grouped_crimes.show(3)
    grouped_crimes.createOrReplaceTempView("grouped_crimes")

    query = """
        SELECT year, precinct,  total_cases, closed_cases, closed_case_rate, DENSE_RANK() OVER(PARTITION BY year ORDER BY closed_case_rate DESC) as `#`
        FROM grouped_crimes
        ORDER BY year, closed_case_rate DESC
    """

    ranked_crimes = spark.sql(query)
    if debug: ranked_crimes.show(3)
    ranked_crimes.createOrReplaceTempView("ranked_crimes")

    query = """
        SELECT year, precinct, closed_case_rate, `#`
        FROM ranked_crimes
        WHERE `#` <= 3
        ORDER BY year, `#`
    """

    result = spark.sql(query)
    result.show(50)
    result.createOrReplaceTempView("result")



### Comparison of query execution time using DataFrame or SQL API

In [100]:
# Execute Query 2 with the DataFrame API
query2_dataframe(crimes_df)


+----+-----------+----------------+---+
|year|   precinct|closed_case_rate|  #|
+----+-----------+----------------+---+
|2010|    Rampart|       32.847134|  1|
|2010|    Olympic|        31.51529|  2|
|2010|     Harbor|       29.360283|  3|
|2011|    Olympic|       35.040062|  1|
|2011|    Rampart|        32.49645|  2|
|2011|     Harbor|       28.513363|  3|
|2012|    Olympic|       34.297085|  1|
|2012|    Rampart|       32.460003|  2|
|2012|     Harbor|       29.509586|  3|
|2013|    Olympic|        33.58218|  1|
|2013|    Rampart|       32.106037|  2|
|2013|     Harbor|       29.723639|  3|
|2014|   Van Nuys|       32.021523|  1|
|2014|West Valley|       31.497547|  2|
|2014|    Mission|        31.22494|  3|
|2015|   Van Nuys|        32.26514|  1|
|2015|    Mission|       30.463762|  2|
|2015|   Foothill|       30.353003|  3|
|2016|   Van Nuys|        32.19452|  1|
|2016|West Valley|       31.401464|  2|
|2016|   Foothill|       29.908648|  3|
|2017|   Van Nuys|       32.055428|  1|


In [92]:
# Execute Query 2 with the SQL API
query2_sql(crimes_df)


+----+-----------+----------------+---+
|year|   precinct|closed_case_rate|  #|
+----+-----------+----------------+---+
|2010|    Rampart|       32.847134|  1|
|2010|    Olympic|       31.515290|  2|
|2010|     Harbor|       29.360283|  3|
|2011|    Olympic|       35.040060|  1|
|2011|    Rampart|       32.496447|  2|
|2011|     Harbor|       28.513362|  3|
|2012|    Olympic|       34.297085|  1|
|2012|    Rampart|       32.460005|  2|
|2012|     Harbor|       29.509586|  3|
|2013|    Olympic|       33.582179|  1|
|2013|    Rampart|       32.106038|  2|
|2013|     Harbor|       29.723639|  3|
|2014|   Van Nuys|       32.021524|  1|
|2014|West Valley|       31.497548|  2|
|2014|    Mission|       31.224940|  3|
|2015|   Van Nuys|       32.265141|  1|
|2015|    Mission|       30.463763|  2|
|2015|   Foothill|       30.353002|  3|
|2016|   Van Nuys|       32.194518|  1|
|2016|West Valley|       31.401464|  2|
|2016|   Foothill|       29.908647|  3|
|2017|   Van Nuys|       32.055427|  1|
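Both APIs produce the same ranking, but the rates differ in the last digits (e.g. 35.040062 versus 35.040060 for Olympic in 2011). The DataFrame version's `percentage_udf` is declared with `FloatType`, a 32-bit float, whereas the SQL version's `percentage` is registered without a return type (so Spark treats its result as a string) and is then cast to `DECIMAL(10,6)` from the double-precision value. The sketch below emulates a 32-bit round trip with Python's `struct` module to show the size of that error; the counts are hypothetical and the half-up rounding mode in the decimal step is an assumption.

```python
import struct
from decimal import Decimal, ROUND_HALF_UP

def to_float32(x: float) -> float:
    """Round-trip a Python float (64-bit) through IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", x))[0]

rate64 = 100 * 2203 / 6287            # hypothetical closed/total counts, as a double
rate32 = to_float32(rate64)           # what a FloatType result can hold
as_decimal = Decimal(repr(rate64)).quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)  # assumed rounding

print(f"double       : {rate64!r}")
print(f"float32      : {rate32!r}")
print(f"decimal(10,6): {as_decimal}")
```

For values between 32 and 64, adjacent 32-bit floats are 2^-18 (about 3.8e-6) apart, which matches the size of the differences in the two tables.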


# Query 3

In [3]:
import time
from sedona.spark import *
from pyspark.sql.functions import col, sum as spark_sum, avg, regexp_replace, count
from pyspark.sql import SparkSession

# Create spark Session
spark = SparkSession.builder \
    .appName("Query 3") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()


# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flat_census_data = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

LA_areas = flat_census_data.filter(col("CITY") == "Los Angeles")

# Read csvs
crime_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
census_data = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")
income_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)

# Start timing
start_time = time.time()
# Join
joined_df = LA_areas.join(income_data, LA_areas["ZCTA10"] == income_data["Zip Code"], "inner")
joined_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"1st join - optimizer: {elapsed_time:.2f} seconds")

#s3_path = "s3://groups-bucket-dblab-905418150721/group46/query3/join.parquet"
#result = [("broadcast", execution_time)]
#result_df = spark.createDataFrame(result, ["Join Strategy", "Execution Time (s)"])
#result_df.write.mode("append").parquet(s3_path)
#joined_df.explain(mode="formatted")

joined_df = joined_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[\\$,]", ""))
    
joined_df = joined_df.withColumn(
    "Estimated Median Income",
    col("Estimated Median Income").cast("double")
)

LA_areas = joined_df.groupBy("COMM").agg(
                spark_sum("POP_2010").alias("total_population"),
                spark_sum("HOUSING10").alias("total_housing"),
                avg("Estimated Median Income").alias("average_income_per_house"),
                ST_Union_Aggr("geometry").alias("geometry"))

# Compute median income per person
result_df = LA_areas.withColumn(
    "median_income_per_person",
    (col("total_housing") * col("average_income_per_house")) / col("total_population")
)

crime_data = crime_data.withColumn("geometry", ST_Point("LON", "LAT"))
start_time = time.time()
joined_crimes_df = crime_data \
    .join(result_df, ST_Within(crime_data.geometry, result_df.geometry), "inner")
joined_crimes_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"2nd join - optimizer: {elapsed_time:.2f} seconds")
# Join no.2
#joined_crimes_df.explain(mode="formatted")

crime_counts = joined_crimes_df.groupBy("COMM").agg(
    count("*").alias("crime_count")
)

# Join no.3
start_time = time.time()
joined_LA = result_df.join(crime_counts, "COMM", "inner")
joined_LA.count()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"3rd join - optimizer: {elapsed_time:.2f} seconds")

final_LA_df = joined_LA.withColumn(
    "crimes_per_person",
    col("crime_count") / col("total_population")
)
final_LA_df.select("COMM", "median_income_per_person", "crimes_per_person").orderBy(col("median_income_per_person").desc()).show()

# Stop timing and print out the execution duration

#elapsed_time = end_time - start_time
#print(f"Time taken: {elapsed_time:.2f} seconds")


1st join - optimizer: 10.63 seconds
2nd join - optimizer: 25.65 seconds
3rd join - optimizer: 24.33 seconds
+-------------------+------------------------+-------------------+
|               COMM|median_income_per_person|  crimes_per_person|
+-------------------+------------------------+-------------------+
|  Pacific Palisades|        70526.2203104497|0.31066220995010413|
|      Beverly Crest|       66513.90150799365| 0.2667541629070626|
|   Marina Peninsula|       65235.69402813004|0.40050726308508183|
|Palisades Highlands|       65048.95354904471|0.12914166449256456|
|            Bel Air|       63259.97685510228| 0.2881007141992495|
|  Mandeville Canyon|       61443.86522911051|  0.193628209093721|
|          Brentwood|      60696.777650004915|0.34619978840312615|
|            Carthay|      50282.692104378286| 0.5788834029624003|
|             Venice|       46575.69192582585| 0.8405823754789272|
|       Century City|       45707.53601562712| 0.5460891505466778|
|      Playa Del Rey|
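Per community, the query derives two ratios: `median_income_per_person = total_housing * average_income_per_house / total_population`, where `avg()` averages the ZIP-level median household income over the joined census-block rows, and `crimes_per_person = crime_count / total_population`. The plain-Python sketch below mirrors the income cleanup (stripping `$` and `,` as the `regexp_replace` does) and the aggregation on hypothetical block rows and crime counts; all names and numbers are illustrative.

```python
import re
from collections import defaultdict

# Hypothetical joined rows: (community, population, housing units, income string)
blocks = [
    ("Venice", 1000, 400, "$60,000"),
    ("Venice", 500, 250, "$90,000"),
    ("Carthay", 800, 300, "$75,000"),
]
crime_counts = {"Venice": 300, "Carthay": 200}  # hypothetical crime counts per community

def parse_income(s: str) -> float:
    """Strip '$' and ',' like regexp_replace(..., "[\\$,]", ""), then cast to float."""
    return float(re.sub(r"[$,]", "", s))

agg = defaultdict(lambda: {"pop": 0, "housing": 0, "income_total": 0.0, "n": 0})
for comm, pop, housing, income in blocks:
    a = agg[comm]
    a["pop"] += pop
    a["housing"] += housing
    a["income_total"] += parse_income(income)
    a["n"] += 1  # avg() counts joined rows

result = {}
for comm, a in agg.items():
    avg_income = a["income_total"] / a["n"]
    result[comm] = (
        a["housing"] * avg_income / a["pop"],   # median_income_per_person
        crime_counts[comm] / a["pop"],          # crimes_per_person
    )
print(result)
```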

In [1]:
### Trying different join strategies

import time
from sedona.spark import *
from pyspark.sql.functions import col, sum as spark_sum, avg, regexp_replace, count
from pyspark.sql import SparkSession

# Create spark Session
spark = SparkSession.builder \
    .appName("Query 3") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()



# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flat_census_data = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

LA_areas = flat_census_data.filter(col("CITY") == "Los Angeles")

# Read csvs
crime_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
census_data = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")
income_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
173,application_1738075734771_0174,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



In [2]:
### First Join

start_time = time.time()
joined_df = LA_areas.join(income_data, LA_areas["ZCTA10"] == income_data["Zip Code"], "inner")
joined_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_df.explain(mode="formatted")
print(f"1st join: {elapsed_time:.2f} seconds")


== Physical Plan ==
AdaptiveSparkPlan (10)
+- BroadcastHashJoin Inner BuildRight (9)
   :- Project (5)
   :  +- Filter (4)
   :     +- Generate (3)
   :        +- Filter (2)
   :           +- Scan geojson  (1)
   +- BroadcastExchange (8)
      +- Filter (7)
         +- Scan csv  (6)


(1) Scan geojson 
Output [1]: [features#25]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_2012:bigint,HD_NAME:string,HOUSING10:bigint,LA_FIP10:string,OBJECTID:bigint,POP_2010:bigint,PUMA10:string,SPA_2012:bigint,SPA_NAME:string,SUP_DIST:string,SUP_LABEL:string,ShapeSTArea:double,ShapeSTLength:double,ZCTA10:string>,type:string>>>

(2) Filter
Input [1]: [features#25]
Condi
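Without a hint, the adaptive planner chose a `BroadcastHashJoin` with `BuildRight`: the income table, which Spark estimated to be small enough to broadcast, is copied to every executor and loaded into a hash table keyed on `Zip Code`, and each census-block row probes it by `ZCTA10`, so the large side is never shuffled. The build/probe idea, stripped of distribution, looks roughly like this in plain Python; `hash_join` and the rows are hypothetical.

```python
from collections import defaultdict

def hash_join(left, right, left_key, right_key):
    """Inner equi-join: build a hash table on `right`, probe it with `left`."""
    table = defaultdict(list)
    for r in right:                        # build phase (the broadcast side)
        table[r[right_key]].append(r)
    out = []
    for l in left:                         # probe phase (the streamed side)
        for r in table.get(l[left_key], []):
            out.append({**l, **r})
    return out

blocks = [{"ZCTA10": "90001", "POP_2010": 10},
          {"ZCTA10": "90002", "POP_2010": 20},
          {"ZCTA10": "99999", "POP_2010": 5}]
income = [{"Zip Code": "90001", "Estimated Median Income": "$40,000"},
          {"Zip Code": "90002", "Estimated Median Income": "$45,000"}]
joined = hash_join(blocks, income, "ZCTA10", "Zip Code")
print(joined)
```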

In [2]:
### First Join

start_time = time.time()
joined_df = LA_areas.join(income_data.hint("MERGE"), LA_areas["ZCTA10"] == income_data["Zip Code"], "inner")
joined_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_df.explain(mode="formatted")
print(f"1st join: {elapsed_time:.2f} seconds")


== Physical Plan ==
AdaptiveSparkPlan (13)
+- SortMergeJoin Inner (12)
   :- Sort (7)
   :  +- Exchange (6)
   :     +- Project (5)
   :        +- Filter (4)
   :           +- Generate (3)
   :              +- Filter (2)
   :                 +- Scan geojson  (1)
   +- Sort (11)
      +- Exchange (10)
         +- Filter (9)
            +- Scan csv  (8)


(1) Scan geojson 
Output [1]: [features#25]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_2012:bigint,HD_NAME:string,HOUSING10:bigint,LA_FIP10:string,OBJECTID:bigint,POP_2010:bigint,PUMA10:string,SPA_2012:bigint,SPA_NAME:string,SUP_DIST:string,SUP_LABEL:string,ShapeSTArea:double,ShapeSTLength:double,ZC
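The `MERGE` hint switches to a `SortMergeJoin`: both sides are shuffled by join key (`Exchange`), sorted (`Sort`), and then merged in a single pass. The merge step for an inner equi-join, including runs of duplicate keys, is sketched below in plain Python; `sort_merge_join` and its `(key, value)` inputs are hypothetical.

```python
def sort_merge_join(left, right):
    """Inner join of two lists of (key, value) pairs; sorts them, then merges."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row that carries the same key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                for k in range(j, j_end):
                    out.append((lk, left[i][1], right[k][1]))
                i += 1
            j = j_end
    return out

left = [("b", 1), ("a", 2), ("b", 3), ("c", 4)]
right = [("b", "x"), ("a", "y"), ("b", "z"), ("d", "w")]
print(sort_merge_join(left, right))
```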

In [2]:
### First Join

start_time = time.time()
joined_df = LA_areas.join(income_data.hint("SHUFFLE_REPLICATE_NL"), LA_areas["ZCTA10"] == income_data["Zip Code"], "inner")
joined_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_df.explain(mode="formatted")
print(f"1st join: {elapsed_time:.2f} seconds")


== Physical Plan ==
CartesianProduct Inner (8)
:- * Project (5)
:  +- * Filter (4)
:     +- * Generate (3)
:        +- * Filter (2)
:           +- Scan geojson  (1)
+- * Filter (7)
   +- Scan csv  (6)


(1) Scan geojson 
Output [1]: [features#25]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_2012:bigint,HD_NAME:string,HOUSING10:bigint,LA_FIP10:string,OBJECTID:bigint,POP_2010:bigint,PUMA10:string,SPA_2012:bigint,SPA_NAME:string,SUP_DIST:string,SUP_LABEL:string,ShapeSTArea:double,ShapeSTLength:double,ZCTA10:string>,type:string>>>

(2) Filter [codegen id : 1]
Input [1]: [features#25]
Condition : ((size(features#25, true) > 0) AND isnotnull(features#25))
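With `SHUFFLE_REPLICATE_NL` the plan becomes a `CartesianProduct`: every row of one side is paired with every row of the other and the join condition is applied to each pair, so the work grows with the product of the two row counts rather than their sum. A nested-loop sketch in plain Python, with a counter that makes that cost visible; `nested_loop_join` and the inputs are hypothetical.

```python
def nested_loop_join(left, right, condition):
    """Inner join by testing `condition` on every (left, right) pair."""
    out, comparisons = [], 0
    for l in left:
        for r in right:
            comparisons += 1
            if condition(l, r):
                out.append((l, r))
    return out, comparisons

left = [("90001", 10), ("90002", 20), ("90003", 30)]
right = [("90001", "$40,000"), ("90003", "$50,000")]
pairs, n = nested_loop_join(left, right, lambda l, r: l[0] == r[0])
print(f"{len(pairs)} matches after {n} comparisons")
```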


In [2]:
### First Join

start_time = time.time()
joined_df = LA_areas.join(income_data.hint("SHUFFLE_HASH"), LA_areas["ZCTA10"] == income_data["Zip Code"], "inner")
joined_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_df.explain(mode="formatted")
print(f"1st join: {elapsed_time:.2f} seconds")


== Physical Plan ==
AdaptiveSparkPlan (11)
+- ShuffledHashJoin Inner BuildRight (10)
   :- Exchange (6)
   :  +- Project (5)
   :     +- Filter (4)
   :        +- Generate (3)
   :           +- Filter (2)
   :              +- Scan geojson  (1)
   +- Exchange (9)
      +- Filter (8)
         +- Scan csv  (7)


(1) Scan geojson 
Output [1]: [features#25]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_2012:bigint,HD_NAME:string,HOUSING10:bigint,LA_FIP10:string,OBJECTID:bigint,POP_2010:bigint,PUMA10:string,SPA_2012:bigint,SPA_NAME:string,SUP_DIST:string,SUP_LABEL:string,ShapeSTArea:double,ShapeSTLength:double,ZCTA10:string>,type:string>>>

(2) Filter
Input
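
The join cells in this query repeat the same `start_time`/`end_time` bookkeeping around an action. A minimal sketch of a reusable timer follows, assuming a context manager is acceptable in place of the inline code; `timed` and the `timings` dict are hypothetical names that are not part of the original notebook. It uses `time.perf_counter()`, a monotonic clock intended for measuring intervals, instead of `time.time()`.

```python
import time
from contextlib import contextmanager

# Hypothetical registry of measured durations, keyed by label.
timings = {}

@contextmanager
def timed(label, store=timings):
    """Time the enclosed block, print it like the cells above, and record it in `store`."""
    start = time.perf_counter()  # monotonic clock, unaffected by system clock changes
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        store[label] = elapsed
        print(f"{label}: {elapsed:.2f} seconds")

# In a join cell, the Spark action has to run inside the block, because
# transformations such as join() are lazy and only count() triggers the work:
# with timed("1st join"):
#     joined_df.count()

with timed("sleep demo"):
    time.sleep(0.05)
```

The `finally` clause records the duration even if the timed action raises, so a failed run still shows how long it took.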

In [3]:
# Strip "$" and "," from the income strings, then cast them to double
joined_df = joined_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[\\$,]", "")
)
joined_df = joined_df.withColumn(
    "Estimated Median Income",
    col("Estimated Median Income").cast("double")
)

LA_areas = joined_df.groupBy("COMM").agg(
    spark_sum("POP_2010").alias("total_population"),
    spark_sum("HOUSING10").alias("total_housing"),
    avg("Estimated Median Income").alias("average_income_per_house"),
    ST_Union_Aggr("geometry").alias("geometry"))

# Compute median income per person
result_df = LA_areas.withColumn(
    "median_income_per_person",
    (col("total_housing") * col("average_income_per_house")) / col("total_population")
)

crime_data = crime_data.withColumn("geometry", ST_Point("LON", "LAT"))
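
The cell above cleans the income strings before casting them, then derives a per-person figure as `total_housing * average_income_per_house / total_population`. Because it averages ZIP-level median incomes and scales by households per resident, `median_income_per_person` approximates a per-person income rather than computing a true median. A plain-Python sketch of the same two steps on invented numbers (not taken from the datasets) makes the arithmetic easy to check; `parse_income` and `income_per_person` are hypothetical helper names.

```python
import re
import statistics

def parse_income(value):
    """Plain-Python analogue of regexp_replace(col, "[\\$,]", "") followed by cast("double")."""
    if value is None:
        return None
    cleaned = re.sub(r"[$,]", "", value)
    try:
        return float(cleaned)
    except ValueError:
        # Spark's non-ANSI cast returns null for strings that are not numbers
        return None

def income_per_person(total_housing, average_income_per_house, total_population):
    """median_income_per_person as defined above: housing * average income / population."""
    return total_housing * average_income_per_house / total_population

# Invented community with two joined rows, 400 households and 1000 residents
avg_income = statistics.mean([parse_income("$40,000"), parse_income("$60,000")])
print(avg_income)                                # 50000.0
print(income_per_person(400, avg_income, 1000))  # 20000.0
```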

In [4]:
### Second Join
start_time = time.time()
joined_crimes_df = crime_data \
    .join(result_df, ST_Within(crime_data.geometry, result_df.geometry), "inner")
joined_crimes_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_crimes_df.explain(mode="formatted")
print(f"2nd join: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (20)
+- RangeJoin (19)
   :- Project (3)
   :  +- Filter (2)
   :     +- Scan csv  (1)
   +- Project (18)
      +- Filter (17)
         +- ObjectHashAggregate (16)
            +- Exchange (15)
               +- ObjectHashAggregate (14)
                  +- Project (13)
                     +- BroadcastHashJoin Inner BuildRight (12)
                        :- Project (8)
                        :  +- Filter (7)
                        :     +- Generate (6)
                        :        +- Filter (5)
                        :           +- Scan geojson  (4)
                        +- BroadcastExchange (11)
                           +- Filter (10)
                              +- Scan csv  (9)


(1) Scan csv 
Output [28]: [DR_NO#188, Date Rptd#189, DATE OCC#190, TIME OCC#191, AREA #192, AREA NAME#193, Rpt Dist No#194, Part 1-2#195, Crm Cd#196, Crm Cd Desc#197, Mocodes#198, Vict Age#199, Vict Sex#200, Vict Descent#201, Premis Cd#202, Premis Desc#203

In [4]:
### Second Join
start_time = time.time()
joined_crimes_df = crime_data \
    .join(result_df.hint("BROADCAST"), ST_Within(crime_data.geometry, result_df.geometry), "inner")
joined_crimes_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_crimes_df.explain(mode="formatted")
print(f"2nd join: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (21)
+- BroadcastIndexJoin (20)
   :- Project (3)
   :  +- Filter (2)
   :     +- Scan csv  (1)
   +- SpatialIndex (19)
      +- Project (18)
         +- Filter (17)
            +- ObjectHashAggregate (16)
               +- Exchange (15)
                  +- ObjectHashAggregate (14)
                     +- Project (13)
                        +- BroadcastHashJoin Inner BuildRight (12)
                           :- Project (8)
                           :  +- Filter (7)
                           :     +- Generate (6)
                           :        +- Filter (5)
                           :           +- Scan geojson  (4)
                           +- BroadcastExchange (11)
                              +- Filter (10)
                                 +- Scan csv  (9)


(1) Scan csv 
Output [28]: [DR_NO#188, Date Rptd#189, DATE OCC#190, TIME OCC#191, AREA #192, AREA NAME#193, Rpt Dist No#194, Part 1-2#195, Crm Cd#196, Crm Cd Desc#197, Mocodes#198

In [4]:
### Second Join
start_time = time.time()
joined_crimes_df = crime_data \
    .join(result_df.hint("SHUFFLE_HASH"), ST_Within(crime_data.geometry, result_df.geometry), "inner")
joined_crimes_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_crimes_df.explain(mode="formatted")
print(f"2nd join: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (20)
+- RangeJoin (19)
   :- Project (3)
   :  +- Filter (2)
   :     +- Scan csv  (1)
   +- Project (18)
      +- Filter (17)
         +- ObjectHashAggregate (16)
            +- Exchange (15)
               +- ObjectHashAggregate (14)
                  +- Project (13)
                     +- BroadcastHashJoin Inner BuildRight (12)
                        :- Project (8)
                        :  +- Filter (7)
                        :     +- Generate (6)
                        :        +- Filter (5)
                        :           +- Scan geojson  (4)
                        +- BroadcastExchange (11)
                           +- Filter (10)
                              +- Scan csv  (9)


(1) Scan csv 
Output [28]: [DR_NO#188, Date Rptd#189, DATE OCC#190, TIME OCC#191, AREA #192, AREA NAME#193, Rpt Dist No#194, Part 1-2#195, Crm Cd#196, Crm Cd Desc#197, Mocodes#198, Vict Age#199, Vict Sex#200, Vict Descent#201, Premis Cd#202, Premis Desc#203

In [4]:
### Second Join
start_time = time.time()
joined_crimes_df = crime_data \
    .join(result_df.hint("SHUFFLE_REPLICATE_NL"), ST_Within(crime_data.geometry, result_df.geometry), "inner")
joined_crimes_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_crimes_df.explain(mode="formatted")
print(f"2nd join: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (20)
+- RangeJoin (19)
   :- Project (3)
   :  +- Filter (2)
   :     +- Scan csv  (1)
   +- Project (18)
      +- Filter (17)
         +- ObjectHashAggregate (16)
            +- Exchange (15)
               +- ObjectHashAggregate (14)
                  +- Project (13)
                     +- BroadcastHashJoin Inner BuildRight (12)
                        :- Project (8)
                        :  +- Filter (7)
                        :     +- Generate (6)
                        :        +- Filter (5)
                        :           +- Scan geojson  (4)
                        +- BroadcastExchange (11)
                           +- Filter (10)
                              +- Scan csv  (9)


(1) Scan csv 
Output [28]: [DR_NO#188, Date Rptd#189, DATE OCC#190, TIME OCC#191, AREA #192, AREA NAME#193, Rpt Dist No#194, Part 1-2#195, Crm Cd#196, Crm Cd Desc#197, Mocodes#198, Vict Age#199, Vict Sex#200, Vict Descent#201, Premis Cd#202, Premis Desc#203

In [4]:
### Second Join
start_time = time.time()
joined_crimes_df = crime_data \
    .join(result_df.hint("MERGE"), ST_Within(crime_data.geometry, result_df.geometry), "inner")
joined_crimes_df.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_crimes_df.explain(mode="formatted")
print(f"2nd join: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (20)
+- RangeJoin (19)
   :- Project (3)
   :  +- Filter (2)
   :     +- Scan csv  (1)
   +- Project (18)
      +- Filter (17)
         +- ObjectHashAggregate (16)
            +- Exchange (15)
               +- ObjectHashAggregate (14)
                  +- Project (13)
                     +- BroadcastHashJoin Inner BuildRight (12)
                        :- Project (8)
                        :  +- Filter (7)
                        :     +- Generate (6)
                        :        +- Filter (5)
                        :           +- Scan geojson  (4)
                        +- BroadcastExchange (11)
                           +- Filter (10)
                              +- Scan csv  (9)


(1) Scan csv 
Output [28]: [DR_NO#188, Date Rptd#189, DATE OCC#190, TIME OCC#191, AREA #192, AREA NAME#193, Rpt Dist No#194, Part 1-2#195, Crm Cd#196, Crm Cd Desc#197, Mocodes#198, Vict Age#199, Vict Sex#200, Vict Descent#201, Premis Cd#202, Premis Desc#203
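
Of the hints tried on the spatial join, only `BROADCAST` changed the plan (to `BroadcastIndexJoin`). With no hint, `SHUFFLE_HASH`, `SHUFFLE_REPLICATE_NL` or `MERGE`, the plan stayed a `RangeJoin`, which suggests Sedona plans the `ST_Within` join with its own strategies and only reacts to the broadcast hint. To compare variants without reading full plans, the sketch below pulls the join operators out of the formatted plan text. `capture_plan` and `join_operators` are hypothetical helpers, and capturing `explain()` output with `redirect_stdout` assumes PySpark prints the plan through Python's `print` (worth verifying on the cluster's PySpark version).

```python
import io
import re
from contextlib import redirect_stdout

# Abridged excerpt of the BROADCAST-hint plan printed above.
SAMPLE_PLAN = """== Physical Plan ==
AdaptiveSparkPlan (21)
+- BroadcastIndexJoin (20)
   :- Project (3)
   +- SpatialIndex (19)
      +- BroadcastHashJoin Inner BuildRight (12)


(1) Scan csv
"""

def capture_plan(df, mode="formatted"):
    """Return the text that df.explain(mode=...) prints (assumes explain() writes via print)."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()

def join_operators(plan_text):
    """List the join operators in the tree part of a formatted plan, top-down."""
    # In formatted mode the operator tree ends at the first blank line;
    # the numbered per-node details come after it.
    tree = plan_text.split("\n\n", 1)[0]
    return re.findall(r"\b(\w*Join|CartesianProduct)\b", tree)

print(join_operators(SAMPLE_PLAN))  # ['BroadcastIndexJoin', 'BroadcastHashJoin']

# Hypothetical loop over the hints tried above (needs the notebook's crime_data/result_df):
# for hint in ["BROADCAST", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL", "MERGE"]:
#     df = crime_data.join(result_df.hint(hint),
#                          ST_Within(crime_data.geometry, result_df.geometry), "inner")
#     print(hint, join_operators(capture_plan(df)))
```

The list includes every join in the plan, so the inner `BroadcastHashJoin` from the census/income join appears alongside the spatial join operator.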

In [5]:
crime_counts = joined_crimes_df.groupBy("COMM").agg(
    count("*").alias("crime_count")
)

In [6]:
### Third Join

start_time = time.time()
joined_LA = result_df.join(crime_counts, "COMM", "inner")
joined_LA.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_LA.explain(mode="formatted")
print(f"3rd join - optimizer: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (42)
+- Project (41)
   +- SortMergeJoin Inner (40)
      :- Sort (15)
      :  +- Project (14)
      :     +- ObjectHashAggregate (13)
      :        +- Exchange (12)
      :           +- ObjectHashAggregate (11)
      :              +- Project (10)
      :                 +- BroadcastHashJoin Inner BuildRight (9)
      :                    :- Project (5)
      :                    :  +- Filter (4)
      :                    :     +- Generate (3)
      :                    :        +- Filter (2)
      :                    :           +- Scan geojson  (1)
      :                    +- BroadcastExchange (8)
      :                       +- Filter (7)
      :                          +- Scan csv  (6)
      +- Sort (39)
         +- HashAggregate (38)
            +- Exchange (37)
               +- HashAggregate (36)
                  +- Project (35)
                     +- RangeJoin (34)
                        :- Project (18)
                        :

In [6]:
### Third Join

start_time = time.time()
joined_LA = result_df.join(crime_counts.hint("SHUFFLE_REPLICATE_NL"), "COMM", "inner")
joined_LA.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_LA.explain(mode="formatted")
print(f"3rd join - SHUFFLE_REPLICATE_NL hint: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (40)
+- Project (39)
   +- CartesianProduct Inner (38)
      :- Project (14)
      :  +- ObjectHashAggregate (13)
      :     +- Exchange (12)
      :        +- ObjectHashAggregate (11)
      :           +- Project (10)
      :              +- BroadcastHashJoin Inner BuildRight (9)
      :                 :- Project (5)
      :                 :  +- Filter (4)
      :                 :     +- Generate (3)
      :                 :        +- Filter (2)
      :                 :           +- Scan geojson  (1)
      :                 +- BroadcastExchange (8)
      :                    +- Filter (7)
      :                       +- Scan csv  (6)
      +- HashAggregate (37)
         +- Exchange (36)
            +- HashAggregate (35)
               +- Project (34)
                  +- RangeJoin (33)
                     :- Project (17)
                     :  +- Filter (16)
                     :     +- Scan csv  (15)
                     +- Filter (32)


In [6]:
### Third Join

start_time = time.time()
joined_LA = result_df.join(crime_counts.hint("SHUFFLE_HASH"), "COMM", "inner")
joined_LA.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_LA.explain(mode="formatted")
print(f"3rd join - SHUFFLE_HASH hint: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (40)
+- Project (39)
   +- ShuffledHashJoin Inner BuildRight (38)
      :- Project (14)
      :  +- ObjectHashAggregate (13)
      :     +- Exchange (12)
      :        +- ObjectHashAggregate (11)
      :           +- Project (10)
      :              +- BroadcastHashJoin Inner BuildRight (9)
      :                 :- Project (5)
      :                 :  +- Filter (4)
      :                 :     +- Generate (3)
      :                 :        +- Filter (2)
      :                 :           +- Scan geojson  (1)
      :                 +- BroadcastExchange (8)
      :                    +- Filter (7)
      :                       +- Scan csv  (6)
      +- HashAggregate (37)
         +- Exchange (36)
            +- HashAggregate (35)
               +- Project (34)
                  +- RangeJoin (33)
                     :- Project (17)
                     :  +- Filter (16)
                     :     +- Scan csv  (15)
                     +- F

In [6]:
### Third Join

start_time = time.time()
joined_LA = result_df.join(crime_counts.hint("BROADCAST"), "COMM", "inner")
joined_LA.count()
end_time = time.time()
elapsed_time = end_time - start_time
joined_LA.explain(mode="formatted")
print(f"3rd join - BROADCAST hint: {elapsed_time:.2f} seconds")

== Physical Plan ==
AdaptiveSparkPlan (41)
+- Project (40)
   +- BroadcastHashJoin Inner BuildRight (39)
      :- Project (14)
      :  +- ObjectHashAggregate (13)
      :     +- Exchange (12)
      :        +- ObjectHashAggregate (11)
      :           +- Project (10)
      :              +- BroadcastHashJoin Inner BuildRight (9)
      :                 :- Project (5)
      :                 :  +- Filter (4)
      :                 :     +- Generate (3)
      :                 :        +- Filter (2)
      :                 :           +- Scan geojson  (1)
      :                 +- BroadcastExchange (8)
      :                    +- Filter (7)
      :                       +- Scan csv  (6)
      +- BroadcastExchange (38)
         +- HashAggregate (37)
            +- Exchange (36)
               +- HashAggregate (35)
                  +- Project (34)
                     +- RangeJoin (33)
                        :- Project (17)
                        :  +- Filter (16)
                

In [7]:
final_LA_df = joined_LA.withColumn(
    "crimes_per_person",
    col("crime_count") / col("total_population")
)
final_LA_df.select("COMM", "median_income_per_person", "crimes_per_person") \
    .orderBy(col("median_income_per_person").desc()) \
    .show()


+-------------------+------------------------+-------------------+
|               COMM|median_income_per_person|  crimes_per_person|
+-------------------+------------------------+-------------------+
|  Pacific Palisades|        70526.2203104497|0.31066220995010413|
|      Beverly Crest|       66513.90150799365| 0.2667541629070626|
|   Marina Peninsula|       65235.69402813004|0.40050726308508183|
|Palisades Highlands|       65048.95354904471|0.12914166449256456|
|            Bel Air|       63259.97685510228| 0.2881007141992495|
|  Mandeville Canyon|       61443.86522911051|  0.193628209093721|
|          Brentwood|      60696.777650004915|0.34619978840312615|
|            Carthay|      50282.692104378286| 0.5788834029624003|
|             Venice|       46575.69192582585| 0.8405823754789272|
|       Century City|       45707.53601562712| 0.5460891505466778|
|      Playa Del Rey|         45522.596580114|  0.525965801139962|
|        Playa Vista|      44472.100292884345|0.48532377324669

# Query 4

## Configurations

In [93]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2842,application_1732639283265_2801,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [70]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2795,application_1732639283265_2754,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [63]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2794,application_1732639283265_2753,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


## PySpark & Sedona imports, Read datasets, register functions

In [72]:
from sedona.spark import *
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType, BooleanType
from pyspark.sql.functions import col, udf, sum, avg, regexp_replace, row_number, count
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Create spark Session
spark = SparkSession.builder \
    .appName("GeoJSON read") \
    .getOrCreate()

sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flat_census_data = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")
# Print schema
flat_census_data.printSchema()

root
 |-- BG10: string (nullable = true)
 |-- BG10FIP10: string (nullable = true)
 |-- BG12: string (nullable = true)
 |-- CB10: string (nullable = true)
 |-- CEN_FIP13: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOM: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- CT10: string (nullable = true)
 |-- CT12: string (nullable = true)
 |-- CTCB10: string (nullable = true)
 |-- HD_2012: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING10: long (nullable = true)
 |-- LA_FIP10: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP_2010: long (nullable = true)
 |-- PUMA10: string (nullable = true)
 |-- SPA_2012: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP_DIST: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: double (nullable = true)
 |-- ShapeSTLength: double (nullable = true)
 |-- ZCTA10: string (nullable = true)
 |-- geometry: geometry (nulla

In [73]:
# UDF - User Defined Functions definitions
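def extract_year(date_occ: str) -> str:
    '''Returns the year (as a string) from a DATE OCC value: the text between the second "/" and the next space. None stays None.'''
    if date_occ is None:
        return None
    return date_occ.split("/")[2].split(" ")[0]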

extract_year_udf = udf(extract_year, StringType())
# register functions for SQL
spark.udf.register("extract_year", extract_year)

<function extract_year at 0x7f62da3f24c0>

In [74]:
crimes_2010_19_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
crimes_2020_24_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True)
crimes_data = crimes_2010_19_df.union(crimes_2020_24_df)

crimes_data_2015 = crimes_2010_19_df \
    .withColumn("year", extract_year_udf(col("DATE OCC"))) \
    .filter(col("year") == "2015")

census_data = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")
income_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
RE_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True)

## Query 4 - DataFrame API

In [75]:
@measure_time
def query4_dataframe(debug=False):
    '''Prints the two Query 4 results (DataFrame API): victim counts by descent for the top-3 and bottom-3 communities by income per person.'''
    global flat_census_data, income_data, crimes_data_2015, RE_data
    
    
    # Filter rows so that dataset refers only to Los Angeles communities
    LA_areas = flat_census_data.filter(col("CITY") == "Los Angeles")

    # Join datasets Census and Average Household Income on Zip Code
    joined_df = LA_areas.join(income_data, LA_areas["ZCTA10"] == income_data["Zip Code"], "inner")

    # Cast the "Estimated Median Income" string column as a double
    joined_df = joined_df.withColumn(
        "Estimated Median Income",
        regexp_replace(col("Estimated Median Income"), "[\\$,]", ""))
    joined_df = joined_df.withColumn(
        "Estimated Median Income",
        col("Estimated Median Income").cast("double")
    )

    # Group by COMM (community): sum population and households, average the income, and union the geometries
    weighted_avg = False  # True: weight each row's median income by HOUSING10 instead of a plain average
    if weighted_avg:
        LA_comms = joined_df.groupBy("COMM").agg(
            sum("POP_2010").alias("total_population"),
            sum("HOUSING10").alias("total_housing"),
            ( sum(col("Estimated Median Income")*col("HOUSING10"))/ sum("HOUSING10") ).alias("average_income_per_house"),
            ST_Union_Aggr("geometry").alias("geometry"))
    else:
        LA_comms = joined_df.groupBy("COMM").agg(
            sum("POP_2010").alias("total_population"),
            sum("HOUSING10").alias("total_housing"),
            avg("Estimated Median Income").alias("average_income_per_house"),
            ST_Union_Aggr("geometry").alias("geometry")) 

    # Extract median income per person from average income per house, total households and population
    result_df = LA_comms.withColumn(
        "median_income_per_person",
        (col("total_housing") * col("average_income_per_house")) / col("total_population")
    )

    # Order by descending median income per person
    sorted_df = result_df.orderBy(col("median_income_per_person").desc())

    # Show dataframe
    if debug: sorted_df.select("COMM", "median_income_per_person").show()

    
    ############################ Query 4 starts here - the steps above were also done in the previous query ##################################
    
    # select the top 3 and the last 3 communities after ranking areas by median_income_per_person
    df_ranked = sorted_df.withColumn("rank", row_number().over(Window.orderBy(col("median_income_per_person").desc())))
    top_3 = df_ranked.filter(df_ranked.rank <= 3).select("COMM", "median_income_per_person", "geometry")
    bottom_3 = df_ranked.filter(df_ranked.rank > (df_ranked.count() - 3)).select("COMM", "median_income_per_person", "geometry")
    if debug: top_3.show()
    if debug: bottom_3.show()

    # append geom column in crime data based on LON, LAT
    crime_geo = crimes_data_2015.withColumn("geom", ST_Point("LON", "LAT"))

    # Spatial joins: keep the 2015 crimes whose point (geom) lies within one of the top-3 / bottom-3 community geometries
    crimes_top_3_comm = top_3 \
        .join(crime_geo, ST_Within(crime_geo.geom, top_3.geometry), "inner")
    crimes_bottom_3_comm = bottom_3 \
        .join(crime_geo, ST_Within(crime_geo.geom, bottom_3.geometry), "inner")

    # Join the results with the RE codes on "Vict Descent" (present in both datasets) to get the full descent description
    crimes_top_3_comm_RE = crimes_top_3_comm \
        .join(RE_data, RE_data["Vict Descent"] == crimes_top_3_comm["Vict Descent"], "inner") \
        .select(col("DR_NO"), col("AREA NAME"), col("LON"), col("LAT"), col("geom"), col("Vict Descent Full"))
    crimes_bottom_3_comm_RE = crimes_bottom_3_comm \
        .join(RE_data, RE_data["Vict Descent"] == crimes_bottom_3_comm["Vict Descent"], "inner") \
        .select(col("DR_NO"), col("AREA NAME"), col("LON"), col("LAT"), col("geom"), col("Vict Descent Full"))

    # Count crimes per victim descent: group by "Vict Descent Full" and name the count "#"
    crimes_top_3_comm_grouped = crimes_top_3_comm_RE \
        .groupBy("Vict Descent Full") \
        .agg(count("*").alias("#"))
    crimes_top_3_comm_grouped = crimes_top_3_comm_grouped.select("Vict Descent Full", "#")
    crimes_bottom_3_comm_grouped = crimes_bottom_3_comm_RE \
        .groupBy("Vict Descent Full") \
        .agg(count("*").alias("#")) 
    crimes_bottom_3_comm_grouped = crimes_bottom_3_comm_grouped.select("Vict Descent Full", "#")

    # order by # descending
    crimes_top_3_comm_grouped = crimes_top_3_comm_grouped.orderBy("#", ascending=False)
    crimes_bottom_3_comm_grouped = crimes_bottom_3_comm_grouped.orderBy("#", ascending=False)

    # show
    crimes_top_3_comm_grouped.show()
    crimes_bottom_3_comm_grouped.show()
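
`query4_dataframe` keeps a `weighted_avg` switch: the default branch takes a plain `avg` of the per-row median incomes, while the alternative weights each row by `HOUSING10`. It then ranks communities with `row_number()` and keeps ranks <= 3 and ranks > count - 3. A plain-Python sketch of both steps on invented data (community names and numbers are made up) shows how the two averages can differ, and that the rank filter selects the three highest and three lowest values when there are no ties. The helper names are hypothetical.

```python
import math

# math.fsum is used instead of sum(): the notebook imports `sum` from
# pyspark.sql.functions, which shadows the built-in in the same session.

def plain_avg(rows):
    """Default branch: avg("Estimated Median Income") over a community's joined rows."""
    return math.fsum(income for income, _ in rows) / len(rows)

def weighted_avg(rows):
    """weighted_avg=True branch: sum(income * HOUSING10) / sum(HOUSING10)."""
    total_housing = math.fsum(housing for _, housing in rows)
    return math.fsum(income * housing for income, housing in rows) / total_housing

def top_and_bottom(values, k=3):
    """Rank by value descending (like row_number()); keep rank <= k and rank > n - k."""
    ranked = sorted(values, key=values.get, reverse=True)
    n = len(ranked)
    top = [name for rank, name in enumerate(ranked, start=1) if rank <= k]
    bottom = [name for rank, name in enumerate(ranked, start=1) if rank > n - k]
    return top, bottom

# Invented community: (median income of the row's ZIP code, households in the row)
rows = [(40000.0, 100), (80000.0, 300)]
print(plain_avg(rows))     # 60000.0
print(weighted_avg(rows))  # 70000.0

# Invented income-per-person values for seven communities
print(top_and_bottom({"A": 5, "B": 9, "C": 1, "D": 7, "E": 3, "F": 8, "G": 2}))
# (['B', 'F', 'D'], ['E', 'G', 'C'])
```

With ties, `row_number()` breaks them arbitrarily, so the Spark result may pick a different community at the boundary than this sketch.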

## Experiments

In [55]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

query4_dataframe()

Executor Instances: 2
Executor Memory: 2g
Executor Cores: 1
+--------------------+---+
|   Vict Descent Full|  #|
+--------------------+---+
|               White|894|
|               Other|156|
|Hispanic/Latin/Me...| 97|
|               Black| 55|
|             Unknown| 50|
|         Other Asian| 31|
|American Indian/A...|  1|
|             Chinese|  1|
+--------------------+---+

+--------------------+----+
|   Vict Descent Full|   #|
+--------------------+----+
|Hispanic/Latin/Me...|3191|
|               Black| 872|
|               White| 430|
|               Other| 265|
|         Other Asian| 140|
|             Unknown|  26|
|American Indian/A...|  24|
|              Korean|   5|
|             Chinese|   3|
|            Filipino|   2|
|         AsianIndian|   1|
+--------------------+----+

Time taken by query4_dataframe: 73.36 seconds

In [76]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

query4_dataframe()

Executor Instances: 2
Executor Memory: 4g
Executor Cores: 2
+--------------------+---+
|   Vict Descent Full|  #|
+--------------------+---+
|               White|894|
|               Other|156|
|Hispanic/Latin/Me...| 97|
|               Black| 55|
|             Unknown| 50|
|         Other Asian| 31|
|             Chinese|  1|
|American Indian/A...|  1|
+--------------------+---+

+--------------------+----+
|   Vict Descent Full|   #|
+--------------------+----+
|Hispanic/Latin/Me...|3191|
|               Black| 872|
|               White| 430|
|               Other| 265|
|         Other Asian| 140|
|             Unknown|  26|
|American Indian/A...|  24|
|              Korean|   5|
|             Chinese|   3|
|            Filipino|   2|
|         AsianIndian|   1|
+--------------------+----+

Time taken by query4_dataframe: 68.58 seconds

In [69]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

query4_dataframe()

Executor Instances: 2
Executor Memory: 8g
Executor Cores: 4
+--------------------+---+
|   Vict Descent Full|  #|
+--------------------+---+
|               White|894|
|               Other|156|
|Hispanic/Latin/Me...| 97|
|               Black| 55|
|             Unknown| 50|
|         Other Asian| 31|
|             Chinese|  1|
|American Indian/A...|  1|
+--------------------+---+

+--------------------+----+
|   Vict Descent Full|   #|
+--------------------+----+
|Hispanic/Latin/Me...|3191|
|               Black| 872|
|               White| 430|
|               Other| 265|
|         Other Asian| 140|
|             Unknown|  26|
|American Indian/A...|  24|
|              Korean|   5|
|             Chinese|   3|
|            Filipino|   2|
|         AsianIndian|   1|
+--------------------+----+

Time taken by query4_dataframe: 57.63 seconds
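
The two Query 4 timings above can be summarized as a speedup ratio. The minimal sketch below only does the arithmetic on the printed wall-clock times. Because each configuration appears to have been timed once, it says nothing about run-to-run variance. The variable names are illustrative.

```python
# Wall-clock times printed by @measure_time for the two Query 4 runs
time_cores_2 = 68.58   # earlier run (Executor Cores: 2)
time_2x4_8g = 57.63    # 2 executors, 8g memory, 4 cores each

speedup = time_cores_2 / time_2x4_8g
reduction = (time_cores_2 - time_2x4_8g) / time_cores_2

print(f"speedup: {speedup:.2f}x")
print(f"time reduction: {reduction:.1%}")
```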

# Query 5

## Configurations

In [2]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1268,application_1732639283265_1231,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [7]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1269,application_1732639283265_1232,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [12]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "8",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1452,application_1732639283265_1413,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.



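All three configurations give the executors the same total resources (8 cores, 16 GB) and differ only in how they are split: fewer, larger executors versus more, smaller ones. The sketch below checks that arithmetic and rebuilds the JSON body of each `%%configure` cell from an `(instances, memory, cores)` tuple; `CONFIGS` and `to_configure_payload` are illustrative names. It ignores the driver and the per-executor memory overhead that YARN adds on top of `spark.executor.memory` (`spark.executor.memoryOverhead`), so the actual container sizes are larger.

```python
import json

# Executor layouts used in the %%configure cells: (instances, memory in GB, cores)
CONFIGS = [(2, 8, 4), (4, 4, 2), (8, 2, 1)]


def to_configure_payload(instances, memory_gb, cores):
    """Build the JSON body of a %%configure cell (all values as strings)."""
    return {
        "conf": {
            "spark.executor.instances": str(instances),
            "spark.executor.memory": f"{memory_gb}g",
            "spark.executor.cores": str(cores),
        }
    }


for instances, memory_gb, cores in CONFIGS:
    total_cores = instances * cores
    total_memory_gb = instances * memory_gb
    print(f"{instances} executors x {cores} cores / {memory_gb}g "
          f"-> {total_cores} cores, {total_memory_gb}g in total")

# Same payload as the first %%configure cell
print(json.dumps(to_configure_payload(2, 8, 4), indent=4))
```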

## PySpark & Sedona imports, Read Datasets, register functions

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType, BooleanType
# NOTE: sum, max and min shadow the Python builtins of the same name in this notebook
from pyspark.sql.functions import col, udf, sum, max, min, avg, count, mean, when, monotonically_increasing_id, dense_rank, window, row_number
from pyspark.sql.window import Window
# SedonaContext plus the DataFrame-style ST_* functions (ST_Point, ST_DistanceSphere)
from sedona.spark import *

spark = SparkSession.builder \
    .appName("Query 5") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)

# read datasets
crimes_2010_19_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
crimes_2020_24_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True)
crimes_df = crimes_2010_19_df.union(crimes_2020_24_df)
police_stations_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True)

## Query 5 - DataFrame API

In [15]:
@measure_time
def query5_dataframe(df, debug=False):
    '''Compute and show the result of Query 5 using the DataFrame API.'''

    # Drop crimes located at Null Island (LON == 0), pair every remaining crime
    # with every police station (cartesian product) and compute their distance in km
    joined_df = df.select('DR_NO', 'AREA NAME', 'LON', 'LAT') \
        .filter((col('LON') != '0') & (col('LON') != 0)) \
        .withColumn("crime_point", ST_Point("LON", "LAT")) \
        .crossJoin(police_stations_df) \
        .withColumn("police_point", ST_Point("X", "Y")) \
        .withColumn('distance', ST_DistanceSphere("crime_point", "police_point") / 1000)  # meters -> km

    # Crimes at Null Island are counted separately and reported as "Unknown"
    crimes_in_null_island = df.filter((col('LON') == '0') | (col('LON') == 0)).count()

    if debug: joined_df.filter(col('DR_NO') == '001307355').show(30)
    # In this DF, each DR_NO has 21 rows: one distance per police station

    # For each crime, keep only the row of its nearest police station
    windowSpec = Window.partitionBy("DR_NO")
    extended_df = joined_df.withColumn("min_distance", when(col("distance").isNotNull(), min("distance").over(windowSpec)).otherwise(None)) \
        .filter(col('distance') == col('min_distance'))
    if debug: extended_df.filter(col('DR_NO') == '001307355').show(30)
    if debug: extended_df.filter(col('DIVISION') == 'HOLLENBECK').orderBy('min_distance', ascending=True).show(30)
    if debug: extended_df.filter(col('DIVISION') == 'HOLLENBECK').agg(avg('min_distance').alias('avg')).show()
    # Now each DR_NO appears once (unless two stations are exactly equidistant)

    # Group by nearest division: average distance (km) and number of crimes
    grouped_df = extended_df.groupBy("DIVISION") \
        .agg(
            avg("min_distance").alias("average_distance"),
            count("*").alias("#")
        ) \
        .select('DIVISION', 'average_distance', '#') \
        .orderBy(["#"], ascending=[False])

    schema = StructType([
        StructField("DIVISION", StringType(), nullable=True),
        StructField("average_distance", FloatType(), nullable=True),
        StructField("#", IntegerType(), nullable=False)
    ])
    # Create a one-row DataFrame for the Null Island crimes
    null_island_row = spark.createDataFrame([
        ("Unknown", None, crimes_in_null_island)
    ], schema)

    grouped_df = grouped_df.union(null_island_row)

    if debug: grouped_df.agg(sum('#').alias('incidents')).show()

    # One row per police division: the average distance (km) between its station and
    # the crimes for which it is the nearest station, and the number of those crimes
    grouped_df.show(50)
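
The logic of `query5_dataframe` can be followed on a handful of points with a plain-Python sketch: skip Null Island rows, measure the distance from each crime to every station (the cross join), keep the nearest one (the window minimum), then average per division. It assumes a spherical Earth of radius 6,371,008 m for the haversine formula; this is our assumption about what `ST_DistanceSphere` uses by default, not something the notebook states. The station and crime coordinates are hypothetical. Unlike the window-based filter, `min()` keeps only the first station when two are exactly equidistant.

```python
import math
from collections import defaultdict

# ASSUMPTION: spherical Earth radius in meters (believed to match ST_DistanceSphere's default)
EARTH_RADIUS_M = 6371008.0


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in km between two (lon, lat) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a)) / 1000


def nearest_division_stats(crimes, stations):
    """crimes: (dr_no, lon, lat) tuples; stations: (division, x, y) tuples.

    Returns ({division: (average_km, count)}, null_island_count).
    """
    null_island = 0
    sums = defaultdict(float)
    counts = defaultdict(int)
    for _dr_no, lon, lat in crimes:
        lon, lat = float(lon), float(lat)  # CSV values arrive as strings
        if lon == 0:  # Null Island: counted separately, like the "Unknown" row
            null_island += 1
            continue
        # Distance to every station, then keep the closest one
        division, dist = min(
            ((div, haversine_km(lon, lat, x, y)) for div, x, y in stations),
            key=lambda pair: pair[1],
        )
        sums[division] += dist
        counts[division] += 1
    stats = {d: (sums[d] / counts[d], counts[d]) for d in counts}
    return stats, null_island


# Hypothetical stations and crimes, all on one meridian so distances are easy to check
stations = [("NORTH", -118.0, 34.10), ("SOUTH", -118.0, 33.90)]
crimes = [("1", "-118.0", "34.09"), ("2", "-118.0", "33.91"),
          ("3", "-118.0", "33.95"), ("4", "0", "0")]

stats, null_island = nearest_division_stats(crimes, stations)
print(stats, null_island)
```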

## Experiments

In [6]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

query5_dataframe(crimes_df)

Executor Instances: 2
Executor Memory: 8g
Executor Cores: 4
+----------------+------------------+------+
|        DIVISION|  average_distance|     #|
+----------------+------------------+------+
|       HOLLYWOOD|2.0762639601787205|224340|
|        VAN NUYS| 2.953369742819787|210134|
|       SOUTHWEST|2.1913988057808838|188901|
|        WILSHIRE|2.5926655329787796|185996|
|     77TH STREET| 1.716544971970102|171827|
|         OLYMPIC|1.7236036971780937|170897|
| NORTH HOLLYWOOD|2.6430060941415676|167854|
|         PACIFIC|3.8500706553079027|161359|
|         CENTRAL|0.9924764374568903|153871|
|         RAMPART|1.5345341879190049|152736|
|       SOUTHEAST|2.4218662158881794|152176|
|     WEST VALLEY| 3.035671216314078|138643|
|         TOPANGA|3.2969548417555608|138217|
|        FOOTHILL| 4.250921708424991|134896|
|          HARBOR|3.7025615993565033|126747|
|      HOLLENBECK|2.6801812377068224|115837|
|WEST LOS ANGELES| 2.792457289034108|115781|
|          NEWTON|1.6346357397097435|111

In [11]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

query5_dataframe(crimes_df)

Executor Instances: 4
Executor Memory: 4g
Executor Cores: 2
+----------------+------------------+------+
|        DIVISION|  average_distance|     #|
+----------------+------------------+------+
|       HOLLYWOOD|2.0762639601787196|224340|
|        VAN NUYS|2.9533697428197883|210134|
|       SOUTHWEST| 2.191398805780884|188901|
|        WILSHIRE|2.5926655329787796|185996|
|     77TH STREET|1.7165449719701025|171827|
|         OLYMPIC|1.7236036971780941|170897|
| NORTH HOLLYWOOD| 2.643006094141567|167854|
|         PACIFIC|3.8500706553079027|161359|
|         CENTRAL|0.9924764374568901|153871|
|         RAMPART|1.5345341879190044|152736|
|       SOUTHEAST| 2.421866215888179|152176|
|     WEST VALLEY|3.0356712163140793|138643|
|         TOPANGA|3.2969548417555603|138217|
|        FOOTHILL| 4.250921708424989|134896|
|          HARBOR| 3.702561599356503|126747|
|      HOLLENBECK|2.6801812377068237|115837|
|WEST LOS ANGELES|2.7924572890341084|115781|
|          NEWTON|1.6346357397097424|111
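
The two runs so far return the same counts, but `average_distance` differs in the last digits (e.g. HOLLYWOOD 2.0762639601787205 vs 2.0762639601787196). The most likely cause is that floating-point addition is not associative: `avg` combines partial sums computed per partition, and a different executor layout can change the order of the additions. A quick check with values copied from the two tables shows they are different doubles that still agree to roughly 15 significant digits; the dictionary names are illustrative.

```python
import math

# average_distance values copied from the 2x4 (8g) and 4x2 (4g) runs
run_2x4 = {"HOLLYWOOD": 2.0762639601787205, "VAN NUYS": 2.953369742819787,
           "CENTRAL": 0.9924764374568903}
run_4x2 = {"HOLLYWOOD": 2.0762639601787196, "VAN NUYS": 2.9533697428197883,
           "CENTRAL": 0.9924764374568901}

for division in run_2x4:
    a, b = run_2x4[division], run_4x2[division]
    # Bitwise-different doubles, but equal within a relative tolerance of 1e-12
    print(f"{division}: identical={a == b}, "
          f"close={math.isclose(a, b, rel_tol=1e-12)}, diff={abs(a - b):.1e}")
```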

In [16]:
# Access configuration
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

query5_dataframe(crimes_df)

Executor Instances: 8
Executor Memory: 2g
Executor Cores: 1
+----------------+------------------+------+
|        DIVISION|  average_distance|     #|
+----------------+------------------+------+
|       HOLLYWOOD|2.0762639601787205|224340|
|        VAN NUYS|2.9533697428197865|210134|
|       SOUTHWEST|2.1913988057808846|188901|
|        WILSHIRE|2.5926655329787787|185996|
|     77TH STREET|1.7165449719701025|171827|
|         OLYMPIC|1.7236036971780935|170897|
| NORTH HOLLYWOOD|2.6430060941415676|167854|
|         PACIFIC|3.8500706553079027|161359|
|         CENTRAL|0.9924764374568898|153871|
|         RAMPART|1.5345341879190046|152736|
|       SOUTHEAST| 2.421866215888179|152176|
|     WEST VALLEY|3.0356712163140793|138643|
|         TOPANGA|3.2969548417555603|138217|
|        FOOTHILL| 4.250921708424989|134896|
|          HARBOR|3.7025615993565038|126747|
|      HOLLENBECK|2.6801812377068233|115837|
|WEST LOS ANGELES|2.7924572890341075|115781|
|          NEWTON| 1.634635739709743|111