In [4]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, trim, regexp_replace, count, desc, broadcast

# Configuration (4 Executors, 1 Core, 2GB RAM)
spark = SparkSession.builder \
    .appName("Query3_Mocodes_Analysis") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

DATA_PATH = "s3a://groups-bucket-dblab-905418150721/group36/processed_data"

print(f"Spark Session Created. App Name: {spark.conf.get('spark.app.name')}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Spark Session Created. App Name: Query3_Mocodes_Analysis

In [5]:
# 1. Load Data
df_crime = spark.read.parquet(f"{DATA_PATH}/crime_data_clean.parquet")
df_mo_codes = spark.read.parquet(f"{DATA_PATH}/mo_codes.parquet")

# 2. Prepare MO Codes Mapping Table (Rename for clarity)
df_mo_mapping = df_mo_codes \
    .withColumnRenamed(df_mo_codes.columns[0], "code") \
    .withColumnRenamed(df_mo_codes.columns[1], "description")

# 3. Clean & Explode Crime MO Codes
# Logic: Trim spaces -> Replace multiple spaces with single space -> Split -> Explode
df_exploded = df_crime \
    .filter(col("Mocodes").isNotNull()) \
    .select(
        explode(
            split(
                regexp_replace(trim(col("Mocodes")), r"\s+", " "), 
                " "
            )
        ).alias("mo_code")
    )

# Cache the exploded dataset as it is used by both implementations
df_exploded.cache()
print(f"Data Prepared. Total MO Code Occurrences: {df_exploded.count():,}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data Prepared. Total MO Code Occurrences: 8,256,514

In [6]:
print("--- DataFrame API: Join Strategy Comparison ---")

def benchmark_join_strategy(strategy_name, mapping_df):
    spark.catalog.clearCache()
    start_time = time.time()
    
    # Perform Join
    df_joined = df_exploded.join(
        mapping_df,
        df_exploded["mo_code"] == mapping_df["code"],
        "inner"
    )
    
    # Aggregation
    count_result = df_joined.groupBy("code", "description").count().count()
    
    duration = time.time() - start_time
    print(f"Strategy: {strategy_name:<15} | Time: {duration:.4f}s | Result Rows: {count_result}")
    return duration

# 1. Default Strategy (Let Optimizer decide)
time_default = benchmark_join_strategy("Default", df_mo_mapping)

# 2. Broadcast Strategy (Force Broadcast)
time_broadcast = benchmark_join_strategy("Broadcast", broadcast(df_mo_mapping))

# 3. Merge Strategy (Force SortMergeJoin)
time_merge = benchmark_join_strategy("Merge", df_mo_mapping.hint("merge"))

print("-" * 50)
print(f"DataFrame API (Best Time): {min(time_default, time_broadcast, time_merge):.4f}s")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- DataFrame API: Join Strategy Comparison ---
Strategy: Default         | Time: 5.1218s | Result Rows: 613
Strategy: Broadcast       | Time: 2.5074s | Result Rows: 613
Strategy: Merge           | Time: 5.3529s | Result Rows: 613
--------------------------------------------------
DataFrame API (Best Time): 2.5074s

In [7]:
print("--- RDD API Implementation ---")
spark.catalog.clearCache()
start_time_rdd = time.time()

# 1. Convert DataFrames to RDDs
rdd_crimes = df_exploded.rdd.map(lambda row: (row["mo_code"], 1))
rdd_mapping = df_mo_mapping.rdd.map(lambda row: (row["code"], row["description"]))

# 2. Join & Aggregate
# Map -> ((Code, Desc), 1) -> ReduceByKey
rdd_result = rdd_crimes \
    .join(rdd_mapping) \
    .map(lambda x: ((x[0], x[1][1]), x[1][0])) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False) \
    .collect()

duration_rdd = time.time() - start_time_rdd
print(f"RDD API Time: {duration_rdd:.4f}s")

# Display Top 5 Results
print("\nTop 5 MO Codes (RDD Results):")
for item in rdd_result[:5]:
    print(f"{item[0][0]} - {item[0][1]}: {item[1]}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- RDD API Implementation ---
RDD API Time: 17.5667s

Top 5 MO Codes (RDD Results):
0344 - Removes vict property: 1002274
1822 - Stranger: 547653
0416 - Hit-Hit w/ weapon: 404276
0329 - Vandalized: 377265
0913 - Victim knew Suspect: 278086