In [None]:
# STEP 1 ############################################################
# GOAL:
#   Install PySpark in Google Colab.
#
# WHAT IS HAPPENING:
#   - Apache Spark is a distributed data processing engine (NOT a database).
#   - In Colab, we do NOT manually install Java or download Spark binaries.
#   - Installing `pyspark` is enough to use Spark from Python in Colab.
#
# EXPECTED RESULT:
#   - This finishes quietly (no output or minimal output).
#   - If you re-run it, it’s fine.
############################################################

!pip install -q pyspark


In [None]:
# STEP 2 ############################################################
# GOAL:
#   Obtain a SparkSession, the main entry point to Spark.
#
# BACKGROUND:
#   - The SparkSession is your handle on the Spark engine.
#   - DataFrames, file readers, and SQL queries are all reached through it.
#
# EXPECTED RESULT:
#   - The Spark version is printed.
#   - The Spark master is printed; in Colab it is usually "local[*]",
#     meaning Spark runs locally on the CPU cores of the Colab VM.
############################################################

from pyspark.sql import SparkSession

# Configure the session first, then ask Spark for it in a separate step.
session_builder = SparkSession.builder.appName("CIS660_Spark_Tutorial")
spark = session_builder.getOrCreate()

print(f"Spark version: {spark.version}")
print(f"Spark master: {spark.sparkContext.master}")


Spark version: 4.0.2
Spark master: local[*]


In [None]:
# STEP 3 ############################################################
# GOAL:
#   Build a first Spark DataFrame (a distributed table) from Python data.
#
# BACKGROUND:
#   - Conceptually close to a pandas DataFrame or an SQL table.
#   - Spark DataFrames are designed to scale across many machines, but the
#     API is the same on a single machine such as Colab.
#
# EXPECTED RESULT:
#   - `show()` prints a table-style preview of the rows.
#   - `printSchema()` prints the data types Spark inferred for each column.
############################################################

# Column names, in the same order as the fields of each row tuple below.
columns = ["name", "department", "salary"]

# One tuple per employee: (name, department, salary).
data = [
    ("Alice", "Sales", 70000),
    ("Bob", "Sales", 60000),
    ("Charlie", "Engineering", 120000),
]

df = spark.createDataFrame(data, schema=columns)

df.show()
df.printSchema()


+-------+-----------+------+
|   name| department|salary|
+-------+-----------+------+
|  Alice|      Sales| 70000|
|    Bob|      Sales| 60000|
|Charlie|Engineering|120000|
+-------+-----------+------+

root
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
# STEP 4 ############################################################
# GOAL:
#   Practice the three everyday DataFrame operations: select, filter, sort.
#
# WHY THIS MATTERS:
#   - These operations appear in every Spark project.
#   - Each one has a direct SQL counterpart:
#       select  -> SELECT columns
#       filter  -> WHERE condition
#       orderBy -> ORDER BY column
#
# NOTE:
#   - select / filter / sort are "transformations": they only build a plan.
#   - Nothing is computed until an "action" such as show() or count() runs.
############################################################

from pyspark.sql.functions import col

# Reusable reference to the salary column.
salary = col("salary")

# SELECT-like: keep only the columns we care about.
names_and_salaries = df.select("name", "salary")
names_and_salaries.show()

# WHERE-like: keep only rows with salary above 65000.
above_65k = df.where(salary > 65000)
above_65k.show()

# ORDER BY-like: highest salary first.
by_salary_desc = df.sort(salary.desc())
by_salary_desc.show()


+-------+------+
|   name|salary|
+-------+------+
|  Alice| 70000|
|    Bob| 60000|
|Charlie|120000|
+-------+------+

+-------+-----------+------+
|   name| department|salary|
+-------+-----------+------+
|  Alice|      Sales| 70000|
|Charlie|Engineering|120000|
+-------+-----------+------+

+-------+-----------+------+
|   name| department|salary|
+-------+-----------+------+
|Charlie|Engineering|120000|
|  Alice|      Sales| 70000|
|    Bob|      Sales| 60000|
+-------+-----------+------+



In [None]:
# STEP 5 ############################################################
# GOAL:
#   Group rows by department and compute summary statistics per group.
#
# WHY THIS MATTERS:
#   - groupBy followed by aggregations is a standard analytics workflow,
#     and it is where Spark becomes very powerful at scale.
#
# EXPECTED RESULT:
#   - One row per department, with the head count and the average salary.
############################################################

from pyspark.sql.functions import avg, count

# Name each aggregate expression before handing it to agg().
headcount = count("*").alias("num_people")
mean_salary = avg("salary").alias("avg_salary")

dept_summary = df.groupBy("department").agg(headcount, mean_salary)
dept_summary.show()


+-----------+----------+----------+
| department|num_people|avg_salary|
+-----------+----------+----------+
|      Sales|         2|   65000.0|
|Engineering|         1|  120000.0|
+-----------+----------+----------+



In [None]:
# STEP 6 ############################################################
# GOAL:
#   See "lazy evaluation" in practice (transformations vs. actions).
#
# KEY IDEA:
#   - Transformations are NOT executed when you write them.
#   - Spark records them in an execution plan (DAG).
#   - Only an ACTION makes Spark run the plan.
#
# WHAT WE DO:
#   - Build a transformation called `plan` (filter, then select)
#   - Inspect Spark's planned execution with `explain()`
#   - Trigger the computation with `show()` (an action)
#
# NOTE:
#   - `col` was imported in STEP 4; run that cell first.
############################################################

# Two chained transformations, written as two separate steps.
high_earners = df.filter(col("salary") > 60000)
plan = high_earners.select("name", "department")

print("This is a DataFrame object (a plan), not computed results:")
print(plan)

print("\nSpark execution plan:")
plan.explain(extended=True)

print("\nAction: show() triggers execution:")
plan.show()


This is a DataFrame object (a plan), not computed results:
DataFrame[name: string, department: string]

Spark execution plan:
== Parsed Logical Plan ==
'Project ['name, 'department]
+- Filter (salary#2L > cast(60000 as bigint))
   +- LogicalRDD [name#0, department#1, salary#2L], false

== Analyzed Logical Plan ==
name: string, department: string
Project [name#0, department#1]
+- Filter (salary#2L > cast(60000 as bigint))
   +- LogicalRDD [name#0, department#1, salary#2L], false

== Optimized Logical Plan ==
Project [name#0, department#1]
+- Filter (isnotnull(salary#2L) AND (salary#2L > 60000))
   +- LogicalRDD [name#0, department#1, salary#2L], false

== Physical Plan ==
*(1) Project [name#0, department#1]
+- *(1) Filter (isnotnull(salary#2L) AND (salary#2L > 60000))
   +- *(1) Scan ExistingRDD[name#0,department#1,salary#2L]


Action: show() triggers execution:
+-------+-----------+
|   name| department|
+-------+-----------+
|  Alice|      Sales|
|Charlie|Engineering|
+-------+-----------+

In [None]:
# STEP 7 ############################################################
# GOAL:
#   Load a CSV file into a Spark DataFrame.
#
# IMPORTANT FOR STUDENTS:
#   In later labs you will load real datasets from Google Drive:
#     1) Mount Google Drive (see STEP 14)
#     2) In the Colab left sidebar → Files → Google Drive
#     3) Right-click your file → Copy path
#     4) Paste the path into spark.read.csv(...)
#
# WHY WE CREATE A SMALL CSV FIRST:
#   - The tutorial stays self-contained.
#   - No lab dataset is needed.
############################################################

# Sample data; the `tags` column packs several values separated by "|".
csv_text = """student_id,name,course,score,credits,tags
1,Alice,Math,85,3,hard|core
2,Bob,Math,90,3,important|exam
3,Charlie,Physics,78,4,lab|science
4,Diana,Physics,88,4,science|advanced
5,Evan,History,92,3,reading|essay
"""

csv_path = "/content/students_sample.csv"

# Write the sample file to the Colab VM's local disk.
with open(csv_path, "w") as handle:
    handle.write(csv_text)

# header=True      -> the first line holds the column names
# inferSchema=True -> Spark infers column types instead of reading
#                     every column as a string
df_students = spark.read.csv(csv_path, header=True, inferSchema=True)

df_students.printSchema()
df_students.show(truncate=False)


root
 |-- student_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- course: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- credits: integer (nullable = true)
 |-- tags: string (nullable = true)

+----------+-------+-------+-----+-------+----------------+
|student_id|name   |course |score|credits|tags            |
+----------+-------+-------+-----+-------+----------------+
|1         |Alice  |Math   |85   |3      |hard|core       |
|2         |Bob    |Math   |90   |3      |important|exam  |
|3         |Charlie|Physics|78   |4      |lab|science     |
|4         |Diana  |Physics|88   |4      |science|advanced|
|5         |Evan   |History|92   |3      |reading|essay   |
+----------+-------+-------+-----+-------+----------------+



In [None]:
# STEP 8 ############################################################
# GOAL:
#   Show a safe way to convert text to numbers (data cleaning).
#
# WHY THIS MATTERS:
#   Real datasets may contain invalid numeric values.
#   We validate each value with a regex BEFORE casting it.
#
# NOTE:
#   - `col` was imported in STEP 4; run that cell first.
############################################################

from pyspark.sql.functions import trim, when


def _int_or_null(column_name):
    """Return a Column that casts `column_name` to int only when it is all digits.

    The value is trimmed first. Because `when(...)` has no `otherwise(...)`,
    values that do not match the digits-only pattern become null.
    """
    trimmed = trim(col(column_name))
    return when(trimmed.rlike(r"^[0-9]+$"), trimmed.cast("int"))


df_clean = (
    df_students
    .withColumn("score_num", _int_or_null("score"))
    .withColumn("credits_num", _int_or_null("credits"))
)

df_clean.show()


+----------+-------+-------+-----+-------+----------------+---------+-----------+
|student_id|   name| course|score|credits|            tags|score_num|credits_num|
+----------+-------+-------+-----+-------+----------------+---------+-----------+
|         1|  Alice|   Math|   85|      3|       hard|core|       85|          3|
|         2|    Bob|   Math|   90|      3|  important|exam|       90|          3|
|         3|Charlie|Physics|   78|      4|     lab|science|       78|          4|
|         4|  Diana|Physics|   88|      4|science|advanced|       88|          4|
|         5|   Evan|History|   92|      3|   reading|essay|       92|          3|
+----------+-------+-------+-----+-------+----------------+---------+-----------+



In [None]:
# STEP 9 ############################################################
# GOAL:
#   Summarize student performance for each course.
#
# WHY:
#   groupBy + aggregation is a core Spark workflow.
############################################################

from pyspark.sql.functions import avg, count

# Aggregate expressions, named up front for readability.
student_total = count("*").alias("num_students")
mean_score = avg("score_num").alias("avg_score")

course_summary = df_clean.groupBy("course").agg(student_total, mean_score)
course_summary.show()


+-------+------------+---------+
| course|num_students|avg_score|
+-------+------------+---------+
|   Math|           2|     87.5|
|History|           1|     92.0|
|Physics|           2|     83.0|
+-------+------------+---------+



In [None]:
# STEP 10 ############################################################
# GOAL:
#   Query a DataFrame with Spark SQL.
#
# WHY THIS MATTERS:
#   - Spark offers two interfaces to the same data:
#       (1) the DataFrame API (Python)
#       (2) Spark SQL
#   - SQL is often easier for students who already know SQL.
#
# WHAT WE DO:
#   - Register the DataFrame as a temporary view (like a temporary table)
#   - Run a SQL query against that view
#
# NOTE:
#   - Temp views live only in the current Spark session.
############################################################

# Expose df_clean to SQL under the name "students".
df_clean.createOrReplaceTempView("students")

# Students scoring 85 or more, best score first.
high_score_query = """
SELECT name, course, score_num
FROM students
WHERE score_num >= 85
ORDER BY score_num DESC
"""

spark.sql(high_score_query).show()

+-----+-------+---------+
| name| course|score_num|
+-----+-------+---------+
| Evan|History|       92|
|  Bob|   Math|       90|
|Diana|Physics|       88|
|Alice|   Math|       85|
+-----+-------+---------+



In [None]:
# STEP 11 ############################################################
# GOAL:
#   Learn explode() with the student data (tags separated by "|").
#
# WHY THIS MATTERS:
#   - explode() turns one row holding an array/list into one row per element.
#   - This is common in log data, tags, keywords, and text pipelines.
#
# WHAT WE DO:
#   - Split the `tags` string on "|" to get an array
#   - Explode the array into one row per tag
#
# NOTE:
#   - `col` was imported in STEP 4; run that cell first.
############################################################

from pyspark.sql.functions import split, explode

# split() takes a regex, so the literal "|" must be escaped.
tag_array = split(col("tags"), r"\|")

df_tags = df_clean.withColumn("tag", explode(tag_array))

df_tags.select("name", "course", "tag").show()


+-------+-------+---------+
|   name| course|      tag|
+-------+-------+---------+
|  Alice|   Math|     hard|
|  Alice|   Math|     core|
|    Bob|   Math|important|
|    Bob|   Math|     exam|
|Charlie|Physics|      lab|
|Charlie|Physics|  science|
|  Diana|Physics|  science|
|  Diana|Physics| advanced|
|   Evan|History|  reading|
|   Evan|History|    essay|
+-------+-------+---------+



In [None]:
# STEP 12 ############################################################
# GOAL:
#   Count how many times each tag occurs.
#
# WHY:
#   This demonstrates distributed counting.
############################################################

from pyspark.sql.functions import desc

# groupBy(...).count() adds a column literally named "count".
tag_counts = df_tags.groupBy("tag").count()

# Most frequent tags first.
tag_counts.sort(desc("count")).show()


+---------+-----+
|      tag|count|
+---------+-----+
|  science|    2|
|     exam|    1|
|  reading|    1|
|     core|    1|
|      lab|    1|
|    essay|    1|
|     hard|    1|
|important|    1|
| advanced|    1|
+---------+-----+



In [None]:
# STEP 13 ############################################################
# GOAL:
#   Write output results to disk.
#
# NOTE:
#   - Spark writes output as a folder (distributed files), not a single file.
#   - coalesce(1) reduces the result to one partition so the folder contains
#     a single CSV part file.
#   - mode("overwrite") replaces any existing output, so re-running is safe.
#   - We write a header row so the file can be read back with
#     `.option("header", True)` (as in STEP 7) without losing the first
#     data row to the column names.
############################################################

out_path = "/content/student_tag_counts"

(
    df_tags.groupBy("tag")
    .count()
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(out_path)
)

print("Saved results to:", out_path)


Saved results to: /content/student_tag_counts


In [None]:
# STEP 14 ############################################################
# OPTIONAL: GOOGLE DRIVE FOR REAL DATASETS
#
# INSTRUCTIONS FOR STUDENTS:
#   1) Uncomment drive.mount() and run this cell
#   2) In the left sidebar → Files → Google Drive
#   3) Find your dataset
#   4) Right-click → Copy path
#   5) Paste the path into spark.read.csv(...)
############################################################

# from google.colab import drive
# drive.mount('/content/drive')

# Example usage:
# path = "/content/drive/MyDrive/YourFolder/your_dataset.csv"
# df_real = spark.read.option("header", True).csv(path)
# df_real.show()

