In [16]:
%%configure -f
{
    "driverMemory": "2G",
    "executorMemory": "2G",
    "executorCores": 1,
    "numExecutors": 4
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1446,application_1765289937462_1434,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1429,application_1765289937462_1417,pyspark,idle,Link,Link,,
1436,application_1765289937462_1424,pyspark,idle,Link,Link,,
1437,application_1765289937462_1425,pyspark,idle,Link,Link,,
1438,application_1765289937462_1426,pyspark,idle,Link,Link,,
1441,application_1765289937462_1429,pyspark,idle,Link,Link,,
1445,application_1765289937462_1433,pyspark,busy,Link,Link,,
1446,application_1765289937462_1434,pyspark,idle,Link,Link,,✔


In [9]:
from pyspark.sql import SparkSession

# Obtain the Spark entry points used by the rest of the notebook.
# NOTE(review): under a sparkmagic/Livy kernel a session already exists, so
# getOrCreate() hands back that session and builder options such as the app
# name may not take effect — confirm if the app name matters.
spark = (
    SparkSession.builder
    .appName("query2_python")
    .getOrCreate()
)
# Low-level RDD entry point, kept available for later cells
sc = spark.sparkContext


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# All inputs share one S3 prefix; keep it in a single constant so the
# bucket/prefix only has to be changed in one place.
DATA_ROOT = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data"


def read_csv_with_header(path):
    """Load a CSV with a header row from `path`, letting Spark infer column types.

    inferSchema=True makes Spark do an extra pass over the file to infer the
    column types; that pass happens when this function is called, not when
    the returned DataFrame is later queried.
    """
    return spark.read.csv(path, header=True, inferSchema=True)


# Crime records, split across two source files by period
crimes_old_df = read_csv_with_header(f"{DATA_ROOT}/LA_Crime_Data/LA_Crime_Data_2010_2019.csv")
crimes_new_df = read_csv_with_header(f"{DATA_ROOT}/LA_Crime_Data/LA_Crime_Data_2020_2025.csv")

# Lookup table joined on `Vict Descent` to obtain `Vict Descent Full`
descent_diction = read_csv_with_header(f"{DATA_ROOT}/RE_codes.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time, gc

import statistics

# Number of warm-up executions and measured runs
warmups = 3
runs = 10
times = []

# At least one measured run is needed to report a mean
if runs < 1:
    raise ValueError("runs must be >= 1")

# Execute the same pipeline multiple times.
# The DataFrame is rebuilt on every iteration so each run starts from a fresh
# query plan instead of reusing one that an earlier collect() already planned.
for i in range(warmups + runs):

    # Merge old and new crime datasets into a single DataFrame
    crimes_df = crimes_old_df.unionByName(crimes_new_df)

    # Extract the year of occurrence from the date field
    year_df = crimes_df.withColumn(
        "year",
        F.year(F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a"))
    )

    # Keep only records with valid year and victim descent information
    clean_df = year_df.filter(
        F.col("year").isNotNull() & F.col("Vict Descent").isNotNull()
    )

    # Count the number of crimes per year and victim descent category
    per_race = (
        clean_df.groupBy("year", "Vict Descent")
        .agg(F.count("*").alias("cnt"))
    )

    # Compute the total number of crimes per year using a window function
    w_year = Window.partitionBy("year")
    ranked = per_race.withColumn("year_total", F.sum("cnt").over(w_year))

    # Calculate the percentage contribution of each category per year
    ranked = ranked.withColumn(
        "percent",
        F.round(F.col("cnt") / F.col("year_total") * 100, 1)
    )

    # Rank victim descent categories within each year by descending crime count.
    # dense_rank gives tied counts the same rank, so a year can yield more than
    # three rows below.
    w_rank = Window.partitionBy("year").orderBy(F.desc("cnt"))
    ranked = ranked.withColumn("rank", F.dense_rank().over(w_rank))

    # Select only the top 3 categories per year
    top3 = ranked.filter(F.col("rank") <= 3)

    # Join with the dictionary table to get the full description of victim descent
    result = top3.join(descent_diction, on="Vict Descent", how="left")

    # Select and format the final output columns
    result = (
        result.select(
            "year",
            F.col("Vict Descent Full").alias("Vict_Descent"),
            "cnt",
            "percent"
        )
        .orderBy(F.desc("year"), F.desc("cnt"))
    )

    # Trigger execution and time only the action. perf_counter() is a
    # monotonic, high-resolution clock; time.time() follows the system wall
    # clock and can jump if it is adjusted during a run.
    start = time.perf_counter()
    rows = result.collect()
    dt = time.perf_counter() - start

    # Drop the collected rows (Python objects on the driver) before the next run
    del rows
    gc.collect()

    # Distinguish warm-up runs from measured runs
    if i < warmups:
        print(f"Warm-up {i+1}: {dt:.4f} sec")
    else:
        times.append(dt)
        print(f"Run {i-warmups+1}: {dt:.4f} sec")

# Summarize the measured runs (warm-ups excluded). The spread is reported too,
# because a mean alone cannot show whether a difference between two
# implementations is larger than the run-to-run noise.
mean_time = statistics.mean(times)
print(f"Mean time (excluding warm-ups): {mean_time:.4f} sec")
if len(times) > 1:
    print(f"Std dev (excluding warm-ups): {statistics.stdev(times):.4f} sec")
print(f"Min / max (excluding warm-ups): {min(times):.4f} / {max(times):.4f} sec")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Warm-up 1: 2.5884 sec
Warm-up 2: 2.5219 sec
Warm-up 3: 2.7971 sec
Run 1: 3.1966 sec
Run 2: 2.6676 sec
Run 3: 2.8649 sec
Run 4: 2.7281 sec
Run 5: 2.7901 sec
Run 6: 2.8636 sec
Run 7: 2.3575 sec
Run 8: 2.3083 sec
Run 9: 2.3265 sec
Run 10: 2.4744 sec
Mean time (excluding warm-ups): 2.6578 sec

In [23]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time, gc

import statistics

# Number of warm-up executions and measured runs
warmups = 3
runs = 10
times = []

# At least one measured run is needed to report a mean
if runs < 1:
    raise ValueError("runs must be >= 1")

# Register the inputs so the SQL text can refer to them by name
crimes_old_df.unionByName(crimes_new_df).createOrReplaceTempView("crimes")
descent_diction.createOrReplaceTempView("descent_diction")

# Top-3 victim descent categories per year, expressed in Spark SQL:
# year extraction -> filtering -> per-(year, descent) counts -> per-year total
# -> percentage and dense rank -> top 3 per year -> description lookup.
# The per-year total is computed once (with_total) and reused for `percent`
# rather than repeating the same SUM window inside the ROUND expression.
# DENSE_RANK gives tied counts the same rank, so a year can yield more than
# three rows.
query = """
WITH year_df AS (
    SELECT
        *,
        year(to_timestamp(`DATE OCC`, 'yyyy MMM dd hh:mm:ss a')) AS year
    FROM crimes
),
clean_df AS (
    SELECT
        *
    FROM year_df
    WHERE year IS NOT NULL
      AND `Vict Descent` IS NOT NULL
),
per_race AS (
    SELECT
        year,
        `Vict Descent`,
        COUNT(*) AS cnt
    FROM clean_df
    GROUP BY year, `Vict Descent`
),
with_total AS (
    SELECT
        year,
        `Vict Descent`,
        cnt,
        SUM(cnt) OVER (PARTITION BY year) AS year_total
    FROM per_race
),
ranked AS (
    SELECT
        year,
        `Vict Descent`,
        cnt,
        year_total,
        ROUND(cnt / year_total * 100, 1) AS percent,
        DENSE_RANK() OVER (PARTITION BY year ORDER BY cnt DESC) AS rank
    FROM with_total
),
top3 AS (
    SELECT *
    FROM ranked
    WHERE rank <= 3
),
final AS (
    SELECT
        t.year,
        d.`Vict Descent Full` AS Vict_Descent,
        t.cnt,
        t.percent
    FROM top3 t
    LEFT JOIN descent_diction d
      ON t.`Vict Descent` = d.`Vict Descent`
)
SELECT
    *
FROM final
ORDER BY year DESC, cnt DESC
"""

for i in range(warmups + runs):

    # Build a fresh DataFrame from the SQL text every iteration, so no run
    # reuses a plan produced by an earlier collect()
    result_sql = spark.sql(query)

    # Time only the action; perf_counter() is monotonic and high-resolution,
    # unlike the adjustable wall clock behind time.time()
    start = time.perf_counter()
    rows = result_sql.collect()
    dt = time.perf_counter() - start

    # Drop the collected rows (Python objects on the driver) before the next run
    del rows
    gc.collect()

    # Distinguish warm-up runs from measured runs
    if i < warmups:
        print(f"Warm-up {i+1}: {dt:.4f} sec")
    else:
        times.append(dt)
        print(f"Run {i-warmups+1}: {dt:.4f} sec")

# Summarize the measured runs (warm-ups excluded), including the spread so the
# comparison with another implementation can be judged against run-to-run noise
mean_time = statistics.mean(times)
print(f"Mean time (excluding warm-ups): {mean_time:.4f} sec")
if len(times) > 1:
    print(f"Std dev (excluding warm-ups): {statistics.stdev(times):.4f} sec")
print(f"Min / max (excluding warm-ups): {min(times):.4f} / {max(times):.4f} sec")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Warm-up 1: 3.5139 sec
Warm-up 2: 2.8723 sec
Warm-up 3: 2.9223 sec
Run 1: 2.6963 sec
Run 2: 2.4882 sec
Run 3: 2.7207 sec
Run 4: 2.6706 sec
Run 5: 2.6070 sec
Run 6: 2.8759 sec
Run 7: 3.0987 sec
Run 8: 3.3833 sec
Run 9: 2.7290 sec
Run 10: 2.5429 sec
Mean time (excluding warm-ups): 2.7813 sec