# **PySpark Data Processing Assignment**

## 1. Data Processing **Pipeline**

### Load data:

In [0]:
# Load table
table_name = "default.individual_incident_2020"   
df = spark.table(table_name)

# Basic info
print("Rows:", df.count(), "Cols:", len(df.columns))
df.printSchema()
display(df.limit(5))


Rows: 9130711 Cols: 53
root
 |-- state: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- ORI: string (nullable = true)
 |-- incident_number: string (nullable = true)
 |-- date_HRF: long (nullable = true)
 |-- date_SIF: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- total_offense: long (nullable = true)
 |-- total_victim: long (nullable = true)
 |-- total_offender: long (nullable = true)
 |-- violence_offense: long (nullable = true)
 |-- theft_offense: long (nullable = true)
 |-- drug_offense: long (nullable = true)
 |-- sex_offense: long (nullable = true)
 |-- kidnapping_trafficking: long (nullable = true)
 |-- other_offense: long (nullable = true)
 |-- gun_involvement: long (nullable = true)
 |-- drug_involvement: long (nullable = true)
 |-- property_value: long (nullable = true)
 |-- stolen_motor: long (nullable = true)
 |-- male_victim: long (nullable = true)
 |-- female_victim: long (nullable = true)
 |-- unknown_sex_victim: long (nullable = true)


state,ID,ORI,incident_number,date_HRF,date_SIF,hour,total_offense,total_victim,total_offender,violence_offense,theft_offense,drug_offense,sex_offense,kidnapping_trafficking,other_offense,gun_involvement,drug_involvement,property_value,stolen_motor,male_victim,female_victim,unknown_sex_victim,w_victim,b_victim,i_victim,a_victim,p_victim,unknown_race_victim,minor_victim,non_minor_victim,unknown_age_victim,offender_wi_family,offender_outside_family,offender_not_known,male_offender,female_offender,unknown_sex_offender,w_offender,b_offender,i_offender,a_offender,p_offender,unknown_race_offender,minor_offender,non_minor_offender,unknown_age_offender,completed_attempted2,completed_attempted3,property_description,property_description2,property_description3,race_offender2
NV-Nevada,NV0160200_CI0BRCTRG28N,NV0160200,CI0BRCTRG28N,20200803,03aug2020,On or between midnight and 0059,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,,263.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,,,,,,,,,,,,,,,,,,77.0,9.0,,
NV-Nevada,NV0160200_CI0BRCTRQ28N,NV0160200,CI0BRCTRQ28N,20200803,03aug2020,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,1.0,0.0,,,,,,,
NV-Nevada,NV0160200_CI0BRCTRV28N,NV0160200,CI0BRCTRV28N,20200803,03aug2020,On or between 0200 and 0259,1.0,1.0,2.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,,1160.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,,,,,,,,,,,,,,,,,,6.0,77.0,25.0,
NV-Nevada,NV0160200_CI0BRCTV-28N,NV0160200,CI0BRCTV-28N,20200805,05aug2020,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,0.0,0.0,,0.0,0.0,0.0,0.0,0.0,,1.0,0.0,,,,,,,
NV-Nevada,NV0160200_CI0BRCTV128N,NV0160200,CI0BRCTV128N,20200805,05aug2020,On or between 1800 and 1859,1.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,,18530.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,,,,,,,,,,,,,,,,,,20.0,13.0,77.0,


### Apply transformations:

In [0]:
from pyspark.sql import functions as F

# Select and derive
df_t = (df
    .select("state", "incident_number", "date_HRF")
    .withColumn("state_abbr", F.split(F.col("state"), "-")[0])
    .withColumn("date_str", F.col("date_HRF").cast("string"))
    .withColumn("incident_date", F.expr("try_to_date(date_str, 'yyyyMMdd')"))
    .withColumn("year", F.year("incident_date"))
    .withColumn("month", F.month("incident_date"))
)

# Early filters
df_t = (df_t
    .filter(F.col("incident_number").isNotNull())           
    .filter(F.col("date_str").rlike(r"^\d{8}$"))            
    .filter(F.col("incident_date").isNotNull())             
    .filter(F.col("state_abbr").isNotNull())                
)

print("Rows after filters:", df_t.count())
display(df_t.limit(5))



Rows after filters: 9105317


state,incident_number,date_HRF,state_abbr,date_str,incident_date,year,month
NV-Nevada,CI0BRCTRG28N,20200803,NV,20200803,2020-08-03,2020,8
NV-Nevada,CI0BRCTRQ28N,20200803,NV,20200803,2020-08-03,2020,8
NV-Nevada,CI0BRCTRV28N,20200803,NV,20200803,2020-08-03,2020,8
NV-Nevada,CI0BRCTV-28N,20200805,NV,20200805,2020-08-05,2020,8
NV-Nevada,CI0BRCTV128N,20200805,NV,20200805,2020-08-05,2020,8


In [0]:
from pyspark.sql import functions as F

# Small dim: state -> census region
region_map = [
 ("CT","Northeast"),("ME","Northeast"),("MA","Northeast"),("NH","Northeast"),
 ("RI","Northeast"),("VT","Northeast"),("NJ","Northeast"),("NY","Northeast"),
 ("PA","Northeast"),
 ("IL","Midwest"),("IN","Midwest"),("MI","Midwest"),("OH","Midwest"),("WI","Midwest"),
 ("IA","Midwest"),("KS","Midwest"),("MN","Midwest"),("MO","Midwest"),
 ("NE","Midwest"),("ND","Midwest"),("SD","Midwest"),
 ("DE","South"),("FL","South"),("GA","South"),("MD","South"),("NC","South"),
 ("SC","South"),("VA","South"),("DC","South"),("WV","South"),
 ("AL","South"),("KY","South"),("MS","South"),("TN","South"),
 ("AR","South"),("LA","South"),("OK","South"),("TX","South"),
 ("AZ","West"),("CO","West"),("ID","West"),("MT","West"),
 ("NV","West"),("NM","West"),("UT","West"),("WY","West"),
 ("AK","West"),("CA","West"),("HI","West"),("OR","West"),("WA","West")
]
region_df = spark.createDataFrame(region_map, ["state_abbr","region"])

# Broadcast join
df_j = df_t.join(F.broadcast(region_df), "state_abbr", "left")

# GroupBy + aggregations
by_state_month = (df_j
    .groupBy("state_abbr","region","year","month")
    .agg(
        F.count("*").alias("n_incidents"),
        F.approx_count_distinct("incident_number").alias("n_distinct_inc")
    )
    .withColumn("year_month", F.concat_ws("-", F.col("year"), F.col("month")))
)

print("Groups:", by_state_month.count())
display(by_state_month.orderBy(F.desc("n_incidents")).limit(10))


Groups: 568


state_abbr,region,year,month,n_incidents,n_distinct_inc,year_month
TX,South,2020,12,125096,130844,2020-12
TX,South,2020,11,123586,117623,2020-11
TX,South,2020,10,116784,111343,2020-10
TX,South,2020,8,114286,102965,2020-8
TX,South,2020,9,110686,96018,2020-9
TX,South,2020,2,105662,95426,2020-2
TX,South,2020,7,105593,106073,2020-7
TX,South,2020,6,105335,91435,2020-6
TX,South,2020,4,103919,109061,2020-4
TX,South,2020,3,103702,98082,2020-3


###  SQL queries:

In [0]:
# Temp views
df_j = df_j.withColumn("state_abbr", F.trim(F.upper(F.col("state_abbr"))))
df_j.createOrReplaceTempView("nibrs2020")
by_state_month.createOrReplaceTempView("nibrs2020_by_state_month")

# SQL 1: Top 15 states by total incidents
sql1 = spark.sql("""
SELECT state_abbr, region, COUNT(*) AS n
FROM nibrs2020
WHERE region IS NOT NULL
GROUP BY state_abbr, region
ORDER BY n DESC
LIMIT 15
""")
display(sql1)

# SQL 2: Monthly trend per region (exclude NULL region)
sql2 = spark.sql("""
SELECT region, year, month, COUNT(*) AS n
FROM nibrs2020
WHERE region IS NOT NULL
GROUP BY region, year, month
ORDER BY region, year, month
""")
display(sql2)


state_abbr,region,n
TX,South,1306207
NC,South,543958
TN,South,533122
MI,Midwest,455170
WA,West,455022
OH,Midwest,443409
VA,South,438518
CO,West,371322
GA,South,361965
SC,South,352396


region,year,month,n
Midwest,2019,7,1
Midwest,2019,9,1
Midwest,2020,1,188846
Midwest,2020,2,174373
Midwest,2020,3,166831
Midwest,2020,4,170667
Midwest,2020,5,140986
Midwest,2020,6,190087
Midwest,2020,7,204478
Midwest,2020,8,212298


### Optimization and Write Results to Table:

In [0]:
# Optimization 
from pyspark.sql import functions as F
df_opt = df_j.filter(F.col("incident_number").isNotNull()) \
             .filter(F.col("incident_date").isNotNull()) \
             .repartition(64, "state_abbr")

by_state_month_opt = df_opt.groupBy("state_abbr","region","year","month") \
                           .agg(F.count("*").alias("n_incidents"))
df_opt.explain(mode="formatted")

# Results
by_state_month_opt.write.mode("overwrite").format("delta").saveAsTable("default.nibrs2020_by_state_month")
sql1.write.mode("overwrite").format("delta").saveAsTable("default.nibrs2020_sql_top_states")
sql2.write.mode("overwrite").format("delta").saveAsTable("default.nibrs2020_sql_region_month")

print("Results written to tables.")

== Physical Plan ==
AdaptiveSparkPlan (18)
+- == Initial Plan ==
   ColumnarToRow (17)
   +- PhotonResultStage (16)
      +- PhotonShuffleExchangeSource (15)
         +- PhotonShuffleMapStage (14)
            +- PhotonShuffleExchangeSink (13)
               +- PhotonProject (12)
                  +- PhotonBroadcastHashJoin LeftOuter (11)
                     :- PhotonProject (4)
                     :  +- PhotonProject (3)
                     :     +- PhotonProject (2)
                     :        +- PhotonScan parquet workspace.default.individual_incident_2020 (1)
                     +- PhotonShuffleExchangeSource (10)
                        +- PhotonShuffleMapStage (9)
                           +- PhotonShuffleExchangeSink (8)
                              +- PhotonFilter (7)
                                 +- PhotonRowToColumnar (6)
                                    +- LocalTableScan (5)


(1) PhotonScan parquet workspace.default.individual_incident_2020
Output [3]: [state#2

In [0]:
# Check tables
df_check = spark.table("default.nibrs2020_by_state_month")
print("Rows:", df_check.count())
df_check.show(5)

Rows: 568
+----------+-------+----+-----+-----------+--------------+
|state_abbr| region|year|month|n_incidents|n_distinct_inc|
+----------+-------+----+-----+-----------+--------------+
|        ND|Midwest|2020|   12|       4462|          NULL|
|        ND|Midwest|2020|   11|       4354|          NULL|
|        ND|Midwest|2020|    7|       4954|          NULL|
|        ND|Midwest|2020|   10|       4636|          NULL|
|        ND|Midwest|2020|    9|       4793|          NULL|
+----------+-------+----+-----+-----------+--------------+
only showing top 5 rows


## 2. Performance Analysis 



### Execution Plan and Caching optimization:

In [0]:
import time
from pyspark.sql.utils import AnalysisException

# Physical plan
by_state_month_opt.explain(True)

# First action  
t0 = time.time()
n1 = by_state_month_opt.count()
t1 = time.time()
print("First count() time:", round(t1 - t0, 2), "seconds | rows:", n1)

cached = False
try:
    by_state_month_opt.cache()
    _ = by_state_month_opt.count()
    cached = True
    print("DataFrame cached in memory.")
except Exception as e:
    print("Cache skipped (serverless restriction):", str(e).split("\n", 1)[0])

# Second action 
t2 = time.time()
n2 = by_state_month_opt.count()
t3 = time.time()
print(
    "Second count() time:",
    round(t3 - t2, 2),
    "seconds | rows:",
    n2,
    "| cached:" ,
    cached,
)

# SQL via temp view
by_state_month_opt.createOrReplaceTempView("temp_by_state_month")
spark.sql("SELECT COUNT(*) AS n FROM temp_by_state_month").show()
spark.sql("""
  SELECT region, year, SUM(n_incidents) AS total_inc
  FROM temp_by_state_month
  GROUP BY region, year
""").explain(True)



== Parsed Logical Plan ==
'Aggregate ['state_abbr, 'region, 'year, 'month], ['state_abbr, 'region, 'year, 'month, 'count(*) AS n_incidents#28037]
+- 'RepartitionByExpression ['state_abbr], 64
   +- 'Filter 'isNotNull('incident_date)
      +- 'Filter 'isNotNull('incident_number)
         +- Project [trim(upper(state_abbr#24783), None) AS state_abbr#26946, state#24728, incident_number#24731, date_HRF#24732L, date_str#24785, incident_date#24787, year#24789, month#24791, region#26944]
            +- Project [state_abbr#24783, state#24728, incident_number#24731, date_HRF#24732L, date_str#24785, incident_date#24787, year#24789, month#24791, region#26944]
               +- Join LeftOuter, (state_abbr#24783 = state_abbr#26943)
                  :- Filter isnotnull(state_abbr#24783)
                  :  +- Filter isnotnull(incident_date#24787)
                  :     +- Filter RLIKE(date_str#24785, ^\d{8}$)
                  :        +- Filter isnotnull(incident_number#24731)
                  

### Analysis Process
Spark’s execution plan output shows how the query was optimized for efficiency. The system automatically pushed filters closer to the data source to remove unnecessary rows early, reducing I/O overhead. It also selected only the columns required for grouping and aggregation, which minimized data movement. Operations in the plan indicate that Databricks Photon’s vectorized engine was used for faster computation. Filters on state and date attributes were applied at the data source level, which helped prevent extra shuffling across executors and improved overall query performance.

The query optimization mainly relied on early filtering and reduced data exchange between nodes. However, the initial aggregation still required a complete scan of roughly nine million records, leading to a runtime of about five seconds. The execution plan also showed that small reference data was broadcast to all workers to reduce shuffle costs. Even with these optimizations, the first execution was limited by read latency from Delta storage rather than by CPU processing, revealing I/O as the main performance bottleneck.

After the first run, in-memory caching was applied so that the dataset could be reused without another full read from storage. Although persistent caching isn’t supported in the current environment, the second execution ran faster because Spark reused the optimized plan and metadata already loaded into memory. This demonstrates how caching improves performance for repeated operations by reducing both disk I/O and query compilation overhead. The difference between the first and second run times highlights the impact of caching on repeated computations in Spark pipelines.

## 3. Actions vs Transformations

In [0]:
from pyspark.sql import functions as F

# Transformations 
filtered_df = df_opt.filter(F.col("region") == "South")  
grouped_df  = filtered_df.groupBy("state_abbr").count()    
print("No action executed yet - Spark has built the logical plan only.")

# Action 
result = grouped_df.collect() 
print("Rows collected:", len(result))

No action executed yet - Spark has built the logical plan only.
Rows collected: 16


In [0]:
# Transformations vs Actions -
 
filtered_df = by_state_month_opt.filter(by_state_month_opt.region == "South")
print("Transformation defined, but no computation yet.")
count_result = filtered_df.count()
print(f"Action executed. Total rows in region 'South': {count_result}")


Transformation defined, but no computation yet.
Action executed. Total rows in region 'South': 192


In Spark, transformations such as filtering or grouping are lazy operations that only build a logical plan without performing any actual computation. The real execution happens only when an action like counting or collecting is called, which triggers Spark to launch tasks, read data, and produce results. In this demonstration, the filtering step simply defines what should happen, while the counting step forces Spark to execute the plan. This behavior shows how Spark delays computation until an action is invoked, allowing it to optimize the overall execution plan before running the job.

## 4. Machine Learning

In [0]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator

input_df = by_state_month_opt.select("year", "month", "n_incidents")

assembler = VectorAssembler(
    inputCols=["year", "month", "n_incidents"],
    outputCol="features"
)
feature_df = assembler.transform(input_df)

kmeans = KMeans(k=3, seed=42)
model = kmeans.fit(feature_df)

predictions = model.transform(feature_df)

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)

print(f"Silhouette with squared euclidean distance = {silhouette:.4f}")
print("Cluster Centers:")
for center in model.clusterCenters():
    print(center)


Silhouette with squared euclidean distance = 0.8314
Cluster Centers:
[2.02000000e+03 6.72049689e+00 3.19425528e+04]
[2.02000000e+03 6.39746835e+00 6.72495949e+03]
[2.02000000e+03 6.50000000e+00 1.08850583e+05]
