# Query 3

In [1]:
%%configure -f
{
  "conf": {
    "spark.executor.instances": "4",
    "spark.executor.cores": "1",
    "spark.driver.memory": "2g"
  }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1817,application_1765289937462_1801,pyspark,idle,Link,Link,,
1820,application_1765289937462_1804,pyspark,idle,Link,Link,,
1821,application_1765289937462_1805,pyspark,idle,Link,Link,,
1822,application_1765289937462_1806,pyspark,idle,Link,Link,,


## DataFrame API υλοποίηση

In [3]:
import time, csv
from io import StringIO
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# =====================================================
# Query 3 — MO Codes frequency + descriptions mapping
# DF + RDD + join strategies (hint/explain) + timings
# =====================================================

# S3 prefix of the crime records; it is handed to the CSV reader below as-is.
# The trailing slash suggests a directory of files — presumably all of them are read.
CRIME_DIR = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/"
# S3 path of the MO-code lookup text file, consumed by load_mo_lookup().
MO_PATH   = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt"

# ---------------------------
# Helper: robust MO lookup loader
# ---------------------------
def load_mo_lookup(mo_path: str):
    """
    Load the MO-code lookup file into a two-column DataFrame.

    MO_codes.txt format (per assignment): each line starts with the code,
    followed by whitespace and then the description.

    Parameters
    ----------
    mo_path : str
        Path of the lookup text file; read with ``sc.textFile``.

    Returns
    -------
    DataFrame
        Columns ``mo_code`` (string) and ``description`` (string; empty when
        a line carries a code but no description).
    """
    def parse_line(line):
        # `line` is already stripped and non-empty here, so parts[0] always exists.
        # First whitespace-delimited token is the code, the remainder the description.
        parts = line.split(None, 1)
        desc = parts[1].strip() if len(parts) > 1 else ""
        return (parts[0], desc)

    mo_rdd = (
        sc.textFile(mo_path)
        .map(lambda line: line.strip())
        # keep only lines that start with a digit (skips empty/header junk)
        .filter(lambda line: line[:1].isdigit())
        .map(parse_line)
    )

    # Explicit schema instead of a bare column-name list: schema inference has
    # to look at the first row, which costs an extra job and raises when the
    # RDD is empty (e.g. no line in the file starts with a digit).
    mo_df = spark.createDataFrame(mo_rdd, "mo_code: string, description: string")

    # Small eager action kept from the original implementation.
    # NOTE(review): presumably a warm-up / early failure on a bad path — confirm.
    mo_df.limit(1).count()
    return mo_df

# ---------------------------
# Helper: timing + explain
# ---------------------------
def time_and_explain(df, label):
    """
    Print the extended query plan of ``df`` and time a full ``count()`` on it.

    Parameters
    ----------
    df : DataFrame
        The query to inspect; must support ``explain``, ``limit`` and ``count``.
    label : str
        Heading printed above the plan and repeated in the timing line.

    Returns
    -------
    float
        Elapsed seconds of the timed ``df.count()`` (the warm-up run is excluded).
    """
    print("\n==============================")
    print(label)
    print("==============================")
    df.explain(True)

    # Warm-up: run a cheap action first so one-off start-up costs are not
    # attributed to the timed run below.
    df.limit(1).count()

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    t0 = time.perf_counter()
    n = df.count()
    dt = time.perf_counter() - t0

    print(f"{label} -> {dt:.3f} sec | rows: {n}")
    return dt

# =====================================================
# =============== DataFrame API =======================
# =====================================================


# Read the crime CSV files with a header row and every column typed as string.
# "inferSchema" is deliberately not enabled: it forces an extra full pass over
# the input just to guess types, and the only column used by this query
# ("Mocodes") must stay a string so codes such as "0344" keep their leading zero.
crime_df = (
    spark.read
    .option("header", "true")
    .csv(CRIME_DIR)
)

# One row per individual MO code: "Mocodes" holds several whitespace-separated
# codes, so split on runs of whitespace and explode the resulting array.
crime_mo_exploded = (
    crime_df
    .select("Mocodes")
    .where(F.col("Mocodes").isNotNull())
    .withColumn("mo_code", F.explode(F.split(F.col("Mocodes"), r"\s+")))
    .where(F.col("mo_code").isNotNull() & (F.col("mo_code") != ""))
)

# Number of occurrences of each code.
mo_counts = crime_mo_exploded.groupBy("mo_code").count()

# Lookup table (mo_code, description).
mo_codes = load_mo_lookup(MO_PATH)

# Apply the same key normalisation (string cast + trim) to both join sides.
mo_codes_norm = mo_codes.withColumn("mo_code", F.trim(F.col("mo_code").cast("string")))

# Cache the expensive part (explode + groupBy) so it is not recomputed for
# every join variant, then run one action to materialise the cache.
mo_counts_norm = (
    mo_counts
    .withColumn("mo_code", F.trim(F.col("mo_code").cast("string")))
    .cache()
)
mo_counts_norm.count()

# Join strategies
# ---------------
# Only the BROADCAST variant is active. The other variants are kept below,
# commented out, so they can be re-enabled to compare plans and timings.

# -- baseline (optimizer picks the strategy) --
#df_baseline = mo_counts_norm.join(mo_codes_norm, on="mo_code", how="left")
#t_baseline  = time_and_explain(df_baseline, "DF Join baseline (no hint)")

# -- broadcast: ship the lookup table to every executor --
df_bcast = mo_counts_norm.join(broadcast(mo_codes_norm), "mo_code", "left")
t_bcast = time_and_explain(df_bcast, "DF Join BROADCAST")

# -- sort-merge --
#df_merge    = mo_counts_norm.hint("MERGE").join(mo_codes_norm.hint("MERGE"), on="mo_code", how="left")
#t_merge     = time_and_explain(df_merge, "DF Join MERGE")

# -- settings changed before the shuffle-based variants (restored at the end) --
#spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
#spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

# -- shuffle hash --
#df_hash     = mo_counts_norm.hint("SHUFFLE_HASH").join(mo_codes_norm.hint("SHUFFLE_HASH"), on="mo_code", how="left")
#t_hash      = time_and_explain(df_hash, "DF Join SHUFFLE_HASH")

# -- shuffle-and-replicate nested loop --
#df_nl       = mo_counts_norm.hint("SHUFFLE_REPLICATE_NL").join(
#                mo_codes_norm.hint("SHUFFLE_REPLICATE_NL"),
#                on="mo_code",
#                how="left"
#             )
#t_nl        = time_and_explain(df_nl, "DF Join SHUFFLE_REPLICATE_NL")

# -- restore the session defaults --
#spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")
#spark.conf.unset("spark.sql.join.preferSortMergeJoin")

# Final result: every MO code with its description, most frequent first.
final_df = (
    df_bcast
    .orderBy(F.col("count").desc())
    .select("mo_code", "description", "count")
)

_ = final_df.limit(1).count()  # warm-up

# Time fetching the top 20 rows to the driver. perf_counter() is monotonic,
# so the interval is not affected by wall-clock adjustments.
t0 = time.perf_counter()
top20_rows = final_df.limit(20).collect()
df_total = time.perf_counter() - t0

# The timing line is reported once, inside the banner (it used to be printed
# a second time just before the banner).
print("\n==============================")
print(f"FINAL DF top20 -> {df_total:.3f} sec | rows fetched: {len(top20_rows)}")
print("==============================")
final_df.show(20, truncate=False)

# Only the strategies that were actually executed above are reported.
print("\nJoin timing summary (sec):")
#print("baseline:", t_baseline)
print("broadcast:", t_bcast)
#print("merge:", t_merge)
#print("shuffle_hash:", t_hash)
#print("shuffle_replicate_nl:", t_nl)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


DF Join BROADCAST
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [mo_code])
:- Project [trim(cast(mo_code#846 as string), None) AS mo_code#866, count#852L]
:  +- Aggregate [mo_code#846], [mo_code#846, count(1) AS count#852L]
:     +- Filter (isnotnull(mo_code#846) AND NOT (mo_code#846 = ))
:        +- Project [Mocodes#797, mo_code#846]
:           +- Generate explode(split(Mocodes#797, \s+, -1)), false, [mo_code#846]
:              +- Filter isnotnull(Mocodes#797)
:                 +- Project [Mocodes#797]
:                    +- Relation [DR_NO#787,Date Rptd#788,DATE OCC#789,TIME OCC#790,AREA#791,AREA NAME#792,Rpt Dist No#793,Part 1-2#794,Crm Cd#795,Crm Cd Desc#796,Mocodes#797,Vict Age#798,Vict Sex#799,Vict Descent#800,Premis Cd#801,Premis Desc#802,Weapon Used Cd#803,Weapon Desc#804,Status#805,Status Desc#806,Crm Cd 1#807,Crm Cd 2#808,Crm Cd 3#809,Crm Cd 4#810,... 4 more fields] csv
+- ResolvedHint (strategy=broadcast)
   +- Project [trim(cast(mo_code#855 as string), None) AS m

In [3]:
# =====================================================
# ================== RDD API ==========================
# =====================================================
import time, csv
from io import StringIO
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# =====================================================
# Query 3 — MO Codes frequency + descriptions mapping
# DF + RDD + join strategies (hint/explain) + timings
# =====================================================

# S3 prefix of the crime records; read line by line with sc.textFile in the RDD pipeline below.
CRIME_DIR = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/"
# S3 path of the MO-code lookup text file (code followed by its description on each line).
MO_PATH   = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt"

# ---------------------------
# Helper: robust MO lookup loader
# ---------------------------
def load_mo_lookup(mo_path: str):
    """
    Load the MO-code lookup file into a two-column DataFrame.

    MO_codes.txt format (per assignment): each line starts with the code,
    followed by whitespace and then the description.

    Parameters
    ----------
    mo_path : str
        Path of the lookup text file; read with ``sc.textFile``.

    Returns
    -------
    DataFrame
        Columns ``mo_code`` (string) and ``description`` (string; empty when
        a line carries a code but no description).
    """
    def parse_line(line):
        # `line` is already stripped and non-empty here, so parts[0] always exists.
        # First whitespace-delimited token is the code, the remainder the description.
        parts = line.split(None, 1)
        desc = parts[1].strip() if len(parts) > 1 else ""
        return (parts[0], desc)

    mo_rdd = (
        sc.textFile(mo_path)
        .map(lambda line: line.strip())
        # keep only lines that start with a digit (skips empty/header junk)
        .filter(lambda line: line[:1].isdigit())
        .map(parse_line)
    )

    # Explicit schema instead of a bare column-name list: schema inference has
    # to look at the first row, which costs an extra job and raises when the
    # RDD is empty (e.g. no line in the file starts with a digit).
    mo_df = spark.createDataFrame(mo_rdd, "mo_code: string, description: string")

    # Small eager action kept from the original implementation.
    # NOTE(review): presumably a warm-up / early failure on a bad path — confirm.
    mo_df.limit(1).count()
    return mo_df

# ---------------------------
# Helper: timing + explain
# ---------------------------
def time_and_explain(df, label):
    """
    Print the extended query plan of ``df`` and time a full ``count()`` on it.

    Parameters
    ----------
    df : DataFrame
        The query to inspect; must support ``explain``, ``limit`` and ``count``.
    label : str
        Heading printed above the plan and repeated in the timing line.

    Returns
    -------
    float
        Elapsed seconds of the timed ``df.count()`` (the warm-up run is excluded).
    """
    print("\n==============================")
    print(label)
    print("==============================")
    df.explain(True)

    # Warm-up: run a cheap action first so one-off start-up costs are not
    # attributed to the timed run below.
    df.limit(1).count()

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    t0 = time.perf_counter()
    n = df.count()
    dt = time.perf_counter() - t0

    print(f"{label} -> {dt:.3f} sec | rows: {n}")
    return dt
print("\n=== RDD pipeline (Corrected to include Description Join) ===")

# Start the wall-clock timer before any RDD work, so the reported RDD time
# also covers the header fetch below.
start_rdd = time.time()

# Read the crime CSV data as plain text lines.
raw = sc.textFile(CRIME_DIR)
# First line of the data, treated as the CSV header (first() runs a Spark action here).
header = raw.first()

def split_row(line):
    """
    Parse one CSV-formatted text line into its list of field strings.

    Quoted fields (including commas inside quotes) are handled by the
    standard ``csv`` parser.

    Parameters
    ----------
    line : str
        A single line of CSV text, without its line terminator.

    Returns
    -------
    list of str
        The parsed fields. An empty line yields ``[]`` instead of raising
        ``StopIteration``, which would otherwise fail the Spark task that
        runs this function.
    """
    # The default of next() covers any case where the reader produces no row.
    return next(csv.reader([line]), [])

# Drop every line identical to the header, then parse the remaining lines as CSV.
rdd = raw.filter(lambda x: x != header).map(split_row)

# Locate the "Mocodes" column with the same CSV parser that is used for the
# data rows, so header and row indices cannot drift apart (a naive split on
# "," would miscount if a quoted header name contained a comma).
cols = split_row(header)
mo_idx = cols.index("Mocodes")

# RDD of (mo_code, count) pairs
mo_rdd_counts = (rdd
    # Skip rows too short to contain the column (malformed or truncated
    # lines) as well as rows whose Mocodes field is empty.
    .filter(lambda r: len(r) > mo_idx and r[mo_idx] != "")
    # split() without arguments splits on any run of whitespace and never
    # yields empty tokens, matching the r"\s+" split of the DataFrame version.
    .flatMap(lambda r: r[mo_idx].split())
    .map(lambda mo: (mo, 1))  # Key = mo_code (string)
    .reduceByKey(lambda a, b: a + b)
)

# 2. RDD for the MO-code lookup: keep only the lines whose first
#    non-whitespace character is a digit (blank lines fail the test as well).
raw_lookup = sc.textFile(MO_PATH)
raw2_lookup = raw_lookup.filter(lambda text: text.strip()[:1].isdigit())

def parse_line_rdd(line):
    """
    Split one lookup line into a ``(code, description)`` pair.

    The first whitespace-delimited token is the code; everything after it
    (stripped) is the description.

    Parameters
    ----------
    line : str
        One line of the MO-code lookup file.

    Returns
    -------
    tuple of (str, str)
        ``(code, description)``. The description is "Unknown Description"
        when the line has no text after the code. A blank or whitespace-only
        line returns an empty code (``""``) rather than raising
        ``IndexError``, so callers can filter such entries out by code.
    """
    parts = line.strip().split(None, 1)
    if not parts:
        # Nothing on the line at all: report an empty code.
        return ("", "Unknown Description")
    code = parts[0]
    desc = parts[1].strip() if len(parts) > 1 else "Unknown Description"  # Default value
    return (code, desc)

# Pair RDD of (mo_code, description); entries with an empty code are dropped.
mo_rdd_lookup = raw2_lookup.map(parse_line_rdd).filter(lambda pair: pair[0] != "")

# Pair-RDD left outer join keeps every counted code:
# (mo_code, (count, description or None))
joined_rdd = mo_rdd_counts.leftOuterJoin(mo_rdd_lookup)

# Reshape each record to (count, mo_code, description), substituting a
# placeholder when the lookup had no match, then order by count descending.
final_rdd_sorted = (
    joined_rdd
    .map(lambda kv: (
        kv[1][0],
        kv[0],
        "No Description Found" if kv[1][1] is None else kv[1][1],
    ))
    .sortBy(lambda row: row[0], ascending=False)
)

# take() is the action that runs the pipeline; the timer stops right after it.
top20 = final_rdd_sorted.take(20)
rdd_time = time.time() - start_rdd

# Report the elapsed time, then the top-20 table as left-aligned columns
# (count and code padded to 10 characters each).
print("\n==============================")
print(f"RDD time -> {rdd_time:.3f} sec (top20 computed)")
print("==============================")
print("Top 20 MO codes (RDD) with Description:")
print(f"{'Count':<10} {'Code':<10} Description")
print("-" * 50)
for entry in top20:
    # entry is (count, mo_code, description)
    print("{:<10} {:<10} {}".format(*entry))



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


=== RDD pipeline (Corrected to include Description Join) ===

RDD time -> 29.737 sec (top20 computed)
Top 20 MO codes (RDD) with Description:
Count      Code       Description
--------------------------------------------------
1002900    0344       Removes vict property
548422     1822       Stranger
404773     0416       Hit-Hit w/ weapon
377536     0329       Vandalized
278618     0913       Victim knew Suspect
256188     2000       Domestic violence
219082     1300       Vehicle involved
213165     0400       Force used
177470     1402       Evidence Booked (any crime)
131229     1609       Smashed
122108     1309       Susp uses vehicle
120238     1202       Victim was aged (60 & over) or blind/physically disabled/unable to care for self
120159     0325       Took merchandise
118073     1814       Susp is/was current/former boyfriend/girlfriend
116763     0444       Pushed
115589     1501       Other MO (see rpt)
113609     1307       Breaks window
105665     0334       Brandishes