# DE1 — Lab 1: PySpark Warmup and Reading Plans
> Author : Badr TAJINI - Data Engineering I - ESIEE 2025-2026
---

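This notebook is the **student deliverable**. Run all cells in order on a fresh kernel (Restart → Run All), then attach the generated evidence: the files in `outputs/` and `proof/`, plus the Spark UI metrics recorded in `lab1_metrics_log.csv`.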

## 0. Imports and Spark session

In [None]:
import csv
import datetime
import glob
import os
import pathlib
import shutil
import sys

from pyspark.sql import SparkSession, functions as F
# Record the interpreter version first, then start (or reuse) the Spark session.
print("Python:", sys.version)
builder = SparkSession.builder.appName("de1-lab1")
spark = builder.getOrCreate()
print("Spark:", spark.version)


Python: 3.10.18 (main, Jun  5 2025, 13:14:17) [GCC 11.2.0]
Spark: 4.0.1


## 1. Load the CSV inputs

In [None]:
# Load both CSV inputs with identical reader options, then stack them by column name.
src_a = "data/lab1_dataset_a.csv"
src_b = "data/lab1_dataset_b.csv"


def _read_lab_csv(path):
    """Read one lab CSV file that has a header row, inferring column types."""
    return spark.read.option("header", "true").option("inferSchema", "true").csv(path)


df_a = _read_lab_csv(src_a)
df_b = _read_lab_csv(src_b)
# inferSchema runs independently per file, and unionByName may silently widen or
# coerce mismatched column types, so check that both inputs agree before combining.
assert dict(df_a.dtypes) == dict(df_b.dtypes), f"Schema mismatch: {df_a.dtypes} vs {df_b.dtypes}"
df = df_a.unionByName(df_b)
df.cache()  # reused by every section below
print("Rows:", df.count())  # the first action also materializes the cache
df.printSchema()
df.show(5, truncate=False)


Rows: 2700
root
 |-- id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- value: double (nullable = true)
 |-- text: string (nullable = true)

+---+-----------+-----+----------------------------------------------------------------------------+
|id |category   |value|text                                                                        |
+---+-----------+-----+----------------------------------------------------------------------------+
|0  |toys       |48.47|metrics ui data elt row columnar reduce warehouse shuffle join spark elt    |
|1  |books      |39.9 |metrics row lake aggregate columnar data reduce row columnar filter         |
|2  |grocery    |7.96 |lake join partition scala elt data                                          |
|3  |electronics|5.15 |spark scala elt filter join columnar lake lake plan warehouse columnar spark|
|4  |toys       |44.87|aggregate metrics row row filter lake map metrics columnar spark            |
+---+-----------+-----+----

## 2. Top‑N with **RDD** API

In [None]:
# RDD pipeline: tokenize the 'text' column (lowercase, whitespace split) and count tokens.
# Null text is mapped to "" so it contributes no tokens.
tokens_rdd = df.select("text").rdd.flatMap(lambda row: (row[0] or "").lower().split())
counts = tokens_rdd.map(lambda tok: (tok, 1)).reduceByKey(lambda a, b: a + b)
# takeOrdered keeps only the best 10 pairs per partition and merges them on the driver,
# instead of range-partitioning and sorting every (token, count) pair as sortBy does.
# Key: count descending, then token ascending so ties are deterministic.
top_rdd = counts.takeOrdered(10, key=lambda kv: (-kv[1], kv[0]))
top_rdd


In [None]:
# Save the RDD top-N as CSV with a "token,count" header.
# str.split() only splits on whitespace, so tokens may carry commas or quotes;
# csv.writer quotes such fields instead of emitting malformed rows.
pathlib.Path("outputs").mkdir(parents=True, exist_ok=True)
with open("outputs/top10_rdd.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f, lineterminator="\n")  # keep "\n" line endings
    writer.writerow(["token", "count"])
    writer.writerows(top_rdd)
print("Wrote outputs/top10_rdd.csv")


### RDD plan — evidence

In [None]:
# Trigger an action on the RDD, then record its lineage as evidence.
# RDD.toDebugString() describes the `counts` pipeline itself. The DataFrame plan
# (df._jdf...executedPlan()) would only describe the cached source DataFrame.
_ = counts.count()
debug_bytes = counts.toDebugString()  # PySpark returns bytes (or None)
plan_rdd = debug_bytes.decode("utf-8") if debug_bytes else ""
pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
with open("proof/plan_rdd.txt", "w", encoding="utf-8") as f:
    f.write(str(datetime.datetime.now()) + "\n\n")
    f.write(plan_rdd)
print("Saved proof/plan_rdd.txt")


## 3. Top‑N with **DataFrame** API

In [None]:
# DataFrame pipeline: lowercase, split on runs of whitespace, one row per token.
tokens = F.explode(F.split(F.lower(F.col("text")), "\\s+")).alias("token")
# split() can emit empty strings (e.g. for leading/trailing whitespace); drop them.
df_tokens = df.select(tokens).where(F.col("token") != "")
agg_df = df_tokens.groupBy("token").agg(F.count("*").alias("count"))
# Same ordering as the RDD version: count descending, token ascending for ties.
top_df = agg_df.orderBy(F.desc("count"), F.asc("token")).limit(10)
top_df.show(truncate=False)

# Spark writes a directory of part files; coalesce(1) yields a single part file,
# which is then copied to a stable path next to the RDD output.
tmp_dir = "outputs/top10_df_tmp"
top_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(tmp_dir)
part_files = sorted(glob.glob(f"{tmp_dir}/part-*"))
if not part_files:
    raise FileNotFoundError(f"No CSV part file written under {tmp_dir}")
shutil.copy(part_files[0], "outputs/top10_df.csv")
print("Wrote outputs/top10_df.csv")


### DataFrame plan — evidence

In [None]:
# Record the physical plan of the top-N DataFrame as evidence.
# _jdf is PySpark's private handle to the JVM Dataset, used here because
# DataFrame.explain() prints the plan rather than returning it as a string.
plan_df = top_df._jdf.queryExecution().executedPlan().toString()
# Create the folder here too, so this cell does not depend on the RDD evidence cell.
pathlib.Path("proof").mkdir(parents=True, exist_ok=True)
with open("proof/plan_df.txt", "w", encoding="utf-8") as f:
    f.write(str(datetime.datetime.now()) + "\n\n")
    f.write(plan_df)
print("Saved proof/plan_df.txt")


## 4. Projection experiment: `select("*")` vs minimal projection

In [None]:
# Projection experiment: both cases compute the same per-category sum; only the
# projection in front of the aggregation differs. Compare the "formatted" plans
# (columns read from the cached relation) and the Spark UI metrics of each job.
all_cols = df.select("*").groupBy("category").agg(F.sum("value").alias("sum_value"))  # Case A
proj = df.select("category", "value").groupBy("category").agg(F.sum("value").alias("sum_value"))  # Case B

for label, agg_plan in [("Case A: select('*')", all_cols),
                        ("Case B: select('category', 'value')", proj)]:
    print(f"=== {label} ===")
    agg_plan.explain("formatted")
    # collect() runs exactly the aggregation explained above (one row per category).
    # count() only needs the number of groups, so the optimizer is free to drop
    # sum(value), and the measured job would no longer match the explained plan.
    _ = agg_plan.collect()

print("Open Spark UI at http://localhost:4040 while each job runs and record metrics into lab1_metrics_log.csv")


## 5. Cleanup

In [None]:
# Shut down the session; after this, `spark` can no longer run jobs.
stopped_msg = "Spark session stopped."
spark.stop()
print(stopped_msg)
