<a href="https://colab.research.google.com/github/jcims123/spark_in_colab/blob/main/20250802_spark_colab_transformations_actions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Technical Interview Preparation Guide for Google Colab (2025)
## 🚀 Quick Google Colab Setup for PySpark
### 1. Complete PySpark Installation (Spark 3.5.0)

In [1]:
# ===== CORRECTED PYSPARK SETUP FOR GOOGLE COLAB =====

# Step 1: Install Java (required for Spark)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Step 2: Download Apache Spark 3.5.0 (stable version that works reliably)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Step 3: Install findspark
!pip install findspark

# Step 4: Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

# Step 5: Initialize Spark
import findspark
findspark.init()

print("✅ PySpark 3.5.0 installed successfully!")

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
✅ PySpark 3.5.0 installed successfully!


### 2. Create Optimized Spark Session

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Create optimized Spark session for interviews
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("InterviewPrep") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print(f"✅ Spark {spark.version} session created!")
print(f"🔧 Using {spark.sparkContext.defaultParallelism} cores")

✅ Spark 3.5.0 session created!
🔧 Using 2 cores


# Complete PySpark Transformations vs Actions Guide

## 🔍 Core Concept

**TRANSFORMATIONS** are **LAZY** - they build a computation graph (DAG) but don't execute until an **ACTION** is called.
**ACTIONS** are **EAGER** - they trigger the execution of all transformations in the lineage and return results.
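
The lazy/eager split can be illustrated without Spark. The sketch below is a plain-Python analogy, not PySpark code: `LazyRows`, `is_high_salary`, and the sample salaries are hypothetical names and values chosen for illustration. Transformations only record a step, and the action `collect()` is what finally runs them.

```python
# Plain-Python analogy of lazy transformations vs. eager actions.
# LazyRows is a hypothetical teaching class, not part of PySpark.

class LazyRows:
    def __init__(self, rows, steps=()):
        self._rows = rows      # source data
        self._steps = steps    # recorded, not yet executed, transformations

    def filter(self, predicate):
        # Transformation: record the step and return a new plan immediately.
        return LazyRows(self._rows, self._steps + (("filter", predicate),))

    def map(self, func):
        # Transformation: also only recorded.
        return LazyRows(self._rows, self._steps + (("map", func),))

    def collect(self):
        # Action: run every recorded step in order and return the results.
        rows = iter(self._rows)
        for kind, func in self._steps:
            rows = filter(func, rows) if kind == "filter" else map(func, rows)
        return list(rows)


calls = []  # records every time the predicate actually runs


def is_high_salary(salary):
    calls.append(salary)
    return salary > 70000


plan = LazyRows([75000, 65000, 80000]).filter(is_high_salary).map(lambda s: s // 10)
calls_before_action = len(calls)  # nothing has executed yet
bonuses = plan.collect()          # the action executes the recorded steps
calls_after_action = len(calls)

print(calls_before_action, bonuses, calls_after_action)  # 0 [7500, 8000] 3
```

Spark does considerably more than this (it optimizes the recorded plan and distributes the work), but the control flow is the same: building the chain is cheap, and the cost is paid when an action runs.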

---

## 📊 All PySpark Transformations (Lazy Operations)

### Basic Transformations Table

| Transformation | Purpose | Returns | Example |
|----------------|---------|---------|---------|
| `select()` | Choose specific columns | DataFrame | `df.select("name", "salary")` |
| `filter()` / `where()` | Filter rows by condition | DataFrame | `df.filter(col("age") > 25)` |
| `withColumn()` | Add/modify a column | DataFrame | `df.withColumn("bonus", col("salary") * 0.1)` |
| `withColumnRenamed()` | Rename a column | DataFrame | `df.withColumnRenamed("old_name", "new_name")` |
| `drop()` | Remove columns | DataFrame | `df.drop("unwanted_col")` |
| `distinct()` | Remove duplicate rows | DataFrame | `df.distinct()` |
| `dropDuplicates()` | Remove duplicates by columns | DataFrame | `df.dropDuplicates(["name", "dept"])` |
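
The difference between `distinct()` and `dropDuplicates()` with a column subset can be sketched in plain Python, with no Spark required. The sample rows and variable names are hypothetical. The sketch keeps the first row seen per key, whereas Spark does not guarantee which duplicate survives.

```python
# Plain-Python sketch of distinct() vs dropDuplicates(subset); not PySpark code.
rows = [
    ("John", "Engineering", 75000),
    ("John", "Engineering", 80000),   # same name/dept, different salary
    ("Alice", "Marketing", 65000),
    ("Alice", "Marketing", 65000),    # exact duplicate row
]

# distinct(): two rows are duplicates only if every column matches.
distinct_rows = list(dict.fromkeys(rows))

# dropDuplicates(["name", "dept"]): one row per (name, dept) key.
# Assumption: keep the first row seen; Spark may keep a different one.
first_per_key = {}
for name, dept, salary in rows:
    first_per_key.setdefault((name, dept), (name, dept, salary))
deduped_rows = list(first_per_key.values())

print(len(distinct_rows), len(deduped_rows))  # 3 2
```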

### Aggregation Transformations Table

| Transformation | Purpose | Returns | Example |
|----------------|---------|---------|---------|
| `groupBy()` | Group rows by columns | GroupedData | `df.groupBy("department")` |
| `agg()` | Apply aggregation functions | DataFrame | `df.groupBy("dept").agg(avg("salary"))` |
| `orderBy()` / `sort()` | Sort DataFrame | DataFrame | `df.orderBy("salary")` |
| `limit()` | Limit number of rows | DataFrame | `df.limit(10)` |

### Join & Union Transformations Table

| Transformation | Purpose | Returns | Example |
|----------------|---------|---------|---------|
| `join()` | Join two DataFrames | DataFrame | `df1.join(df2, "id")` |
| `union()` | Union two DataFrames | DataFrame | `df1.union(df2)` |
| `unionByName()` | Union by column names | DataFrame | `df1.unionByName(df2)` |
| `intersect()` | Find common rows | DataFrame | `df1.intersect(df2)` |
| `subtract()` / `exceptAll()` | Subtract rows (distinct result / duplicates preserved) | DataFrame | `df1.subtract(df2)` |
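
The set-style operations differ mainly in how they treat duplicate rows: `union()` keeps duplicates, `intersect()` and `subtract()` return distinct rows, and `exceptAll()` (used in the code cell further down) preserves duplicates. The following plain-Python sketch mirrors those semantics with lists, sets, and `collections.Counter`. The sample values are hypothetical and no Spark is involved.

```python
from collections import Counter

# Each string stands in for one row; not PySpark code.
left = ["a", "a", "b", "c"]
right = ["a", "c"]

union_all = left + right                              # like union(): duplicates kept
intersect_distinct = sorted(set(left) & set(right))   # like intersect(): distinct rows
except_distinct = sorted(set(left) - set(right))      # like subtract(): distinct rows
# like exceptAll(): each row on the right cancels one matching row on the left
except_all = sorted((Counter(left) - Counter(right)).elements())

print(except_distinct, except_all)  # ['b'] ['a', 'b']
```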

### Data Manipulation Transformations Table

| Transformation | Purpose | Returns | Example |
|----------------|---------|---------|---------|
| `cast()` | Change column data type | Column | `col("age").cast("string")` |
| `alias()` | Give column an alias | Column | `col("salary").alias("pay")` |
| `when()` / `otherwise()` | Conditional logic | Column | `when(col("age") > 30, "Senior")` |
| `coalesce()` | Reduce number of partitions (avoids a full shuffle) | DataFrame | `df.coalesce(2)` |
| `repartition()` | Increase or decrease number of partitions (full shuffle) | DataFrame | `df.repartition(4)` |
| `sample()` | Sample fraction of data | DataFrame | `df.sample(0.1)` |

### Advanced Transformations Table

| Transformation | Purpose | Returns | Example |
|----------------|---------|---------|---------|
| `explode()` | Explode array/map column | Column | `df.select(explode("array_col"))` |
| `pivot()` | Pivot table transformation (called on grouped data, followed by an aggregation) | DataFrame | `df.groupBy("A").pivot("B").sum("C")` |
| `unpivot()` | Unpivot transformation (Spark 3.4+) | DataFrame | `df.unpivot(["A"], ["B", "C"], "variable", "value")` |
| `withColumn()` + window | Window functions | DataFrame | `df.withColumn("rank", row_number().over(window))` |
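
To make the window-function row concrete, here is a plain-Python sketch of what `row_number()` and `dense_rank()` (both used in the code cell further down) compute over a window partitioned by department and ordered by salary descending. The rows are hypothetical and include a tie, which the sample employee data does not have. In Spark the order of `row_number()` among tied rows is not deterministic; the sketch simply follows sort order.

```python
from itertools import groupby

# (department, salary) rows; the two 80000 salaries are a deliberate tie.
rows = [
    ("Engineering", 80000),
    ("Sales", 72000),
    ("Engineering", 85000),
    ("Engineering", 80000),
]

# Equivalent of Window.partitionBy("department").orderBy(desc("salary")).
ordered = sorted(rows, key=lambda r: (r[0], -r[1]))

ranked = []  # (department, salary, row_number, dense_rank)
for dept, group in groupby(ordered, key=lambda r: r[0]):
    dense = 0
    previous = None
    for row_num, (_, salary) in enumerate(group, start=1):
        if salary != previous:   # dense_rank only advances on a new value
            dense += 1
            previous = salary
        ranked.append((dept, salary, row_num, dense))

for entry in ranked:
    print(entry)
```

The tied Engineering rows get row numbers 2 and 3 but share dense rank 2, which is the distinction interviewers usually probe for.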

---

## ⚡ All PySpark Actions (Eager Operations)

### Data Collection Actions Table

| Action | Purpose | Returns | Performance Impact |
|--------|---------|---------|-------------------|
| `show()` | Display DataFrame rows | None | Low (only fetches displayed rows) |
| `collect()` | Bring all data to driver | List[Row] | **HIGH** - Avoid for large data |
| `take(n)` | Take first n rows | List[Row] | Low (only n rows) |
| `head(n)` | Same as take(n) | List[Row] | Low (only n rows) |
| `first()` | Get first row | Row | Low (only 1 row) |
| `tail(n)` | Get last n rows | List[Row] | Medium (reads partitions from the end; no sort is performed) |

### Counting & Statistical Actions Table

| Action | Purpose | Returns | Performance Impact |
|--------|---------|---------|-------------------|
| `count()` | Count total rows | int | Medium (scans all data) |
| `describe()` | Statistical summary | DataFrame | Medium (computes stats) |
| `summary()` | Extended statistics | DataFrame | Medium (computes stats) |
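| `agg(min("col")).first()` | Minimum value of a column (`min()` is a lazy aggregate function; `first()` triggers execution) | Row | Medium (scans column) |
| `agg(max("col")).first()` | Maximum value of a column (`max()` is a lazy aggregate function; `first()` triggers execution) | Row | Medium (scans column) |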

### Data Output Actions Table

| Action | Purpose | Returns | Performance Impact |
|--------|---------|---------|-------------------|
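| `write.parquet()` / `write.csv()` | Write to storage in a given format (`df.write` alone only returns a DataFrameWriter) | None | Medium-High (depends on size) |
| `write.save()` | Save DataFrame to a path | None | Medium-High (depends on size) |
| `write.saveAsTable()` | Save as a catalog (e.g. Hive) table | None | Medium-High (depends on size) |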

### Advanced Actions Table

| Action | Purpose | Returns | Performance Impact |
|--------|---------|---------|-------------------|
| `foreach()` | Apply function to each row | None | High (processes all rows) |
| `foreachPartition()` | Apply function to each partition | None | Medium-High (partition-wise) |
| `rdd.reduce()` | Reduce rows to single value (RDD API; DataFrames have no `reduce()`) | Any | High (processes all data) |
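
A reduce over distributed data is evaluated partition by partition, and the partial results are then combined. That is why the function passed to it should be associative and commutative. The plain-Python sketch below imitates this with `functools.reduce`. The three-way split of the salaries is a hypothetical partition layout, and no Spark is involved.

```python
from functools import reduce
from operator import add, sub

# Hypothetical layout: the seven sample salaries spread over three partitions.
partitions = [[75000, 65000], [80000, 70000, 85000], [68000, 72000]]

# Step 1: each partition is reduced independently.
partial_sums = [reduce(add, part) for part in partitions]
# Step 2: the partial results are combined into a single value.
total_salary = reduce(add, partial_sums)

# Subtraction is neither associative nor commutative, so the answer
# depends on how the data happens to be partitioned.
flat = [salary for part in partitions for salary in part]
sequential = reduce(sub, flat)
partitioned = reduce(sub, [reduce(sub, part) for part in partitions])

print(total_salary)             # 515000
print(sequential, partitioned)  # -365000 89000
```

For simple totals on a DataFrame, a built-in aggregate such as `agg(sum(...))` is usually the better choice, since it stays inside the optimized DataFrame engine.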

---

## 💻 Complete Code Examples

In [3]:
# ===============================================
# COMPLETE PYSPARK TRANSFORMATIONS VS ACTIONS
# ===============================================

# Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # NOTE: shadows Python built-ins such as min, max, sum and round
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time

# Create Spark session (getOrCreate() returns the session created above if it is still running)
spark = SparkSession.builder.appName("TransformationsVsActions").getOrCreate()

# Sample employee data
employees_data = [
    (1, "John", "Engineering", 75000, "2020-01-15", ["Python", "Spark"]),
    (2, "Alice", "Marketing", 65000, "2019-03-20", ["SQL", "Tableau"]),
    (3, "Bob", "Engineering", 80000, "2021-06-10", ["Java", "Scala"]),
    (4, "Carol", "Sales", 70000, "2020-11-05", ["Excel", "PowerBI"]),
    (5, "David", "Engineering", 85000, "2018-08-12", ["Python", "Java"]),
    (6, "Eve", "Marketing", 68000, "2021-02-28", ["SQL", "Python"]),
    (7, "Frank", "Sales", 72000, "2019-12-15", ["Salesforce"])
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("hire_date", StringType(), True),
    StructField("skills", ArrayType(StringType()), True)
])

df = spark.createDataFrame(employees_data, schema)
print("=== Original DataFrame ===")
df.show()

# ================================================
# TRANSFORMATIONS (LAZY OPERATIONS) - BUILD GRAPH
# ================================================

print("\n🔄 ALL TRANSFORMATIONS (LAZY - NO EXECUTION)")

# 1. BASIC TRANSFORMATIONS
print("--- Basic Transformations ---")
select_trans = df.select("name", "department", "salary")
filter_trans = df.filter(col("salary") > 70000)
where_trans = df.where(col("department") == "Engineering")
bonus_trans = df.withColumn("bonus", col("salary") * 0.1)
category_trans = df.withColumn("salary_category",
                              when(col("salary") > 75000, "High")
                              .when(col("salary") > 65000, "Medium")
                              .otherwise("Low"))
renamed_trans = df.withColumnRenamed("department", "dept")
dropped_trans = df.drop("hire_date")
distinct_trans = df.select("department").distinct()
print("✅ select(), filter(), where(), withColumn(), withColumnRenamed(), drop(), distinct() - ALL LAZY")

# 2. AGGREGATION TRANSFORMATIONS
print("\n--- Aggregation Transformations ---")
dept_stats = df.groupBy("department").agg(
    avg("salary").alias("avg_salary"),
    count("*").alias("employee_count"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary")
)
sorted_trans = df.orderBy(desc("salary"))
sort_trans = df.sort("name")
limited_trans = df.limit(3)
print("✅ groupBy(), agg(), orderBy(), sort(), limit() - ALL LAZY")

# 3. JOIN & UNION TRANSFORMATIONS
print("\n--- Join & Union Transformations ---")
projects_data = [(101, "Project Alpha", 1), (102, "Project Beta", 3), (103, "Project Gamma", 5)]
projects_df = spark.createDataFrame(projects_data, ["project_id", "project_name", "lead_id"])

inner_join_trans = df.join(projects_df, df.id == projects_df.lead_id, "inner")
left_join_trans = df.join(projects_df, df.id == projects_df.lead_id, "left")
broadcast_join_trans = df.join(broadcast(projects_df), df.id == projects_df.lead_id, "inner")

new_employees = spark.createDataFrame([(8, "Grace", "HR", 60000, "2022-01-10", ["Excel"])], schema)
union_trans = df.union(new_employees)
intersect_trans = df.select("department").intersect(projects_df.select(lit("Engineering").alias("department")))
except_trans = df.select("name").exceptAll(new_employees.select("name"))
print("✅ join(), union(), intersect(), exceptAll(), broadcast() - ALL LAZY")

# 4. ADVANCED TRANSFORMATIONS
print("\n--- Advanced Transformations ---")
exploded_skills = df.select("name", explode("skills").alias("skill"))

window_spec = Window.partitionBy("department").orderBy(desc("salary"))
windowed_trans = df.withColumn("dept_rank", row_number().over(window_spec))

pivot_trans = df.groupBy("department").pivot("name").agg(first("salary"))
repartitioned_trans = df.repartition(4, "department")
coalesced_trans = df.coalesce(2)
sampled_trans = df.sample(0.5, seed=42)

# Date transformations
date_trans = df.withColumn("hire_year", year(to_date("hire_date", "yyyy-MM-dd")))

# Null handling transformations
null_filled_trans = df.fillna({"salary": 0, "department": "Unknown"})
null_dropped_trans = df.dropna(subset=["name"])
print("✅ explode(), window functions, pivot(), repartition(), coalesce(), sample(), date functions, null handling - ALL LAZY")

print("\n🚨 IMPORTANT: ALL ABOVE OPERATIONS ARE LAZY - NO EXECUTION YET!")
print("The computation graph (DAG) is built but not executed.")

# ============================================
# ACTIONS (EAGER OPERATIONS) - TRIGGER EXECUTION
# ============================================

print("\n\n⚡ ALL ACTIONS (EAGER - TRIGGER EXECUTION)")

# 1. DATA COLLECTION ACTIONS
print("\n--- Data Collection Actions ---")
print("=== SHOW - Display sample data ===")
select_trans.show(3)  # Shows 3 rows

print("\n=== TAKE/HEAD - Get first n rows ===")
first_3 = df.take(3)
head_2 = df.head(2)
print(f"Take(3) returned {len(first_3)} rows")
print(f"Head(2) returned {len(head_2)} rows")

print("\n=== FIRST - Get first row ===")
first_row = df.first()
print(f"First employee: {first_row['name']}")

print("\n=== COLLECT - Bring all data to driver (CAREFUL!) ===")
small_collected = df.select("name", "department").collect()
print(f"Collected {len(small_collected)} rows - USE ONLY FOR SMALL DATA!")

# 2. COUNTING & STATISTICAL ACTIONS
print("\n--- Counting & Statistical Actions ---")
print("=== COUNT - Total rows ===")
total_rows = df.count()
print(f"Total employees: {total_rows}")

print("\n=== DESCRIBE - Statistical summary ===")
df.describe("salary").show()

print("\n=== MIN/MAX - Column extremes ===")
min_salary = df.agg(min("salary")).collect()[0][0]
max_salary = df.agg(max("salary")).collect()[0][0]
print(f"Salary range: ${min_salary:,} - ${max_salary:,}")

# 3. ADVANCED ACTIONS
print("\n--- Advanced Actions ---")
print("=== FOREACH - Process each row ===")
def print_high_earner(row):
    if row['salary'] > 75000:
        print(f"High earner: {row['name']} - ${row['salary']:,}")

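high_earners = df.filter(col("salary") > 75000)
# NOTE: foreach() runs on the executors, so the printed lines go to executor
# stdout/logs and may not appear in the notebook cell output.
high_earners.foreach(print_high_earner)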

print("\n=== REDUCE - Aggregate to single value ===")
total_salary = df.rdd.map(lambda row: row['salary']).reduce(lambda a, b: a + b)
print(f"Total company salary: ${total_salary:,}")

# 4. OUTPUT ACTIONS
print("\n--- Output Actions ---")
print("=== WRITE - Save to storage ===")
# df.write.mode("overwrite").parquet("output/employees")  # Uncomment to actually write
print("✅ write.parquet() - saves DataFrame to storage")

print("\n=== CACHE/PERSIST - Store in memory for reuse ===")
cached_df = df.cache()  # Marks for caching
cached_count = cached_df.count()  # Triggers caching
print(f"DataFrame cached with {cached_count} rows")

# ============================================
# PERFORMANCE DEMONSTRATION
# ============================================

print("\n\n📈 PERFORMANCE COMPARISON DEMO")

# Multiple transformations (all lazy)
start_time = time.time()
complex_transformation = df.filter(col("salary") > 60000) \
                          .withColumn("bonus", col("salary") * 0.15) \
                          .withColumn("total_comp", col("salary") + col("bonus")) \
                          .groupBy("department") \
                          .agg(avg("total_comp").alias("avg_total_comp")) \
                          .orderBy(desc("avg_total_comp"))
transform_time = time.time() - start_time
print(f"⏱️ Creating complex transformation chain: {transform_time:.6f} seconds (LAZY)")

# Trigger execution with action
start_time = time.time()
complex_transformation.show()
action_time = time.time() - start_time
print(f"⏱️ Executing with show() action: {action_time:.6f} seconds (EAGER)")

# ============================================
# CACHING PERFORMANCE DEMO
# ============================================

print("\n📊 CACHING PERFORMANCE DEMO")

expensive_transformation = df.filter(col("salary") > 70000) \
                             .groupBy("department") \
                             .agg(avg("salary").alias("avg_sal"), count("*").alias("count"))

# Without caching - multiple actions recompute
start_time = time.time()
count1 = expensive_transformation.count()
result1 = expensive_transformation.collect()
no_cache_time = time.time() - start_time
print(f"⏱️ Two actions WITHOUT caching: {no_cache_time:.6f} seconds")

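# With caching - computation reused
# NOTE: on a 7-row DataFrame the timings are dominated by job-scheduling overhead,
# and the uncached run above also pays warm-up costs, so the ratio is illustrative only.
cached_transformation = expensive_transformation.cache()
start_time = time.time()
count2 = cached_transformation.count()  # Computes and caches
result2 = cached_transformation.collect()  # Uses cache
cache_time = time.time() - start_time
print(f"⏱️ Two actions WITH caching: {cache_time:.6f} seconds")
print(f"🚀 Ratio (no cache / cache): {no_cache_time/cache_time:.1f}x")
cached_transformation.unpersist()  # Release the cached data once it is no longer needed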

# ============================================
# COMMON INTERVIEW SCENARIOS
# ============================================

print("\n\n🎯 COMMON INTERVIEW SCENARIOS")

print("\n--- Scenario 1: Chain multiple transformations ---")
# This is efficient - all transformations are lazy until action
chained_result = df.select("name", "department", "salary") \
                   .filter(col("salary") > 65000) \
                   .withColumn("bonus", col("salary") * 0.1) \
                   .groupBy("department") \
                   .agg(avg("bonus").alias("avg_bonus")) \
                   .orderBy(desc("avg_bonus"))

print("✅ Chained 6 transformations efficiently (lazy)")
chained_result.show()  # Single action executes entire chain

print("\n--- Scenario 2: Window functions with ranking ---")
window_by_dept = Window.partitionBy("department").orderBy(desc("salary"))
ranked_employees = df.withColumn("rank", row_number().over(window_by_dept)) \
                     .withColumn("dense_rank", dense_rank().over(window_by_dept))
print("✅ Window function transformations created (lazy)")
ranked_employees.show()  # Action triggers execution

print("\n--- Scenario 3: Complex joins with aggregations ---")
dept_project_stats = df.join(projects_df, df.id == projects_df.lead_id, "left") \
                        .groupBy("department") \
                        .agg(count("project_id").alias("project_count"),
                             countDistinct("project_id").alias("unique_projects"),
                             avg("salary").alias("avg_salary"))
print("✅ Join + aggregation transformation created (lazy)")
dept_project_stats.show()  # Action executes join and aggregation

# ============================================
# COMMON MISTAKES TO AVOID
# ============================================

print("\n\n❌ COMMON MISTAKES TO AVOID")

print("\n--- Mistake 1: Multiple actions without caching ---")
print("❌ BAD: df.filter(...).count() + df.filter(...).show() - recomputes filter twice")
print("✅ GOOD: cached = df.filter(...).cache(); cached.count(); cached.show()")

print("\n--- Mistake 2: Using collect() on large data ---")
print("❌ BAD: large_df.collect() - brings all data to driver (OOM risk)")
print("✅ GOOD: large_df.show(20) or large_df.take(100)")

print("\n--- Mistake 3: Not understanding lazy evaluation ---")
print("❌ BAD: Expecting transformations to execute immediately")
print("✅ GOOD: Chain transformations, then trigger with action")

print("\n--- Mistake 4: Inefficient joins ---")
print("❌ BAD: large_df.join(small_df) - can shuffle the large side when the small one is not broadcast")
print("✅ GOOD: large_df.join(broadcast(small_df)) - broadcast small dataset")

print("\n\n✅ COMPLETE TRANSFORMATIONS vs ACTIONS DEMO FINISHED!")
print("🎯 You now understand the full spectrum of PySpark lazy vs eager operations!")

=== Original DataFrame ===
+---+-----+-----------+------+----------+----------------+
| id| name| department|salary| hire_date|          skills|
+---+-----+-----------+------+----------+----------------+
|  1| John|Engineering| 75000|2020-01-15| [Python, Spark]|
|  2|Alice|  Marketing| 65000|2019-03-20|  [SQL, Tableau]|
|  3|  Bob|Engineering| 80000|2021-06-10|   [Java, Scala]|
|  4|Carol|      Sales| 70000|2020-11-05|[Excel, PowerBI]|
|  5|David|Engineering| 85000|2018-08-12|  [Python, Java]|
|  6|  Eve|  Marketing| 68000|2021-02-28|   [SQL, Python]|
|  7|Frank|      Sales| 72000|2019-12-15|    [Salesforce]|
+---+-----+-----------+------+----------+----------------+


🔄 ALL TRANSFORMATIONS (LAZY - NO EXECUTION)
--- Basic Transformations ---
✅ select(), filter(), where(), withColumn(), withColumnRenamed(), drop(), distinct() - ALL LAZY

--- Aggregation Transformations ---
✅ groupBy(), agg(), orderBy(), sort(), limit() - ALL LAZY

--- Join & Union Transformations ---
✅ join(), union(), 

---

## 🎯 Key Takeaways for Interviews

### Must Remember:

1. **Transformations are LAZY** - they build a computation graph
2. **Actions are EAGER** - they trigger execution and return results
3. **Common transformations**: `select()`, `filter()`, `withColumn()`, `groupBy()`, `join()`
4. **Common actions**: `show()`, `count()`, `collect()`, `take()`, `write.save()`
5. **Performance**: Cache DataFrames used multiple times
6. **Memory**: Avoid `collect()` on large DataFrames

### Interview Questions to Expect:

- "What's the difference between transformations and actions?"
- "Why does Spark use lazy evaluation?"
- "When would you use `cache()` or `persist()`?"
- "What's wrong with using `collect()` on a large DataFrame?"
- "How do you optimize a slow Spark job?"

### Performance Best Practices:

#### ✅ DO:
- Chain transformations before actions
- Cache DataFrames used multiple times
- Use `broadcast()` for small datasets in joins
- Use `show()` for displaying data
- Use `take(n)` for small samples

#### ❌ DON'T:
- Use `collect()` on large DataFrames
- Run multiple actions on the same expensive DataFrame without caching
- Ignore data skew in joins
- Use UDFs when built-in functions exist
- Forget to unpersist cached DataFrames
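
The caching advice above comes down to how often the underlying work is repeated. The plain-Python sketch below counts recomputations for two actions with and without a cache. `LazyResult` is a hypothetical stand-in for a DataFrame, not a PySpark class, and it only models the behaviour described in this guide: `cache()` is lazy, the first action fills the cache, and `unpersist()` releases it.

```python
class LazyResult:
    """Hypothetical stand-in for a DataFrame: recomputes on every action unless cached."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None
        self.runs = 0            # how many times the computation really ran

    def cache(self):
        self._use_cache = True   # lazy: only marks the result for caching
        return self

    def unpersist(self):
        self._use_cache = False
        self._cached = None      # free the stored rows
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached  # reuse the stored rows
        self.runs += 1
        rows = self._compute()
        if self._use_cache:
            self._cached = rows
        return rows

    def count(self):             # action
        return len(self._materialize())

    def collect(self):           # action
        return list(self._materialize())


salaries = [75000, 65000, 80000, 70000, 85000, 68000, 72000]


def high_salaries():
    return [s for s in salaries if s > 70000]


uncached = LazyResult(high_salaries)
uncached.count()
uncached.collect()
runs_without_cache = uncached.runs   # the filter ran once per action

cached = LazyResult(high_salaries).cache()
row_count = cached.count()           # first action computes and fills the cache
cached.collect()                     # second action reuses the cached rows
runs_with_cache = cached.runs
cached.unpersist()                   # release the memory when done

print(runs_without_cache, runs_with_cache)  # 2 1
```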

---

## 🚀 Final Interview Tips

### Core Concepts to Articulate:

1. **Lazy Evaluation Benefits:**
   - Query optimization through Catalyst optimizer
   - Fault tolerance through lineage tracking
   - Resource efficiency - only compute what's needed
   - Avoiding unnecessary intermediate results

2. **When to Cache:**
   - DataFrame is used multiple times
   - Expensive computations (joins, aggregations)
   - Iterative algorithms (machine learning)
   - Long lineage chains that are reused (note that `cache()` keeps the lineage; `checkpoint()` is what truncates it)

3. **Memory Management:**
   - Understanding driver vs executor memory
   - Avoiding driver OOM with `collect()`
   - Proper partitioning strategies
   - Using appropriate storage levels

4. **Optimization Strategies:**
   - Predicate pushdown
   - Projection pruning
   - Broadcast joins for small datasets
   - Bucketing for frequent joins
   - Proper data serialization
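
Of the strategies above, the broadcast join is the easiest to picture: the small table is copied to every worker and turned into a hash table, and each row of the large table is then matched locally, so the large side does not need to be shuffled. The plain-Python sketch below shows that build-and-probe idea on one machine. The employee and project rows are a cut-down version of the sample data, and the variable names are hypothetical. It illustrates the idea only, not how Spark implements it.

```python
# Plain-Python sketch of the hash-join idea behind a broadcast join; not PySpark code.
employees = [(1, "John"), (2, "Alice"), (3, "Bob"), (5, "David")]   # "large" side
projects = [                                                         # "small" side
    (101, "Project Alpha", 1),
    (102, "Project Beta", 3),
    (103, "Project Gamma", 5),
]

# Build phase: hash the small side by its join key (lead_id).
projects_by_lead = {}
for project_id, project_name, lead_id in projects:
    projects_by_lead.setdefault(lead_id, []).append(project_name)

# Probe phase: stream the large side and look each key up locally (inner join).
joined = [
    (name, project_name)
    for emp_id, name in employees
    for project_name in projects_by_lead.get(emp_id, [])
]

print(joined)
# [('John', 'Project Alpha'), ('Bob', 'Project Beta'), ('David', 'Project Gamma')]
```

The approach only pays off when the small side comfortably fits in memory on every worker, which is why `broadcast()` is recommended for small lookup tables and not for two large inputs.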

This comprehensive guide covers all essential transformations and actions you'll need for PySpark technical interviews! 🎯