# Practical 5: Apache Spark for Massive Data Processing

## Goals

This practical session introduces Apache Spark, a powerful distributed computing framework designed for large-scale data processing. You will learn how to leverage Spark's capabilities for handling massive datasets that exceed single-machine memory limits.

### Learning Objectives
* Understand Spark architecture: Driver, Executors, and Cluster Manager
* Master RDD (Resilient Distributed Dataset) operations
* Work with DataFrames and understand schema management
* Write efficient Spark SQL queries
* Implement joins, window functions, and complex aggregations
* Optimize performance through partitioning and caching strategies
* Process large-scale datasets with columnar file formats (Parquet, ORC)

### Prerequisites
* Completion of Practical 4 (Parallel and Distributed Computing)
* Understanding of functional programming concepts (map, filter, reduce)
* Basic SQL knowledge
* Python programming fundamentals

### Installation

Install PySpark before starting (inside a Jupyter cell, run `%pip install pyspark==3.5.3` instead):
```bash
pip install pyspark==3.5.3
```

### Exercises Overview

| Exercise | Topic | Difficulty |
|----------|-------|------------|
| 1 | Spark Architecture and RDD Basics | ★ |
| 2 | RDD Transformations and Actions | ★ |
| 3 | DataFrames and Schema Management | ★★ |
| 4 | Spark SQL and Complex Queries | ★★ |
| 5 | Joins, Window Functions, and Aggregations | ★★ |
| 6 | Partitioning, Caching, and Optimization | ★★★ |
| 7 | Processing Large-Scale Datasets | ★★★ |

---

## Exercise 1: Spark Architecture and RDD Basics [★]

### Understanding Spark Architecture

Apache Spark uses a **master-worker architecture**:

1. **Driver Program**: The main program that creates the SparkContext and coordinates execution
2. **Cluster Manager**: Allocates resources (Standalone, YARN, Kubernetes, or Mesos; Mesos support is deprecated since Spark 3.2)
3. **Executors**: Worker processes that run the tasks scheduled by the driver and store cached data

```
┌─────────────────┐
│  Driver Program │
│  (SparkContext) │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Cluster Manager │
└────────┬────────┘
         │
    ┌────┴────┐
    ▼         ▼
┌──────────┐ ┌──────────┐
│ Executor │ │ Executor │
│  Task    │ │  Task    │
│  Task    │ │  Task    │
└──────────┘ └──────────┘
```

### RDD (Resilient Distributed Dataset)

RDDs are the fundamental data structure in Spark:
- **Resilient**: Fault-tolerant through lineage information
- **Distributed**: Data is partitioned across multiple nodes
- **Dataset**: Collection of elements that can be operated on in parallel

In [None]:
# Install PySpark from the notebook (skip this cell if it is already installed, e.g. via requirements.txt)
%pip install pyspark==3.5.3

In [None]:
# First, let's verify PySpark installation
from pyspark import SparkConf
from pyspark.context import SparkContext

print("PySpark imported successfully!")

In [None]:
# Create a SparkContext with a local configuration
# 'local[*]' uses all available cores
conf = SparkConf().setAppName("Practical5").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

# Display Spark configuration
print(f"Spark Version: {sc.version}")
print(f"Application Name: {sc.appName}")
print(f"Master: {sc.master}")
print(f"Default Parallelism: {sc.defaultParallelism}")

In [None]:
# Creating RDDs - Method 1: From a Python collection (parallelize)
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = sc.parallelize(numbers)

print(f"Type: {type(numbers_rdd)}")
print(f"Number of partitions: {numbers_rdd.getNumPartitions()}")
print(f"First element: {numbers_rdd.first()}")
print(f"All elements: {numbers_rdd.collect()}")

In [None]:
# Creating RDDs - Method 2: From external files
lines_rdd = sc.textFile("../shared_data/pl.csv")

print(f"Number of partitions: {lines_rdd.getNumPartitions()}")
print(f"Number of lines: {lines_rdd.count()}")
print(f"\nFirst 5 lines:")
for line in lines_rdd.take(5):
    print(f"  {line}")

In [None]:
# Creating RDDs with specific number of partitions
# More partitions = more parallelism (but also more overhead)
numbers_4partitions = sc.parallelize(range(1, 101), numSlices=4)
numbers_8partitions = sc.parallelize(range(1, 101), numSlices=8)

print(f"RDD with 4 partitions: {numbers_4partitions.getNumPartitions()}")
print(f"RDD with 8 partitions: {numbers_8partitions.getNumPartitions()}")

# View how data is distributed across partitions
print(f"\nData distribution (4 partitions):")
print(numbers_4partitions.glom().collect())
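To reason about the `glom()` output above, here is a plain-Python model of cutting a collection into contiguous slices, one per partition. It is a sketch assuming the contiguous-chunk strategy PySpark uses for `range` inputs; `slice_into_partitions` is a hypothetical helper, not a Spark API, and exact boundaries for other inputs may differ.

```python
def slice_into_partitions(data, num_slices):
    """Split a sequence into num_slices contiguous chunks of near-equal size."""
    n = len(data)
    # Partition i covers indices [i*n // k, (i+1)*n // k)
    return [list(data[i * n // num_slices:(i + 1) * n // num_slices])
            for i in range(num_slices)]

parts = slice_into_partitions(range(1, 101), 4)
print([len(p) for p in parts])           # size of each partition
print(parts[0][:3], parts[-1][-3:])      # first and last elements

# With more partitions than elements, some partitions end up empty
print(slice_into_partitions([1, 2, 3], 5))
```

Empty partitions still cost a task each, which is one source of the "overhead" mentioned in the partitioning cell.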

### Questions - Exercise 1

**Q1.1** Create an RDD containing the squares of numbers from 1 to 1000. Experiment with different numbers of partitions (2, 4, 8, 16) and explain how partitioning affects data distribution.

**Q1.2** Load multiple CSV files from the `shared_data/` directory using wildcard patterns (e.g., `*.csv`). How many total lines are there across all files?

**Q1.3** Explain the difference between creating an RDD with `parallelize()` vs `textFile()`. When would you use each method?

In [None]:
# Your solutions here


---

## Exercise 2: RDD Transformations and Actions [★]

### Transformations vs Actions

Spark operations are divided into two categories:

**Transformations** (Lazy - create new RDDs):
- `map()`, `filter()`, `flatMap()`, `distinct()`
- `union()`, `intersection()`, `subtract()`
- `groupByKey()`, `reduceByKey()`, `sortByKey()`

**Actions** (Eager - trigger execution and return a result to the driver or write output):
- `collect()`, `count()`, `first()`, `take()`
- `reduce()`, `fold()`, `aggregate()`
- `saveAsTextFile()`, `foreach()`
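`fold()` and `aggregate()` differ from `reduce()` in that they take a zero value. The plain-Python sketch below models how PySpark applies it: once at the start of every partition and once more when the driver merges the partition results. `model_fold` and `model_aggregate` are hypothetical names for illustration; the point is that the zero value must be neutral for the operation (0 for `+`, 1 for `*`).

```python
from functools import reduce
import operator

def model_fold(partitions, zero, op):
    # Each partition is folded separately, starting from the zero value
    per_partition = [reduce(op, part, zero) for part in partitions]
    # The driver then merges the partition results, again starting from zero
    return reduce(op, per_partition, zero)

def model_aggregate(partitions, zero, seq_op, comb_op):
    # seq_op folds elements into an accumulator within one partition;
    # comb_op merges accumulators coming from different partitions
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)

partitions = [[1, 2], [3, 4], [5]]  # 3 partitions

print(model_fold(partitions, 0, operator.add))  # neutral zero: plain sum
print(model_fold(partitions, 1, operator.add))  # non-neutral zero added 3 + 1 times

# Mean via aggregate: the accumulator is a (sum, count) pair
total, n = model_aggregate(
    partitions, (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(total / n)
```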

### Lazy Evaluation

Transformations are **lazy**: Spark builds a DAG (Directed Acyclic Graph) of operations but doesn't execute them until an action is called. This allows Spark to optimize the execution plan.
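The idea of recording transformations and running them only when an action is called can be sketched in plain Python. `LazyPipeline` below is a hypothetical toy class, not Spark code: `map` and `filter` only append a step to a plan, and `count`/`collect` execute it in one pass over the data.

```python
class LazyPipeline:
    """Toy model of lazy evaluation: transformations build a plan, actions run it."""

    def __init__(self, source, plan=None):
        self.source = source
        self.plan = plan or []  # list of ("map" | "filter", function)

    def map(self, f):
        # Return a NEW pipeline with one more step; nothing is computed yet
        return LazyPipeline(self.source, self.plan + [("map", f)])

    def filter(self, pred):
        return LazyPipeline(self.source, self.plan + [("filter", pred)])

    def _run(self):
        # Stream each element through every step of the plan
        for x in self.source:
            keep = True
            for kind, f in self.plan:
                if kind == "map":
                    x = f(x)
                elif not f(x):
                    keep = False
                    break
            if keep:
                yield x

    def collect(self):
        return list(self._run())

    def count(self):
        return sum(1 for _ in self._run())


calls = []

def square(x):
    calls.append(x)  # record every invocation
    return x ** 2

pipeline = LazyPipeline(range(1, 11)).map(square).filter(lambda x: x % 2 == 0)
print(len(calls))        # no work has been done yet
print(pipeline.count())  # the action triggers execution
print(len(calls))
```

Calling a second action re-runs the whole plan from the source, which is the behavior Spark's `cache()` exists to avoid.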

In [None]:
# Demonstration of lazy evaluation
import time

# These transformations are NOT executed immediately
start = time.time()
large_rdd = sc.parallelize(range(1, 1000001))
squared = large_rdd.map(lambda x: x ** 2)
filtered = squared.filter(lambda x: x % 2 == 0)
print(f"Transformations defined in: {time.time() - start:.4f} seconds")

# The action triggers execution
start = time.time()
result = filtered.count()
print(f"Action executed in: {time.time() - start:.4f} seconds")
print(f"Count of even squares: {result}")

In [None]:
# Map transformation: apply a function to each element
words = sc.parallelize(["hello", "world", "spark", "python"])

# Transform to uppercase
upper_words = words.map(lambda w: w.upper())
print(f"Uppercase: {upper_words.collect()}")

# Transform to (word, length) tuples
word_lengths = words.map(lambda w: (w, len(w)))
print(f"Word lengths: {word_lengths.collect()}")

In [None]:
# Filter transformation: keep elements that satisfy a condition
numbers = sc.parallelize(range(1, 21))

# Keep only even numbers
evens = numbers.filter(lambda x: x % 2 == 0)
print(f"Even numbers: {evens.collect()}")

# Keep numbers divisible by 3
div_by_3 = numbers.filter(lambda x: x % 3 == 0)
print(f"Divisible by 3: {div_by_3.collect()}")

In [None]:
# FlatMap transformation: map that can return multiple elements
sentences = sc.parallelize([
    "Hello World",
    "Apache Spark is powerful",
    "Big Data processing"
])

# Split sentences into words
words = sentences.flatMap(lambda s: s.split())
print(f"All words: {words.collect()}")
print(f"Word count: {words.count()}")

In [None]:
# Reduce action: aggregate elements using a function
numbers = sc.parallelize([1, 2, 3, 4, 5])

# Sum all numbers
total = numbers.reduce(lambda a, b: a + b)
print(f"Sum: {total}")

# Find maximum
maximum = numbers.reduce(lambda a, b: a if a > b else b)
print(f"Maximum: {maximum}")

# Product of all numbers
product = numbers.reduce(lambda a, b: a * b)
print(f"Product: {product}")

In [None]:
# Word Count - The classic Spark example
text = sc.parallelize([
    "Apache Spark is a unified analytics engine",
    "Spark provides high-level APIs in Java Scala Python and R",
    "Spark powers a stack of libraries for SQL streaming and machine learning",
    "Spark runs on Hadoop YARN Mesos Kubernetes and standalone"
])

# Word count pipeline
word_counts = (text
    .flatMap(lambda line: line.lower().split())  # Split into words
    .map(lambda word: (word, 1))                  # Map to (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)              # Sum counts per word
    .sortBy(lambda x: x[1], ascending=False))    # Sort by count

print("Word counts (top 10):")
for word, count in word_counts.take(10):
    print(f"  {word}: {count}")
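`reduceByKey` is generally preferred over `groupByKey` for aggregations because it combines values inside each partition before the shuffle, so at most one partial count per word per partition crosses the network. A plain-Python model of that two-phase plan on the same text; the `reduce_by_key` helper and the two-partition split are assumptions for illustration:

```python
from collections import Counter, defaultdict

def reduce_by_key(partitions, func):
    """Model reduceByKey: combine within each partition, then merge across partitions."""
    # Phase 1: map-side combine, one dict of partial results per partition
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = func(acc[key], value) if key in acc else value
        partials.append(acc)
    # Phase 2: "shuffle" the partial results by key and merge them
    merged = defaultdict(list)
    for acc in partials:
        for key, value in acc.items():
            merged[key].append(value)
    result = {}
    for key, values in merged.items():
        combined = values[0]
        for v in values[1:]:
            combined = func(combined, v)
        result[key] = combined
    shuffled = sum(len(acc) for acc in partials)
    return result, shuffled

lines = [
    "Apache Spark is a unified analytics engine",
    "Spark provides high-level APIs in Java Scala Python and R",
    "Spark powers a stack of libraries for SQL streaming and machine learning",
    "Spark runs on Hadoop YARN Mesos Kubernetes and standalone",
]
pairs = [(w, 1) for line in lines for w in line.lower().split()]
partitions = [pairs[:len(pairs) // 2], pairs[len(pairs) // 2:]]

counts, records_shuffled = reduce_by_key(partitions, lambda a, b: a + b)
print(counts["spark"], counts["and"])
print(f"{len(pairs)} pairs, {records_shuffled} records shuffled after combining")

# Cross-check against a single-machine count
assert counts == dict(Counter(w for w, _ in pairs))
```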

In [None]:
# Set operations on RDDs
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])

# Union (all elements from both)
print(f"Union: {rdd1.union(rdd2).collect()}")

# Intersection (common elements)
print(f"Intersection: {rdd1.intersection(rdd2).collect()}")

# Subtract (elements in rdd1 but not in rdd2)
print(f"Subtract: {rdd1.subtract(rdd2).collect()}")

# Distinct (unique elements)
print(f"Distinct union: {rdd1.union(rdd2).distinct().collect()}")

### Questions - Exercise 2

**Q2.1** Load the file `shared_data/pl.csv` and perform the following:
- Count the total number of characters across all lines
- Count the total number of tokens (comma-separated values)
- Find the line with the maximum number of characters

**Q2.2** Download 50 HTML pages from the web. Write a Spark program to:
- Count the total number of `<div>` and `</div>` tags across all files
- Find the page with the most `<a>` (anchor) tags
- Extract and count all unique CSS class names

**Q2.3** Implement a character frequency counter that:
- Reads all text files from a directory
- Counts the frequency of each character (case-insensitive)
- Returns the top 10 most frequent characters

In [None]:
# Your solutions here


---

## Exercise 3: DataFrames and Schema Management [★★]

### DataFrames

DataFrames are a higher-level abstraction built on top of RDDs:
- Organized into named columns (like a table)
- Optimized through Catalyst query optimizer
- Support for structured and semi-structured data
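- Usually faster than equivalent RDD code, because Catalyst optimizes the plan and, in PySpark, the work runs in the JVM instead of in Python worker processes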

### SparkSession

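SparkSession is the entry point for DataFrame and SQL operations (introduced in Spark 2.0). If a SparkContext is already running, such as `sc` above, `getOrCreate()` reuses it instead of starting a new one.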

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType

# Create SparkSession
spark = SparkSession.builder \
    .appName("Practical5-DataFrames") \
    .getOrCreate()

print(f"Spark Session created: {spark.version}")

In [None]:
# Creating DataFrames - Method 1: From Python list with inferred schema
data = [
    ("Python", 1991, "Guido van Rossum"),
    ("Java", 1995, "James Gosling"),
    ("JavaScript", 1995, "Brendan Eich"),
    ("C", 1972, "Dennis Ritchie"),
    ("Rust", 2010, "Graydon Hoare")
]

df_inferred = spark.createDataFrame(data, ["language", "year", "creator"])
df_inferred.show()
df_inferred.printSchema()

In [None]:
# Creating DataFrames - Method 2: With explicit schema
schema = StructType([
    StructField("language", StringType(), nullable=False),
    StructField("year", IntegerType(), nullable=False),
    StructField("creator", StringType(), nullable=True)
])

df_explicit = spark.createDataFrame(data, schema)
df_explicit.show()
df_explicit.printSchema()

In [None]:
# Creating DataFrames - Method 3: From JSON file
df_json = spark.read.json("../shared_data/pl.json")
df_json.show(10)
df_json.printSchema()

In [None]:
# Creating DataFrames - Method 4: From CSV with options
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("../shared_data/pl.csv")

df_csv.show(10)
df_csv.printSchema()

In [None]:
# DataFrame basic operations
df = df_json

# Show first n rows
print("First 5 rows:")
df.show(5)

# Get column names
print(f"\nColumns: {df.columns}")

# Get number of rows and columns
print(f"Shape: ({df.count()}, {len(df.columns)})")

# Describe statistics
print("\nStatistics:")
df.describe().show()

In [None]:
# Column selection and manipulation
from pyspark.sql.functions import col, lit, when, upper, lower, length

# Select specific columns
df.select("languageLabel").show(5)

# Select with column expressions
df.select(
    col("languageLabel"),
    col("year"),
    (col("year") - 1900).alias("years_since_1900")
).show(5)

In [None]:
# Adding and modifying columns
df_modified = df \
    .withColumn("century", ((col("year") / 100) + 1).cast(IntegerType())) \
    .withColumn("language_upper", upper(col("languageLabel"))) \
    .withColumn("name_length", length(col("languageLabel")))

df_modified.show(10)

In [None]:
# Filtering rows
# Languages created after 2000
recent = df.filter(col("year") > 2000)
print(f"Languages after 2000: {recent.count()}")
recent.show(10)

# Multiple conditions
filtered = df.filter((col("year") >= 1990) & (col("year") <= 2000))
print(f"\nLanguages from 1990-2000: {filtered.count()}")
filtered.show()

In [None]:
# GroupBy and aggregations
from pyspark.sql.functions import count, avg, min as spark_min, max as spark_max, sum as spark_sum

# Count languages per year
df.groupBy("year") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(10)

In [None]:
# Multiple aggregations
df_modified.groupBy("century") \
    .agg(
        count("*").alias("num_languages"),
        spark_min("year").alias("earliest_year"),
        spark_max("year").alias("latest_year"),
        avg("name_length").alias("avg_name_length")
    ) \
    .orderBy("century") \
    .show()

### Questions - Exercise 3

**Q3.1** Query Wikidata to download information about all software applications including: name, release date, developer, and programming language used. Load this data into a Spark DataFrame with an explicit schema.

**Q3.2** Using the programming languages DataFrame:
- Add a column categorizing languages as "Early" (before 1980), "Classic" (1980-2000), or "Modern" (after 2000)
- Calculate statistics (count, min year, max year) for each category
- Find the decade with the most language releases

**Q3.3** Create a DataFrame from a CSV file with proper handling of:
- Missing values (nulls)
- Date parsing
- Custom delimiters
- Escaped characters

In [None]:
# Your solutions here


---

## Exercise 4: Spark SQL and Complex Queries [★★]

### SQL Interface

Spark SQL allows you to run SQL queries directly on DataFrames using temporary views. This is useful for:
- Complex queries that are easier to express in SQL
- Interoperability with SQL-based tools
- Familiarity for SQL users

In [None]:
# Load data and create a temporary view
df = spark.read.json("../shared_data/pl.json")
df.createOrReplaceTempView("languages")

# Basic SELECT query
spark.sql("SELECT * FROM languages LIMIT 10").show()

In [None]:
# Filtering with WHERE clause
spark.sql("""
    SELECT languageLabel, year 
    FROM languages 
    WHERE year >= 1990 AND year <= 2000
    ORDER BY year
""").show()

In [None]:
# Aggregation queries
spark.sql("""
    SELECT 
        year,
        COUNT(*) as language_count
    FROM languages
    GROUP BY year
    HAVING COUNT(*) > 3
    ORDER BY language_count DESC
""").show(10)

In [None]:
# Using CASE expressions
spark.sql("""
    SELECT 
        languageLabel,
        year,
        CASE 
            WHEN year < 1980 THEN 'Pioneer Era'
            WHEN year < 1990 THEN '1980s'
            WHEN year < 2000 THEN '1990s'
            WHEN year < 2010 THEN '2000s'
            ELSE '2010s+'
        END as era
    FROM languages
    ORDER BY year
""").show(20)

In [None]:
# Subqueries
spark.sql("""
    SELECT languageLabel, year
    FROM languages
    WHERE year = (
        SELECT MAX(year) FROM languages
    )
""").show()

In [None]:
# Common Table Expressions (CTEs)
spark.sql("""
    WITH decade_stats AS (
        SELECT 
            FLOOR(year / 10) * 10 as decade,
            COUNT(*) as count
        FROM languages
        GROUP BY FLOOR(year / 10) * 10
    )
    SELECT 
        decade,
        count,
        ROUND(count * 100.0 / SUM(count) OVER(), 2) as percentage
    FROM decade_stats
    ORDER BY decade
""").show()

In [None]:
# Create additional tables for join examples
# Paradigm data
paradigms_data = [
    ("Python", "Multi-paradigm"),
    ("Java", "Object-oriented"),
    ("JavaScript", "Multi-paradigm"),
    ("Haskell", "Functional"),
    ("C", "Procedural"),
    ("Rust", "Multi-paradigm"),
    ("Lisp", "Functional"),
    ("Prolog", "Logic")
]

paradigms_df = spark.createDataFrame(paradigms_data, ["language", "paradigm"])
paradigms_df.createOrReplaceTempView("paradigms")

# Typing data
typing_data = [
    ("Python", "Dynamic", "Strong"),
    ("Java", "Static", "Strong"),
    ("JavaScript", "Dynamic", "Weak"),
    ("C", "Static", "Weak"),
    ("Rust", "Static", "Strong"),
    ("Haskell", "Static", "Strong")
]

typing_df = spark.createDataFrame(typing_data, ["language", "typing", "type_safety"])
typing_df.createOrReplaceTempView("typing")

print("Paradigms:")
paradigms_df.show()
print("Typing:")
typing_df.show()

In [None]:
# INNER JOIN
spark.sql("""
    SELECT p.language, p.paradigm, t.typing, t.type_safety
    FROM paradigms p
    INNER JOIN typing t ON p.language = t.language
""").show()

In [None]:
# LEFT JOIN
spark.sql("""
    SELECT p.language, p.paradigm, t.typing
    FROM paradigms p
    LEFT JOIN typing t ON p.language = t.language
""").show()

### Questions - Exercise 4

**Q4.1** Using the languages view, write SQL queries to:
- Find all languages released in the same year as Python (1991)
- Calculate the average number of languages released per decade
- Find years where more than 5 languages were released

**Q4.2** Create two new views from Wikidata:
- Software applications with their developers
- Developers with their countries

Write a query joining these tables to show software grouped by country.

**Q4.3** Using window functions, write queries to:
- Rank languages by year within each decade
- Calculate the running total of languages released over time
- Find the first and last language released each decade

In [None]:
# Your solutions here


---

## Exercise 5: Joins, Window Functions, and Aggregations [★★]

### Advanced DataFrame Operations

This exercise covers more complex operations that are essential for real-world data processing:
- Different types of joins
- Window functions for analytics
- Complex aggregations

In [None]:
from pyspark.sql.functions import col, row_number, rank, dense_rank, lag, lead, sum as spark_sum
from pyspark.sql.window import Window

# Create sample sales data
sales_data = [
    ("2024-01-15", "Electronics", "Laptop", 1200.00, 5),
    ("2024-01-15", "Electronics", "Phone", 800.00, 10),
    ("2024-01-16", "Electronics", "Laptop", 1200.00, 3),
    ("2024-01-16", "Clothing", "Shirt", 50.00, 20),
    ("2024-01-17", "Electronics", "Tablet", 500.00, 8),
    ("2024-01-17", "Clothing", "Pants", 80.00, 15),
    ("2024-01-18", "Clothing", "Shirt", 50.00, 25),
    ("2024-01-18", "Electronics", "Phone", 800.00, 12),
    ("2024-01-19", "Books", "Fiction", 25.00, 30),
    ("2024-01-19", "Books", "Technical", 60.00, 10),
]

sales_schema = ["date", "category", "product", "price", "quantity"]
sales_df = spark.createDataFrame(sales_data, sales_schema)

# Add calculated column
sales_df = sales_df.withColumn("revenue", col("price") * col("quantity"))
sales_df.show()

In [None]:
# Window function: Row number within each category
window_spec = Window.partitionBy("category").orderBy(col("revenue").desc())

ranked_sales = sales_df.withColumn("rank_in_category", row_number().over(window_spec))
ranked_sales.show()

In [None]:
# Different ranking functions
window_spec = Window.partitionBy("category").orderBy(col("revenue").desc())

sales_df.select(
    "category",
    "product",
    "revenue",
    row_number().over(window_spec).alias("row_number"),
    rank().over(window_spec).alias("rank"),
    dense_rank().over(window_spec).alias("dense_rank")
).show()
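In the sample data no two rows in the same category have equal revenue, so the three columns above come out identical. A plain-Python model with a tie shows where they diverge; the `revenues` list is made up for illustration:

```python
revenues = [9600, 8000, 8000, 4000]  # already sorted descending, with one tie

row_numbers, ranks, dense_ranks = [], [], []
current_rank = current_dense = 0
previous = None
for position, value in enumerate(revenues, start=1):
    row_numbers.append(position)   # row_number: always 1, 2, 3, ...
    if value != previous:
        current_rank = position    # rank: jumps to the row position (leaves gaps)
        current_dense += 1         # dense_rank: next consecutive value (no gaps)
        previous = value
    ranks.append(current_rank)
    dense_ranks.append(current_dense)

print(row_numbers)
print(ranks)
print(dense_ranks)
```

Note that Spark assigns `row_number` among tied rows in no guaranteed order, so add a tie-breaking column to `orderBy` when the numbering must be reproducible.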

In [None]:
# Running totals with window functions
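# "product" breaks ties: two Electronics rows share the date 2024-01-15, and a
# ROWS frame would otherwise order them arbitrarily
window_running = (Window.partitionBy("category")
                  .orderBy("date", "product")
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow))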

sales_df.select(
    "date",
    "category",
    "revenue",
    spark_sum("revenue").over(window_running).alias("running_total")
).orderBy("category", "date").show()
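With `rowsBetween`, the frame is defined by physical row position. If the ordering columns contain ties, as `date` alone does for Electronics (two rows on 2024-01-15), the tied rows receive different running totals in an order Spark does not guarantee. Without an explicit frame, an ordered window defaults to a RANGE frame from unbounded preceding to the current row, which gives tied rows the same total. A plain-Python model of both frames on the Electronics revenues:

```python
rows = [  # (date, revenue) for Electronics, sorted by date
    ("2024-01-15", 6000.0),
    ("2024-01-15", 8000.0),
    ("2024-01-16", 3600.0),
    ("2024-01-17", 4000.0),
    ("2024-01-18", 9600.0),
]

# ROWS frame: sum of every row up to and including this physical row
rows_frame, running = [], 0.0
for _, revenue in rows:
    running += revenue
    rows_frame.append(running)

# RANGE frame: sum of every row whose date is <= this row's date (ties included)
range_frame = [sum(r for d, r in rows if d <= date) for date, _ in rows]

print(rows_frame)
print(range_frame)
```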

In [None]:
# Lag and Lead functions (previous/next row values)
window_ordered = Window.partitionBy("category").orderBy("date", "product")  # "product" breaks date ties

sales_df.select(
    "date",
    "category",
    "revenue",
    lag("revenue", 1).over(window_ordered).alias("prev_revenue"),
    lead("revenue", 1).over(window_ordered).alias("next_revenue")
).orderBy("category", "date").show()

In [None]:
# Complex aggregations with pivot
pivot_df = sales_df.groupBy("date").pivot("category").sum("revenue")
pivot_df.show()

In [None]:
# Multiple aggregations at once
from pyspark.sql.functions import round as spark_round

sales_df.groupBy("category").agg(
    count("*").alias("num_transactions"),
    spark_round(avg("revenue"), 2).alias("avg_revenue"),
    spark_round(spark_min("revenue"), 2).alias("min_revenue"),
    spark_round(spark_max("revenue"), 2).alias("max_revenue"),
    spark_round(spark_sum("revenue"), 2).alias("total_revenue")
).show()

In [None]:
# Create additional DataFrames for join examples
# Customer data
customers_data = [
    (1, "Alice", "Paris"),
    (2, "Bob", "London"),
    (3, "Charlie", "Berlin"),
    (4, "Diana", "Madrid")
]
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name", "city"])

# Orders data
orders_data = [
    (101, 1, "2024-01-15", 150.00),
    (102, 1, "2024-01-16", 200.00),
    (103, 2, "2024-01-15", 300.00),
    (104, 3, "2024-01-17", 450.00),
    (105, 5, "2024-01-18", 100.00)  # Customer 5 doesn't exist
]
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "order_date", "amount"])

print("Customers:")
customers_df.show()
print("Orders:")
orders_df.show()

In [None]:
# Different types of joins

# Inner join (only matching rows)
print("INNER JOIN:")
customers_df.join(orders_df, "customer_id", "inner").show()

# Left join (all customers, matching orders)
print("LEFT JOIN:")
customers_df.join(orders_df, "customer_id", "left").show()

# Right join (all orders, matching customers)
print("RIGHT JOIN:")
customers_df.join(orders_df, "customer_id", "right").show()

# Full outer join (all rows from both)
print("FULL OUTER JOIN:")
customers_df.join(orders_df, "customer_id", "outer").show()

In [None]:
# Anti join (rows from left that don't match right)
print("Customers without orders (LEFT ANTI):")
customers_df.join(orders_df, "customer_id", "left_anti").show()

# Semi join (rows from left that have a match in right, but don't include right columns)
print("Customers with orders (LEFT SEMI):")
customers_df.join(orders_df, "customer_id", "left_semi").show()
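A left semi join is not the same as an inner join followed by selecting the left columns: the semi join returns each matching left row once, however many right rows match. A plain-Python model using the customer and order ids from the cells above:

```python
customers = {1: "Alice", 2: "Bob", 3: "Charlie", 4: "Diana"}
order_customer_ids = [1, 1, 2, 3, 5]  # customer_id of each order

# Inner join: one output row per matching (customer, order) pair
inner = [(cid, customers[cid]) for cid in order_customer_ids if cid in customers]

# Left semi: customers with at least one order, each listed once
ordered = set(order_customer_ids)
semi = [(cid, name) for cid, name in customers.items() if cid in ordered]

# Left anti: customers with no order at all
anti = [(cid, name) for cid, name in customers.items() if cid not in ordered]

print(len(inner), inner)
print(semi)
print(anti)
```

Alice appears twice in the inner join (she has two orders) but once in the semi join, and order 105 (customer 5) appears in neither.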

### Questions - Exercise 5

**Q5.1** Using the sales data, calculate:
- The percentage of total revenue contributed by each category
- The top-selling product in each category
- Daily revenue growth rate (percentage change from previous day)

**Q5.2** Create a customer segmentation based on their total spending:
- "Bronze": total spending < 200
- "Silver": total spending 200-500
- "Gold": total spending > 500

Show the count of customers in each segment.

**Q5.3** Download data from Wikidata about:
- Countries and their populations
- Cities and their countries
- Universities and their cities

Perform joins to find the top 10 countries by number of universities, normalized by population.

In [None]:
# Your solutions here


---

## Exercise 6: Partitioning, Caching, and Optimization [★★★]

### Performance Optimization

Spark performance depends heavily on:
1. **Partitioning**: How data is distributed across nodes
2. **Caching**: Keeping frequently accessed data in memory
3. **Avoiding shuffles**: Minimizing data movement between nodes
4. **Broadcasting**: Sending small datasets to every executor once (broadcast joins, broadcast variables) instead of shuffling large ones

In [None]:
# Understanding partitions
large_df = spark.range(1000000)  # 1 million rows

print(f"Default partitions: {large_df.rdd.getNumPartitions()}")

# Repartition to specific number
repartitioned = large_df.repartition(8)
print(f"After repartition(8): {repartitioned.rdd.getNumPartitions()}")

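# Coalesce (merges existing partitions without a full shuffle; it can only reduce
# the count, so requesting more partitions than exist leaves the count unchanged)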
coalesced = large_df.coalesce(4)
print(f"After coalesce(4): {coalesced.rdd.getNumPartitions()}")

In [None]:
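# Hash-partition by column: all rows with the same category land in the same partition
# (useful before joins or aggregations on that key; with only 3 categories,
# at least one of the 4 partitions stays empty)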
sales_partitioned = sales_df.repartition(4, "category")
print(f"Partitions: {sales_partitioned.rdd.getNumPartitions()}")

# View partition contents
def show_partition_info(df):
    partitions = df.rdd.glom().collect()
    for i, partition in enumerate(partitions):
        print(f"Partition {i}: {len(partition)} rows")

show_partition_info(sales_partitioned)
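`repartition(4, "category")` hash-partitions rows: each row goes to partition `hash(category) mod 4`. With only three categories, at most three of the four partitions can hold data, and two categories may even collide. A plain-Python model, using `zlib.crc32` as a stand-in hash (an assumption: Spark actually uses Murmur3, so the concrete partition numbers will differ):

```python
import zlib
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    """Assign each row to partition hash(key) mod num_partitions."""
    buckets = defaultdict(list)
    for row in rows:
        h = zlib.crc32(row[key].encode("utf-8"))  # deterministic stand-in hash
        buckets[h % num_partitions].append(row)
    return [buckets[i] for i in range(num_partitions)]

# Same category mix as sales_df: 5 Electronics, 3 Clothing, 2 Books
rows = [{"category": c} for c in
        ["Electronics"] * 5 + ["Clothing"] * 3 + ["Books"] * 2]

parts = hash_partition(rows, "category", 4)
for i, part in enumerate(parts):
    print(f"Partition {i}: {len(part)} rows, keys={sorted({r['category'] for r in part})}")
```

Skewed keys follow directly from this scheme: a partition holds every row of its keys, so one very frequent key makes one very large partition.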

In [None]:
# Caching DataFrames
from pyspark import StorageLevel

# Create a large DataFrame
large_df = spark.range(100000).withColumn("squared", col("id") ** 2)

# Without caching - multiple computations
import time

start = time.time()
count1 = large_df.filter(col("squared") > 1000).count()
count2 = large_df.filter(col("squared") > 2000).count()
count3 = large_df.filter(col("squared") > 3000).count()
print(f"Without caching: {time.time() - start:.4f}s")

# With caching
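# DataFrame.cache() persists with a memory-and-disk default storage level
# (unlike RDD.cache(), which is MEMORY_ONLY); use persist(level) to choose one
large_df.cache()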

# First action triggers caching
_ = large_df.count()

start = time.time()
count1 = large_df.filter(col("squared") > 1000).count()
count2 = large_df.filter(col("squared") > 2000).count()
count3 = large_df.filter(col("squared") > 3000).count()
print(f"With caching: {time.time() - start:.4f}s")

# Unpersist when done
large_df.unpersist()

In [None]:
# Storage levels
print("Available storage levels:")
print(f"  MEMORY_ONLY: {StorageLevel.MEMORY_ONLY}")
print(f"  MEMORY_AND_DISK: {StorageLevel.MEMORY_AND_DISK}")
print(f"  DISK_ONLY: {StorageLevel.DISK_ONLY}")
print(f"  MEMORY_ONLY_2: {StorageLevel.MEMORY_ONLY_2}")       # 2x replication
print(f"  MEMORY_AND_DISK_2: {StorageLevel.MEMORY_AND_DISK_2}") # 2x replication
print(f"  OFF_HEAP: {StorageLevel.OFF_HEAP}")

# Note: MEMORY_ONLY_SER / MEMORY_AND_DISK_SER exist in Scala/Java but not in PySpark.
# In PySpark, data is already serialized (pickled), so the _SER distinction does not apply.

In [None]:
# Broadcast variables for small datasets in joins
from pyspark.sql.functions import broadcast

# Small lookup table
lookup_data = [("A", "Category A"), ("B", "Category B"), ("C", "Category C")]
lookup_df = spark.createDataFrame(lookup_data, ["code", "description"])

# Large fact table
fact_data = [(i, ["A", "B", "C"][i % 3], i * 10) for i in range(10000)]
fact_df = spark.createDataFrame(fact_data, ["id", "code", "value"])

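# Normal join
# Note: Spark automatically broadcasts tables smaller than
# spark.sql.autoBroadcastJoinThreshold (10 MB by default), so this plan is likely
# a broadcast join as well; compare result1.explain() with result2.explain()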
start = time.time()
result1 = fact_df.join(lookup_df, "code")
_ = result1.count()
print(f"Normal join: {time.time() - start:.4f}s")

# Broadcast join (small table is broadcast to all nodes)
start = time.time()
result2 = fact_df.join(broadcast(lookup_df), "code")
_ = result2.count()
print(f"Broadcast join: {time.time() - start:.4f}s")
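Conceptually, a broadcast hash join ships the small table to every executor, builds an in-memory hash table from it, and probes that table while streaming through each partition of the large table, so the large table is never shuffled. A plain-Python model of the two sides; the 4-way round-robin split of the fact data is an assumption for illustration:

```python
lookup = [("A", "Category A"), ("B", "Category B"), ("C", "Category C")]
fact = [(i, ["A", "B", "C"][i % 3], i * 10) for i in range(10000)]

# 1. Build side: a hash table from the small table (what gets broadcast)
lookup_table = dict(lookup)

# 2. Probe side: each partition of the large table is joined locally
num_partitions = 4
fact_partitions = [fact[p::num_partitions] for p in range(num_partitions)]

def join_partition(partition, table):
    # Inner join: emit a row only when the code exists in the hash table
    return [(code, fid, value, table[code])
            for fid, code, value in partition if code in table]

joined = [row for part in fact_partitions for row in join_partition(part, lookup_table)]
print(len(joined), joined[0])
```

No partition of `fact` needs data from any other partition, which is why this strategy avoids a shuffle; the cost is that the build side must fit in each executor's memory.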

In [None]:
# Explain query execution plan
df = sales_df.filter(col("revenue") > 100).groupBy("category").sum("revenue")

# Simple explanation
print("=== Simple Explain ===")
df.explain()

# Extended explanation
print("\n=== Extended Explain ===")
df.explain(extended=True)

In [None]:
# Predicate pushdown demonstration
# When reading from files, Spark can push filters to the data source

# Write sample data to parquet
sales_df.write.mode("overwrite").parquet("sales_data.parquet")

# Reading with filter - Spark will only read necessary data
filtered = spark.read.parquet("sales_data.parquet").filter(col("category") == "Electronics")
print("Query plan with predicate pushdown:")
filtered.explain()

In [None]:
# Column pruning - Parquet is columnar, so Spark reads only the columns a query needs

# Variant 1: filter first, then select
all_cols = spark.read.parquet("sales_data.parquet")
result_all = all_cols.filter(col("revenue") > 1000).select("product", "revenue")

# Variant 2: select first, then filter
selected = spark.read.parquet("sales_data.parquet").select("product", "revenue")
result_selected = selected.filter(col("revenue") > 1000)

# Catalyst pushes the projection down to the scan in BOTH cases: the ReadSchema
# of each FileScan lists only `product` and `revenue`, so neither variant reads all columns
result_all.explain()
result_selected.explain()

print("Both approaches produce the same result:")
result_all.show()
result_selected.show()

### Questions - Exercise 6

**Q6.1** Create a DataFrame with 10 million rows. Compare the performance of:
- Running the same aggregation 5 times without caching
- Running it 5 times with caching
- Using different storage levels (MEMORY_ONLY vs MEMORY_AND_DISK)

**Q6.2** Demonstrate the impact of partitioning on join performance:
- Create two large DataFrames (1 million rows each)
- Join them with different partitioning strategies
- Compare execution times and explain the differences

**Q6.3** Analyze query plans:
- Write a complex query with filters, joins, and aggregations
- Use `explain()` to understand the execution plan
- Optimize the query based on the plan analysis
- Compare before/after performance

In [None]:
# Your solutions here


---

## Exercise 7: Processing Large-Scale Datasets [★★★]

### Columnar File Formats

For large-scale data processing, columnar formats like Parquet and ORC offer significant advantages:
- **Efficient compression**: Similar values stored together compress better
- **Column pruning**: Read only needed columns
- **Predicate pushdown**: Filter at the storage level
- **Schema evolution**: Add/remove columns without rewriting data
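Predicate pushdown in Parquet and ORC relies largely on per-chunk statistics: files are divided into row groups (Parquet) or stripes (ORC), and the metadata stores min/max values for each column chunk, so a reader can skip every chunk whose range cannot match the filter. A plain-Python model with made-up row groups of 1,000 values each:

```python
# Hypothetical column values, written in ascending order and split into row groups
values = list(range(10_000))
row_group_size = 1_000
row_groups = [values[i:i + row_group_size] for i in range(0, len(values), row_group_size)]

# Footer metadata: (min, max) per row group, recorded at write time
stats = [(min(g), max(g)) for g in row_groups]

def scan_greater_than(threshold):
    """Return matching values and how many row groups were actually read."""
    matches, groups_read = [], 0
    for group, (_, hi) in zip(row_groups, stats):
        if hi <= threshold:
            continue  # skipped using the statistics alone; data never read
        groups_read += 1
        matches.extend(v for v in group if v > threshold)
    return matches, groups_read

matches, groups_read = scan_greater_than(8_500)
print(len(matches), groups_read, len(row_groups))
```

Skipping only pays off when the data is sorted or clustered on the filtered column; with randomly ordered values, nearly every row group's min/max range overlaps the filter.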

In [None]:
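# Generate a larger synthetic dataset for demonstration
from pyspark.sql.functions import rand, floor, concat, lit, lpad, when, col

# Create a larger sales dataset (100,000 rows); seeds make the random draws repeatable
large_sales = (spark.range(100000)
    # Zero-padded ISO dates (YYYY-MM-DD) so they sort and parse correctly
    .withColumn("date", concat(lit("2024-"),
                               lpad((floor(rand(seed=1) * 12) + 1).cast("string"), 2, "0"),
                               lit("-"),
                               lpad((floor(rand(seed=2) * 28) + 1).cast("string"), 2, "0")))
    # Draw ONE random number per row so the split is ~30% / 30% / 40%
    # (two separate rand() calls would give ~30% / 42% / 28%)
    .withColumn("r", rand(seed=3))
    .withColumn("category",
                when(col("r") < 0.3, "Electronics")
                .when(col("r") < 0.6, "Clothing")
                .otherwise("Books"))
    .drop("r")
    .withColumn("price", floor(rand(seed=4) * 1000) + 10)
    .withColumn("quantity", floor(rand(seed=5) * 50) + 1)
    .withColumn("revenue", col("price") * col("quantity")))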

print(f"Generated {large_sales.count()} rows")
large_sales.show(5)

In [None]:
# Write to different formats and compare sizes
import os

# Write as CSV
large_sales.write.mode("overwrite").option("header", "true").csv("large_sales_csv")

# Write as Parquet (default compression: snappy)
large_sales.write.mode("overwrite").parquet("large_sales_parquet")

# Write as Parquet with gzip compression
large_sales.write.mode("overwrite").option("compression", "gzip").parquet("large_sales_parquet_gzip")

# Write as ORC
large_sales.write.mode("overwrite").orc("large_sales_orc")

In [None]:
# Compare file sizes (works for the local filesystem only; HDFS or S3 paths need their own API)
def get_folder_size(path):
    """Calculate total size of files in a folder"""
    total = 0
    if os.path.exists(path):
        for root, dirs, files in os.walk(path):
            for f in files:
                total += os.path.getsize(os.path.join(root, f))
    return total

csv_size = get_folder_size("large_sales_csv")
parquet_size = get_folder_size("large_sales_parquet")
parquet_gzip_size = get_folder_size("large_sales_parquet_gzip")
orc_size = get_folder_size("large_sales_orc")

print(f"CSV size: {csv_size / 1024:.2f} KB")
print(f"Parquet (snappy) size: {parquet_size / 1024:.2f} KB")
print(f"Parquet (gzip) size: {parquet_gzip_size / 1024:.2f} KB")
print(f"ORC size: {orc_size / 1024:.2f} KB")

if csv_size and parquet_size and parquet_gzip_size and orc_size:
    print(f"\nCompression ratios vs CSV:")
    print(f"  Parquet (snappy): {csv_size / parquet_size:.2f}x")
    print(f"  Parquet (gzip): {csv_size / parquet_gzip_size:.2f}x")
    print(f"  ORC: {csv_size / orc_size:.2f}x")

In [None]:
# Compare read performance
import time

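def benchmark_read(path, format_type):
    """Time reading a dataset and counting its rows.

    Caveats: inferSchema makes Spark scan the CSV an extra time to infer types,
    and Parquet/ORC can answer count() largely from file metadata, so this
    favors the columnar formats; a query reading actual columns is a fairer test.
    """
    start = time.time()
    if format_type == "csv":
        df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    elif format_type == "parquet":
        df = spark.read.parquet(path)
    else:
        df = spark.read.orc(path)
    df.count()  # the action triggers the actual read
    return time.time() - start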

print("Read performance (seconds):")
print(f"  CSV: {benchmark_read('large_sales_csv', 'csv'):.4f}s")
print(f"  Parquet: {benchmark_read('large_sales_parquet', 'parquet'):.4f}s")
print(f"  ORC: {benchmark_read('large_sales_orc', 'orc'):.4f}s")
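Single wall-clock measurements like these are noisy: the first read also pays for JVM warm-up, CSV with `inferSchema` scans the data an extra time, and later reads may be served from the operating system's file cache. A sketch of a somewhat fairer harness, assuming you pass in a zero-argument callable, discards warm-up runs and reports the median of several timed runs. The helper name `median_runtime` and its parameters are hypothetical.

```python
import statistics
import time
from typing import Callable


def median_runtime(action: Callable[[], object], warmup: int = 1, repeats: int = 5) -> float:
    """Return the median wall-clock time in seconds of `repeats` calls to `action`.

    The first `warmup` calls are executed but not timed, so one-off costs
    (class loading, JIT compilation, cold caches) do not skew the result.
    """
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    for _ in range(warmup):
        action()
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)
```

With the notebook's session this could be called as `median_runtime(lambda: spark.read.parquet("large_sales_parquet").count())`, and likewise for the CSV and ORC paths.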

In [None]:
# Partitioned writes - partition by category
large_sales.write.mode("overwrite").partitionBy("category").parquet("large_sales_partitioned")

# List the partition directories
for item in os.listdir("large_sales_partitioned"):
    if os.path.isdir(os.path.join("large_sales_partitioned", item)):
        print(f"Partition: {item}")
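`partitionBy` writes one subdirectory per distinct value, named in Hive style as `column=value` (for example `category=Electronics`), and the partition column is encoded in the directory name rather than stored inside the data files. The plain-Python sketch below imitates that grouping step for a list of row dictionaries. It ignores the escaping Spark applies to special characters and null values, and the function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_partition(rows: List[dict], column: str) -> Dict[str, List[dict]]:
    """Group rows into Hive-style `column=value` buckets, dropping `column` from each row."""
    buckets: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        directory = f"{column}={row[column]}"
        # The value lives in the directory name, so it is removed from the stored row.
        buckets[directory].append({k: v for k, v in row.items() if k != column})
    return dict(buckets)


rows = [
    {"id": 1, "category": "Books", "revenue": 40},
    {"id": 2, "category": "Electronics", "revenue": 900},
    {"id": 3, "category": "Books", "revenue": 15},
]
for directory, stored in group_by_partition(rows, "category").items():
    print(directory, stored)
# category=Books [{'id': 1, 'revenue': 40}, {'id': 3, 'revenue': 15}]
# category=Electronics [{'id': 2, 'revenue': 900}]
```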

In [None]:
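# Reading partitioned data with partition pruning:
# the filter is on the partition column, so only the category=Electronics directory is scanned.
# In the plan below, the predicate appears under PartitionFilters in the FileScan node.
electronics = spark.read.parquet("large_sales_partitioned").filter(col("category") == "Electronics")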

print("Query plan with partition pruning:")
electronics.explain()
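Partition pruning works because a filter on the partition column can be evaluated against directory names before any data file is opened. The sketch below shows that idea in plain Python for an equality filter on a Hive-style layout. `prune_partitions` is a hypothetical helper, not part of Spark's API; it assumes paths contain `column=value` segments, and the file names are shortened for illustration.

```python
from typing import List


def prune_partitions(paths: List[str], column: str, value: str) -> List[str]:
    """Keep only paths whose `column=value` directory segment matches the equality filter."""
    wanted = f"{column}={value}"
    kept = []
    for path in paths:
        segments = path.strip("/").split("/")
        # Only directory names are inspected; no data file is read here.
        if wanted in segments:
            kept.append(path)
    return kept


files = [
    "large_sales_partitioned/category=Books/part-00000.parquet",
    "large_sales_partitioned/category=Electronics/part-00000.parquet",
    "large_sales_partitioned/category=Electronics/part-00001.parquet",
]
print(prune_partitions(files, "category", "Electronics"))
```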

In [None]:
# Schema evolution with Parquet
# Original data
original_data = [(1, "A", 100), (2, "B", 200)]
original_df = spark.createDataFrame(original_data, ["id", "code", "value"])
original_df.write.mode("overwrite").parquet("schema_evolution_test")

# New data with additional column
new_data = [(3, "C", 300, "new_info"), (4, "D", 400, "more_info")]
new_df = spark.createDataFrame(new_data, ["id", "code", "value", "extra"])
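# mergeSchema is a read-side option for Parquet, so the append itself needs no extra option
new_df.write.mode("append").parquet("schema_evolution_test")

# Read with schema merging enabled; rows from the original files get null for "extra"
merged = spark.read.option("mergeSchema", "true").parquet("schema_evolution_test")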
print("Merged schema:")
merged.printSchema()
merged.show()
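With `mergeSchema` enabled, Spark reconciles the schemas of all the Parquet files into one: columns are combined by name, and rows from files that lack a column read back as null. The plain-Python sketch below mimics that behavior for simple `(name, type)` schemas. It raises on any type mismatch instead of reproducing Spark's exact compatibility rules, and the helper names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Schema = List[Tuple[str, str]]  # (column name, type name)


def merge_schemas(first: Schema, second: Schema) -> Schema:
    """Union two schemas by column name, keeping first-seen column order."""
    merged: Schema = list(first)
    known: Dict[str, str] = dict(first)
    for name, dtype in second:
        if name not in known:
            merged.append((name, dtype))
            known[name] = dtype
        elif known[name] != dtype:
            raise TypeError(f"incompatible types for {name!r}: {known[name]} vs {dtype}")
    return merged


def align_row(row: Dict[str, object], schema: Schema) -> Dict[str, Optional[object]]:
    """Project a row onto the merged schema, filling missing columns with None (null)."""
    return {name: row.get(name) for name, _ in schema}


old_schema = [("id", "bigint"), ("code", "string"), ("value", "bigint")]
new_schema = old_schema + [("extra", "string")]
merged_schema = merge_schemas(old_schema, new_schema)
print(merged_schema)
print(align_row({"id": 1, "code": "A", "value": 100}, merged_schema))
# {'id': 1, 'code': 'A', 'value': 100, 'extra': None}
```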

In [None]:
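# Working with multiple data sources
# Example: enrich the sales data with a small reference table stored in a separate file
from pyspark.sql.functions import broadcast, col

# Write some reference data: category, description, tax rate
categories_data = [("Electronics", "Tech products", 0.1),
                   ("Clothing", "Apparel", 0.05),
                   ("Books", "Reading materials", 0.0)]
categories_df = spark.createDataFrame(categories_data, ["category", "description", "tax_rate"])
categories_df.write.mode("overwrite").parquet("categories_ref")

# Read both datasets back and join them
sales = spark.read.parquet("large_sales_parquet")
categories = spark.read.parquet("categories_ref")

# broadcast() hints Spark to copy the small table to every executor instead of shuffling the large one.
# This is an inner join: sales rows whose category is missing from the reference table are dropped.
enriched = sales.join(broadcast(categories), "category")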
enriched = enriched.withColumn("tax", col("revenue") * col("tax_rate"))
enriched = enriched.withColumn("total", col("revenue") + col("tax"))

print("Enriched sales data:")
enriched.select("id", "category", "revenue", "tax_rate", "tax", "total").show(10)
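For a broadcast hash join, Spark collects the small table, builds a hash table from it, and sends that to every executor; each partition of the large table is then joined locally by probing the hash table, so the large table is never shuffled. The single-process sketch below shows the build and probe steps of such an inner hash join with the same tax calculation. It is an illustration rather than Spark's implementation, `broadcast_hash_join` is a hypothetical name, and it assumes the join key is unique in the small table.

```python
from typing import Dict, Iterable, Iterator, List


def broadcast_hash_join(large: Iterable[dict], small: List[dict], key: str) -> Iterator[dict]:
    """Inner-join `large` with `small` on `key` by building a hash table from `small`."""
    # Build phase: index the small table once (this is what Spark would broadcast).
    # Assumes `key` is unique in `small`; a duplicate key would keep only the last row.
    lookup: Dict[object, dict] = {row[key]: row for row in small}
    # Probe phase: stream the large table; rows without a match are dropped (inner join).
    for row in large:
        match = lookup.get(row[key])
        if match is not None:
            yield {**row, **match}


ref_rows = [
    {"category": "Electronics", "tax_rate": 0.1},
    {"category": "Books", "tax_rate": 0.0},
]
sales_rows = [
    {"id": 1, "category": "Electronics", "revenue": 500},
    {"id": 2, "category": "Clothing", "revenue": 80},  # no reference row, so it is dropped
    {"id": 3, "category": "Books", "revenue": 30},
]
for r in broadcast_hash_join(sales_rows, ref_rows, "category"):
    tax = r["revenue"] * r["tax_rate"]
    print(r["id"], r["category"], tax, r["revenue"] + tax)
```

The design only pays off when the small side fits comfortably in each executor's memory; for two large tables Spark would normally fall back to a shuffle-based join.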

In [None]:
# Clean up temporary files
import shutil

cleanup_dirs = [
    "large_sales_csv", "large_sales_parquet", "large_sales_parquet_gzip",
    "large_sales_orc", "large_sales_partitioned", "schema_evolution_test",
    "categories_ref", "sales_data.parquet", "languages.orc", "languages.parquet", "languages.csv"
]

for d in cleanup_dirs:
    if not os.path.exists(d):
        continue
    if os.path.isdir(d):
        shutil.rmtree(d)  # Spark output paths are directories
    else:
        os.remove(d)
    print(f"Cleaned up: {d}")

### Questions - Exercise 7

**Q7.1** Download a large dataset (at least 1 million rows) from a public source (e.g., NYC Taxi data, Wikipedia page views):
- Load it into Spark
- Save in CSV, Parquet, and ORC formats
- Compare file sizes and read/write times
- Test query performance on each format

**Q7.2** Implement an ETL (Extract, Transform, Load) pipeline:
- Read data from multiple source files
- Clean and transform the data (handle nulls, normalize values)
- Join with reference data
- Write to partitioned Parquet files
- Measure and optimize performance

**Q7.3** Create a data warehouse star schema:
- Design fact and dimension tables
- Generate realistic synthetic data (10 million+ rows)
- Implement common analytical queries
- Optimize using partitioning, caching, and broadcast joins
- Document performance metrics

In [None]:
# Your solutions here


---

## Summary

In this practical, you learned:

1. **Spark Architecture**: Understanding drivers, executors, and cluster managers
2. **RDDs**: Creating and manipulating resilient distributed datasets
3. **Transformations vs Actions**: Lazy evaluation and optimization
4. **DataFrames**: Structured data processing with schemas
5. **Spark SQL**: Querying data using SQL syntax
6. **Advanced Operations**: Joins, window functions, aggregations
7. **Performance Optimization**: Partitioning, caching, broadcast variables
8. **File Formats**: Working with Parquet, ORC, and CSV at scale

### Key Takeaways

- Prefer DataFrames over RDDs when possible: the Catalyst optimizer can rewrite DataFrame and SQL queries, but not arbitrary functions passed to RDD operations
- Cache intermediate results that are used multiple times
- Use columnar formats (Parquet/ORC) for large datasets
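- Partition data by frequently filtered columns with a modest number of distinct values, so queries can prune whole directories without the write producing many tiny files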
- Use broadcast joins for small lookup tables
- Monitor query plans with `explain()` to identify bottlenecks

### Further Reading

- [Apache Spark Documentation](https://spark.apache.org/docs/latest/)
- [PySpark API Reference](https://spark.apache.org/docs/latest/api/python/)
- [Spark: The Definitive Guide](https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/)
- [Learning Spark, 2nd Edition](https://www.oreilly.com/library/view/learning-spark-2nd/9781492050032/)

In [None]:
# Stop the Spark session when done; this also stops its underlying SparkContext (sc)
spark.stop()
print("Spark session stopped.")