In [None]:
# !pip show pyspark

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk -y

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.83)] [1 InRelease 14.2 kB/1290% [Connecting to archive.ubuntu.com (185.125.190.83)] [Connected to cloud.r-pr                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
0% [Waiting for headers] [Connected to r2u.stat.illinois.edu (192.17.190.167)]                                                                                Get:3 https://cli.github.com/packages stable InRelease [3,917 B]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes

In [None]:
!rm -rf spark-3.2.0-bin-hadoop3.2
!rm -rf spark-3.0.3-bin-hadoop3.2
!rm -f spark-3.*.tgz

In [None]:
# Print the version of the `java` executable that is first on PATH.
!java -version

openjdk version "17.0.16" 2025-07-15
OpenJDK Runtime Environment (build 17.0.16+8-Ubuntu-0ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 17.0.16+8-Ubuntu-0ubuntu122.04.1, mixed mode, sharing)


In [None]:
# 1. Install Dependencies
# Install JDK 8 (still necessary for the JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install the correct PySpark version (3.5.1) and a utility library
# The 'pyspark' package contains all necessary Java binaries, simplifying setup.
!pip install -q pyspark==3.5.1

# 2. Set Java Home
import os
from pyspark.sql import SparkSession

# Set JAVA_HOME, which is often the final piece needed for the JVM
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# 3. Create the SparkSession
# PySpark will now use its internal libraries and findspark is not strictly needed.
spark = SparkSession.builder \
    .appName("ColabSparkAuto") \
    .master("local[*]") \
    .config("spark.driver.memory", "6g") \
    .getOrCreate()

print("\n---")
print("Spark initialized successfully! Spark Version:", spark.version)


---
Spark initialized successfully! Spark Version: 3.5.1


In [None]:
import os

# Point tools that honour JAVA_HOME at the OpenJDK 8 installation.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# SPARK_HOME is deliberately NOT set here. This notebook uses the pip-installed
# pyspark package, which locates its own bundled Spark; pointing SPARK_HOME at a
# manually unpacked distribution (previously /content/spark-3.1.2-bin-hadoop3.2,
# which this notebook never downloads) would send any newly started session to a
# missing, version-mismatched install.

# Put the JDK's bin directory at the front of PATH, but only once, so that
# re-running this cell does not keep growing PATH with duplicate entries.
java_bin = os.environ["JAVA_HOME"] + "/bin"
current_path = os.environ.get("PATH", "")
if java_bin not in current_path.split(os.pathsep):
    os.environ["PATH"] = (java_bin + os.pathsep + current_path) if current_path else java_bin


In [None]:
# Sample employee records, one tuple per row: (name, id, city, salary)
data = [
    ("Alice", 1, "New York", 75000),
    ("Bob", 2, "London", 90000),
    ("Charlie", 3, "Paris", 62000),
    ("David", 4, "New York", 88000),
    ("Eve", 5, "London", 105000),
]

# Column names only; the column types are inferred from the tuples
columns = ["Name", "ID", "City", "Salary"]

# Build the DataFrame from the rows and the column names
df = spark.createDataFrame(data, schema=columns)

## Key Inspection Concepts

# show() prints the first rows of the DataFrame as a text table
print("\n--- Displaying the DataFrame (.show()) ---")
df.show()

# printSchema() prints each column's name, type and nullability
print("\n--- Displaying the Schema (.printSchema()) ---")
df.printSchema()

# count() is an action that returns the number of rows
print("\n--- Row Count (.count()) ---")
print("Total Rows:", df.count())

# describe() builds a summary table (count, mean, stddev, min, ...) per column
print("\n--- Descriptive Statistics (.describe()) ---")
df.describe().show()


--- Displaying the DataFrame (.show()) ---
+-------+---+--------+------+
|   Name| ID|    City|Salary|
+-------+---+--------+------+
|  Alice|  1|New York| 75000|
|    Bob|  2|  London| 90000|
|Charlie|  3|   Paris| 62000|
|  David|  4|New York| 88000|
|    Eve|  5|  London|105000|
+-------+---+--------+------+


--- Displaying the Schema (.printSchema()) ---
root
 |-- Name: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- City: string (nullable = true)
 |-- Salary: long (nullable = true)


--- Row Count (.count()) ---
Total Rows: 5

--- Descriptive Statistics (.describe()) ---
+-------+-----+------------------+------+------------------+
|summary| Name|                ID|  City|            Salary|
+-------+-----+------------------+------+------------------+
|  count|    5|                 5|     5|                 5|
|   mean| NULL|               3.0|  NULL|           84000.0|
| stddev| NULL|1.5811388300841898|  NULL|16263.455967290593|
|    min|Alice|                 1|L

In [None]:
from pyspark.sql.functions import col, lit

# Row filter: employees located in New York whose salary is above 8000.
# NOTE(review): every salary in the sample data is above 8000, so the salary
# condition removes nothing here — confirm the intended threshold.
in_new_york = col("City") == "New York"
salary_above_threshold = col("Salary") > 8000
df_filtered = df.where(in_new_york & salary_above_threshold)
df_filtered.show()

# Derived column: 12% of Salary, stored under the name "Income Tax".
income_tax = col("Salary") * 12 / 100
df_with_add_col = df.withColumn("Income Tax", income_tax)
df_with_add_col.show()


+-----+---+--------+------+
| Name| ID|    City|Salary|
+-----+---+--------+------+
|Alice|  1|New York| 75000|
|David|  4|New York| 88000|
+-----+---+--------+------+

+-------+---+--------+------+----------+
|   Name| ID|    City|Salary|Income Tax|
+-------+---+--------+------+----------+
|  Alice|  1|New York| 75000|    9000.0|
|    Bob|  2|  London| 90000|   10800.0|
|Charlie|  3|   Paris| 62000|    7440.0|
|  David|  4|New York| 88000|   10560.0|
|    Eve|  5|  London|105000|   12600.0|
+-------+---+--------+------+----------+



In [None]:
from pyspark.sql.functions import col, sum, avg, count

# Sample reviews, one tuple per review: (product id, rating, reviewer location)
review_columns = ["product_id", "rating", "location"]
review_data = [
    (101, 5, "USA"),
    (102, 4, "Canada"),
    (101, 4, "USA"),
    (103, 5, "Mexico"),
    (102, 1, "Canada"),
    (101, 5, "USA"),
    (103, 3, "Mexico"),
    (104, 5, "USA"),
    (104, 2, "USA"),
]

# Build the reviews DataFrame, then print its rows and inferred schema
review_df = spark.createDataFrame(review_data, schema=review_columns)
print("\n--- Original Review Data ---")
review_df.show()
review_df.printSchema()


--- Original Review Data ---
+----------+------+--------+
|product_id|rating|location|
+----------+------+--------+
|       101|     5|     USA|
|       102|     4|  Canada|
|       101|     4|     USA|
|       103|     5|  Mexico|
|       102|     1|  Canada|
|       101|     5|     USA|
|       103|     3|  Mexico|
|       104|     5|     USA|
|       104|     2|     USA|
+----------+------+--------+

root
 |-- product_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- location: string (nullable = true)



In [None]:
# Per-product summary: average rating and number of ratings, ordered by product_id.
rating_aggregates = [
    avg("rating").alias("Avg rating"),
    count("rating").alias("Total Reviews"),
]
df_rating_review = (
    review_df
    .groupBy("product_id")
    .agg(*rating_aggregates)
    .orderBy("product_id")
)

print("Product Summary -")
df_rating_review.show()

Product Summary -
+----------+-----------------+-------------+
|product_id|       Avg rating|Total Reviews|
+----------+-----------------+-------------+
|       101|4.666666666666667|            3|
|       102|              2.5|            2|
|       103|              4.0|            2|
|       104|              3.5|            2|
+----------+-----------------+-------------+



In [None]:
# Sample product catalogue, one tuple per product: (product id, name, category)
details_columns = ["product_id", "product_name", "category"]
details_data = [
    (101, "Laptop", "Electronics"),
    (102, "T-Shirt", "Apparel"),
    (103, "Coffee Maker", "Home Goods"),
    (104, "Book", "Media"),
    (105, "Headphones", "Electronics"),  # no matching rows in the review data
]

# Build the product-details DataFrame and print it
details_df = spark.createDataFrame(details_data, schema=details_columns)
print("\n--- Product Details Data ---")
details_df.show()


--- Product Details Data ---
+----------+------------+-----------+
|product_id|product_name|   category|
+----------+------------+-----------+
|       101|      Laptop|Electronics|
|       102|     T-Shirt|    Apparel|
|       103|Coffee Maker| Home Goods|
|       104|        Book|      Media|
|       105|  Headphones|Electronics|
+----------+------------+-----------+



In [None]:
# Attach product name and category to each row of the rating summary.
# The rating summary is the left side of the left join, so a product that
# appears only in details_df does not show up in the result.
report_columns = ["product_id", "product_name", "category", "Avg rating", "Total Reviews"]
products_quality = (
    df_rating_review
    .join(details_df, on="product_id", how="left")
    .select(*report_columns)
    .orderBy("product_name")
)

print("\n--- Products Quality ---")
products_quality.show()



--- Products Quality ---
+----------+------------+-----------+-----------------+-------------+
|product_id|product_name|   category|       Avg rating|Total Reviews|
+----------+------------+-----------+-----------------+-------------+
|       104|        Book|      Media|              3.5|            2|
|       103|Coffee Maker| Home Goods|              4.0|            2|
|       101|      Laptop|Electronics|4.666666666666667|            3|
|       102|     T-Shirt|    Apparel|              2.5|            2|
+----------+------------+-----------+-----------------+-------------+



In [None]:
from pyspark.sql.functions import col, sum, avg, count, current_timestamp
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

# Relies on the `spark` session created in an earlier cell.

# Sample sales, one tuple per sale: (team, amount, date of sale)
sales_columns = ["team", "sales_amount", "sale_date"]
sales_data = [
    ("A", 100, date(2025, 1, 1)),
    ("B", 150, date(2025, 1, 1)),
    ("A", 200, date(2025, 1, 2)),
    ("A", 50, date(2025, 1, 2)),  # second sale for team A on the same day
    ("B", 300, date(2025, 1, 2)),
    ("C", 400, date(2025, 1, 3)),
    ("A", 250, date(2025, 1, 3)),
    ("C", 100, date(2025, 1, 4)),
    ("B", 500, date(2025, 1, 4)),
    ("A", 50, date(2025, 1, 4)),
]

sales_df = spark.createDataFrame(sales_data, schema=sales_columns)

# Rows are displayed ordered by team first, then by sale date within each team.
print("\n--- Original Sales Data (Sorted by Date) ---")
sales_df.orderBy("team", "sale_date").show()


--- Original Sales Data (Sorted by Date) ---
+----+------------+----------+
|team|sales_amount| sale_date|
+----+------------+----------+
|   A|         100|2025-01-01|
|   A|         200|2025-01-02|
|   A|          50|2025-01-02|
|   A|         250|2025-01-03|
|   A|          50|2025-01-04|
|   B|         150|2025-01-01|
|   B|         300|2025-01-02|
|   B|         500|2025-01-04|
|   C|         400|2025-01-03|
|   C|         100|2025-01-04|
+----+------------+----------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# Window for a per-team running total: within each team, rows are ordered by
# sale_date and the frame spans from the first row of the partition up to and
# including the current row.
# The partition column is spelled "team" to match the DataFrame's column name
# exactly, so it does not depend on Spark's case-insensitive name resolution.
# NOTE(review): rows sharing the same sale_date have no defined relative order
# inside this frame; add a tiebreaker column to orderBy if the running total
# must be deterministic for same-day sales.
window_spec_running_total = (
    Window.partitionBy("team")
    .orderBy("sale_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Cumulative sales per team: sum sales_amount over the running-total window,
# then order the result by team and sale date for display.
# `sum` here is the pyspark.sql.functions aggregate, not the Python builtin.
cumulative_sales = sum(col("sales_amount")).over(window_spec_running_total)
running_total_df = (
    sales_df
    .withColumn("cumulative_sales", cumulative_sales)
    .orderBy("team", "sale_date")
)

print("Running Total :")
running_total_df.show()

Running Total :
+----+------------+----------+----------------+
|team|sales_amount| sale_date|cumulative_sales|
+----+------------+----------+----------------+
|   A|         100|2025-01-01|             100|
|   A|         200|2025-01-02|             300|
|   A|          50|2025-01-02|             350|
|   A|         250|2025-01-03|             600|
|   A|          50|2025-01-04|             650|
|   B|         150|2025-01-01|             150|
|   B|         300|2025-01-02|             450|
|   B|         500|2025-01-04|             950|
|   C|         400|2025-01-03|             400|
|   C|         100|2025-01-04|             500|
+----+------------+----------+----------------+



In [None]:
from pyspark.sql.functions import col, rand
from pyspark.storagelevel import StorageLevel

import time

# Fixed seed so the generated sales values are the same on every run of this cell.
RAND_SEED = 42

# Mock DataFrame: one million ids, each with a random sales_value in [0, 1000),
# plus a boolean flag marking values above 800.
large_sales_df = (
    spark.range(1000000)
    .withColumn("sales_value", rand(seed=RAND_SEED) * 1000)
    .withColumn("is_high_value", col("sales_value") > 800)
)

print(f"Data frame is ready with {large_sales_df.count()} rows." )
large_sales_df.show(5)

Data frame is ready with 1000000 rows.
+---+------------------+-------------+
| id|       sales_value|is_high_value|
+---+------------------+-------------+
|  0| 946.9472217076882|         true|
|  1| 949.3896769184495|         true|
|  2| 728.7301605284208|        false|
|  3|138.52727335666847|        false|
|  4| 889.7328632486704|         true|
+---+------------------+-------------+
only showing top 5 rows



In [None]:
# Baseline without caching: drop any cached copy first, then time two actions.
large_sales_df.unpersist()

# Action 1: row count per is_high_value group (only the first result row is shown)
start_time = time.time()
high_value_counts = large_sales_df.groupBy("is_high_value").count()
high_value_counts.show(1)
time_no_cache_1 = time.time() - start_time

# Action 2: average sales_value; with no cache the lineage is executed again
start_time = time.time()
average_sales = large_sales_df.agg({"sales_value" : "avg"})
average_sales.show(1)
time_no_cache_2 = time.time() - start_time

print(f"time without caching Action 1: {time_no_cache_1:.4f}")
print(f"time without caching Action 2: {time_no_cache_2:.4f}")


+-------------+------+
|is_high_value| count|
+-------------+------+
|         true|199566|
+-------------+------+
only showing top 1 row

+-----------------+
| avg(sales_value)|
+-----------------+
|499.7963839873064|
+-----------------+

time without caching Action 1: 0.6116
time without caching Action 2: 0.3625


In [None]:
# With caching: mark the DataFrame for in-memory storage. persist() is lazy, so
# the cache is only populated when the first action below runs.
large_sales_df.persist(StorageLevel.MEMORY_ONLY)

# Action 1: triggers the computation and populates the cache, so this timing
# includes the one-off cost of caching.
start_time = time.time()
large_sales_df.groupBy("is_high_value").count().show(1)
time_with_cache_1 = time.time() - start_time

# Action 2: reads from the cache instead of re-executing the lineage.
start_time = time.time()
large_sales_df.agg({"sales_value": "avg"}).show(1)
time_with_cache_2 = time.time() - start_time

print(f"time with caching Action 1: {time_with_cache_1:.4f}")
print(f"time with caching Action 2: {time_with_cache_2:.4f}")

# Release the cached data now that the measurement is done.
large_sales_df.unpersist()

+-------------+------+
|is_high_value| count|
+-------------+------+
|         true|199566|
+-------------+------+
only showing top 1 row

+-----------------+
| avg(sales_value)|
+-----------------+
|499.7963839873064|
+-----------------+

time without caching Action 1: 2.1210
time without caching Action 2: 0.5441


DataFrame[id: bigint, sales_value: double, is_high_value: boolean]

In [None]:
from pyspark.sql.functions import broadcast

# Small dimension table: (category id, category name)
category_cols = ["category_id", "category_name"]
category_data = [
    (1, "Electronics"),
    (2, "Apparel"),
    (3, "Home Goods"),
    (4, "Media"),
    (5, "Books"),
]
small_dim_df = spark.createDataFrame(category_data, schema=category_cols)

# Large fact table: one million transaction ids, each assigned a category_id
# in the range 1..5 derived from the transaction id.
large_transaction_df = (
    spark.range(1000000)
    .withColumnRenamed("id", "transaction_id")
    .withColumn("category_id", (col("transaction_id") % 5) + 1)
)

print(f"Small DF Count: {small_dim_df.count()}, Large DF Count: {large_transaction_df.count()}")

# Time an inner join on category_id with the small table wrapped in a broadcast
# hint; the timed region covers building the join and showing the first 10 rows.
start_time = time.time()
broadcast_dim = broadcast(small_dim_df)
united_small_large_df = large_transaction_df.join(broadcast_dim, on="category_id", how="inner")
united_small_large_df.show(10)
time_broadcast_join = time.time() - start_time

print(f"time with broadcast join: {time_broadcast_join}")

Small DF Count: 5, Large DF Count: 1000000
+-----------+--------------+-------------+
|category_id|transaction_id|category_name|
+-----------+--------------+-------------+
|          1|             0|  Electronics|
|          2|             1|      Apparel|
|          3|             2|   Home Goods|
|          4|             3|        Media|
|          5|             4|        Books|
|          1|             5|  Electronics|
|          2|             6|      Apparel|
|          3|             7|   Home Goods|
|          4|             8|        Media|
|          5|             9|        Books|
+-----------+--------------+-------------+
only showing top 10 rows

time with broadcast join: 0.5385096073150635


In [None]:
# Predicate and Pushdown
output_path = "/content/tmp/patitioned_data"

!rm -rf {output_path}

united_small_large_df.write \
.mode("overwrite") \
.partitionBy("category_name") \
.parquet(output_path)

# Verify the file structure on disk
!ls -l {output_path}
!ls -l {output_path}/category_name=Electronics/


total 20
drwxr-xr-x 2 root root 4096 Nov 17 13:45 'category_name=Apparel'
drwxr-xr-x 2 root root 4096 Nov 17 13:45 'category_name=Books'
drwxr-xr-x 2 root root 4096 Nov 17 13:45 'category_name=Electronics'
drwxr-xr-x 2 root root 4096 Nov 17 13:45 'category_name=Home Goods'
drwxr-xr-x 2 root root 4096 Nov 17 13:45 'category_name=Media'
-rw-r--r-- 1 root root    0 Nov 17 13:45  _SUCCESS
total 792
-rw-r--r-- 1 root root 403534 Nov 17 13:45 part-00000-780ef021-7972-4596-92cb-00b86b455c21.c000.snappy.parquet
-rw-r--r-- 1 root root 403570 Nov 17 13:45 part-00001-780ef021-7972-4596-92cb-00b86b455c21.c000.snappy.parquet


In [None]:
# Query 1: only the transactions whose category_name is "Books".
# category_name is the partition column of the Parquet dataset written earlier.
books_df = spark.read.parquet(output_path).where(col("category_name") == "Books")

books_df.show(10)
books_df.count()

+-----------+--------------+-------------+
|category_id|transaction_id|category_name|
+-----------+--------------+-------------+
|          5|        500004|        Books|
|          5|        500009|        Books|
|          5|        500014|        Books|
|          5|        500019|        Books|
|          5|        500024|        Books|
|          5|        500029|        Books|
|          5|        500034|        Books|
|          5|        500039|        Books|
|          5|        500044|        Books|
|          5|        500049|        Books|
+-----------+--------------+-------------+
only showing top 10 rows



200000