# **DSC 232R: Week 3 Discussion**
**Topic:** Apache Spark DataFrames

This notebook covers the essentials of setting up a Spark environment in Colab, creating DataFrames, and performing basic to advanced transformations.

**Key Concepts:**
* SparkSession Entry Point
* Schema Definition
* Transformations vs. Actions
* Immutability
* UDFs (User Defined Functions)

## 1. Environment Setup & Installation
Since Google Colab does not have Spark installed by default, we need to install the Java Development Kit (JDK) and download Spark manually.

In [1]:
# Update apt-get repositories
!sudo apt update

# Install Java (OpenJDK 8) - Spark runs on the JVM
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Apache Spark 3.2.1 with Hadoop 3.2
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

# Unzip the downloaded file
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

# Install findspark (helps locate Spark in the system)
!pip install -q findspark

# Install PySpark library
!pip install pyspark

# Install Py4J (enables Python to dynamically access Java objects)
!pip install py4j

import os
import sys

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.83)] [1 InRelease 0 B/129 kB [0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 https://cli.github.com/packages stable InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [83.8 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:11 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packa

## 2. Initialize SparkSession
The `SparkSession` is the entry point to programming Spark with the Dataset and DataFrame API.

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create a SparkSession
# .master("local[*]") sets the master to local mode using all available cores (*)
spark = SparkSession.builder \
    .appName("DataFrame Tutorial") \
    .master("local[*]") \
    .getOrCreate()

# 3. DataFrames
Apache Spark DataFrames are distributed collections of data organized into named columns.



They are conceptually equivalent to tables in a relational database or DataFrames in Python's pandas library, but with the added benefit of being **distributed** across a cluster for big data processing.

### Creating DataFrames

#### Method 1: Inferring Schema from a List
Spark can automatically infer the data types based on the data provided.

In [4]:
# Create DataFrame from a list of tuples
data = [("John", 25, "New York"),
        ("Jane", 30, "San Francisco"),
        ("Mike", 35, "Chicago")]

# Define column names
columns = ["name", "age", "city"]

# Create the DataFrame (Spark infers types here)
df = spark.createDataFrame(data, columns)

# Show the data (Action)
df.show()

+----+---+-------------+
|name|age|         city|
+----+---+-------------+
|John| 25|     New York|
|Jane| 30|San Francisco|
|Mike| 35|      Chicago|
+----+---+-------------+



#### Method 2: Defining a Strict Schema
For production pipelines, it is best practice to define a schema explicitly to ensure data integrity and improve performance.

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Define schema using StructType and StructField
# nullable=False means the field cannot contain Null values
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=False),
    StructField("city", StringType(), nullable=True)
])

# Create DataFrame enforcing the defined schema
df = spark.createDataFrame(data, schema)
df.show()

+----+---+-------------+
|name|age|         city|
+----+---+-------------+
|John| 25|     New York|
|Jane| 30|San Francisco|
|Mike| 35|      Chicago|
+----+---+-------------+



#### Method 3: Reading from External Sources
Spark supports reading from various formats like CSV, JSON, Parquet, and JDBC.

```python
# CSV (Setting inferSchema=True forces Spark to read the file once to guess types)
df_csv = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

# JSON
df_json = spark.read.json("path/to/file.json")

# Parquet (Efficient, columnar storage format - highly recommended for Spark)
df_parquet = spark.read.parquet("path/to/file.parquet")

# Database (JDBC)
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/database") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
```



## 4. Basic DataFrame Operations
Explaining **Actions** vs **Transformations**:
* **Transformations** (e.g., `select`, `filter`) are *lazy*. They build a logical plan but don't execute immediately.
* **Actions** (e.g., `show`, `count`) trigger the actual computation.

### Viewing Data

In [6]:
# Display the first 20 rows by default (Action)
df.show()

# Display the first n rows
df.show(5)

# Show the schema (tree structure of columns and types)
df.printSchema()

# Get basic statistics (count, mean, stddev, min, max) - similar to pandas df.describe()
df.describe().show()

+----+---+-------------+
|name|age|         city|
+----+---+-------------+
|John| 25|     New York|
|Jane| 30|San Francisco|
|Mike| 35|      Chicago|
+----+---+-------------+

+----+---+-------------+
|name|age|         city|
+----+---+-------------+
|John| 25|     New York|
|Jane| 30|San Francisco|
|Mike| 35|      Chicago|
+----+---+-------------+

root
 |-- name: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- city: string (nullable = true)

+-------+----+----+-------------+
|summary|name| age|         city|
+-------+----+----+-------------+
|  count|   3|   3|            3|
|   mean|NULL|30.0|         NULL|
| stddev|NULL| 5.0|         NULL|
|    min|Jane|  25|      Chicago|
|    max|Mike|  35|San Francisco|
+-------+----+----+-------------+



### Selecting Columns
You can select columns using string names or the `col()` function (useful for expressions).

In [7]:
from pyspark.sql import functions as F

# Select a single column by name
df.select("name").show()

# Select multiple columns
df.select("name", "age").show()

# Select with expressions using F.col()
# This allows arithmetic or other operations during selection
df.select(F.col("name"), F.col("age") + 1).show()

+----+
|name|
+----+
|John|
|Jane|
|Mike|
+----+

+----+---+
|name|age|
+----+---+
|John| 25|
|Jane| 30|
|Mike| 35|
+----+---+

+----+---------+
|name|(age + 1)|
+----+---------+
|John|       26|
|Jane|       31|
|Mike|       36|
+----+---------+



### Filtering Data
Filtering rows based on conditions (equivalent to SQL `WHERE`).

In [8]:
# Filter by condition using column object
df.filter(df.age > 25).show()

# Multiple conditions using bitwise operators (& for AND, | for OR)
df.filter((df.age > 25) & (df.city == "Chicago")).show()

# Using SQL expression string directly
df.filter("age > 25 AND city = 'Chicago'").show()

+----+---+-------------+
|name|age|         city|
+----+---+-------------+
|Jane| 30|San Francisco|
|Mike| 35|      Chicago|
+----+---+-------------+

+----+---+-------+
|name|age|   city|
+----+---+-------+
|Mike| 35|Chicago|
+----+---+-------+

+----+---+-------+
|name|age|   city|
+----+---+-------+
|Mike| 35|Chicago|
+----+---+-------+



### Adding and Modifying Columns
**Note:** DataFrames are **immutable**. `withColumn` returns a *new* DataFrame with the requested change; it does not modify the original `df` in place.

In [9]:
# Add a new column based on an existing one
df = df.withColumn("age_plus_ten", df.age + 10)
df.show()

# Rename an existing column
df = df.withColumnRenamed("age", "years_old")
df.show()

# Drop a column
df = df.drop("age_plus_ten")
df.show()

+----+---+-------------+------------+
|name|age|         city|age_plus_ten|
+----+---+-------------+------------+
|John| 25|     New York|          35|
|Jane| 30|San Francisco|          40|
|Mike| 35|      Chicago|          45|
+----+---+-------------+------------+

+----+---------+-------------+------------+
|name|years_old|         city|age_plus_ten|
+----+---------+-------------+------------+
|John|       25|     New York|          35|
|Jane|       30|San Francisco|          40|
|Mike|       35|      Chicago|          45|
+----+---------+-------------+------------+

+----+---------+-------------+
|name|years_old|         city|
+----+---------+-------------+
|John|       25|     New York|
|Jane|       30|San Francisco|
|Mike|       35|      Chicago|
+----+---------+-------------+



## 5. Advanced DataFrame Operations

### Grouping and Aggregation
Spark provides the `groupBy` transformation which returns a `GroupedData` object, on which you can perform aggregations.

In [10]:
# Group by 'city' and count the number of rows per city
df.groupBy("city").count().show()

+-------------+-----+
|         city|count|
+-------------+-----+
|     New York|    1|
|San Francisco|    1|
|      Chicago|    1|
+-------------+-----+



In [11]:
# Perform multiple aggregations at once using the .agg() method
df.groupBy("city").agg(
    F.count("*").alias("count"),          # Count rows
    F.avg("years_old").alias("avg_age"),  # Calculate average age
    F.min("years_old").alias("min_age"),  # Find minimum age
    F.max("years_old").alias("max_age")   # Find maximum age
).show()

+-------------+-----+-------+-------+-------+
|         city|count|avg_age|min_age|max_age|
+-------------+-----+-------+-------+-------+
|     New York|    1|   25.0|     25|     25|
|San Francisco|    1|   30.0|     30|     30|
|      Chicago|    1|   35.0|     35|     35|
+-------------+-----+-------+-------+-------+



### Joining DataFrames
Combining two DataFrames based on a common column.

In [12]:
# Create 'employee' DataFrame
employee_data = [
    (1, "John", "Engineering"),
    (2, "Jane", "Marketing"),
    (3, "Mike", "Sales")
]
employee_df = spark.createDataFrame(employee_data, ["id", "name", "department"])

# Create 'salary' DataFrame
salary_data = [
    (1, 70000),
    (2, 80000),
    (3, 65000)
]
salary_df = spark.createDataFrame(salary_data, ["id", "salary"])

# Inner Join: Returns records that have matching values in both tables
employee_df.join(salary_df, "id").show()

# Left Join: Returns all records from the left table, and the matched records from the right table
employee_df.join(salary_df, "id", "left").show()

# Right Join: Returns all records from the right table, and the matched records from the left table
employee_df.join(salary_df, "id", "right").show()

# Full Outer Join: Returns all records when there is a match in either left or right table
employee_df.join(salary_df, "id", "outer").show()

+---+----+-----------+------+
| id|name| department|salary|
+---+----+-----------+------+
|  1|John|Engineering| 70000|
|  2|Jane|  Marketing| 80000|
|  3|Mike|      Sales| 65000|
+---+----+-----------+------+

+---+----+-----------+------+
| id|name| department|salary|
+---+----+-----------+------+
|  1|John|Engineering| 70000|
|  3|Mike|      Sales| 65000|
|  2|Jane|  Marketing| 80000|
+---+----+-----------+------+

+---+----+-----------+------+
| id|name| department|salary|
+---+----+-----------+------+
|  1|John|Engineering| 70000|
|  3|Mike|      Sales| 65000|
|  2|Jane|  Marketing| 80000|
+---+----+-----------+------+

+---+----+-----------+------+
| id|name| department|salary|
+---+----+-----------+------+
|  1|John|Engineering| 70000|
|  2|Jane|  Marketing| 80000|
|  3|Mike|      Sales| 65000|
+---+----+-----------+------+



### Handling Missing Values
Spark provides the `DataFrameNaFunctions` submodule (accessed via `df.na`) to handle `null` or `NaN` values.

In [13]:
# Create DataFrame with explicit None (null) values
data_with_nulls = [
    ("John", 25, "New York"),
    ("Jane", None, "San Francisco"),
    ("Mike", 35, None),
    (None, 40, "Boston")
]
df_nulls = spark.createDataFrame(data_with_nulls, ["name", "age", "city"])

# drop(): Removes rows that contain null values
df_nulls.na.drop().show()

# drop(subset=...): Removes rows where specific columns are null
df_nulls.na.drop(subset=["name"]).show()

# fill(): Replaces null values with specified values
df_nulls.na.fill({"age": 0, "name": "Unknown", "city": "Unknown"}).show()

+----+---+--------+
|name|age|    city|
+----+---+--------+
|John| 25|New York|
+----+---+--------+

+----+----+-------------+
|name| age|         city|
+----+----+-------------+
|John|  25|     New York|
|Jane|NULL|San Francisco|
|Mike|  35|         NULL|
+----+----+-------------+

+-------+---+-------------+
|   name|age|         city|
+-------+---+-------------+
|   John| 25|     New York|
|   Jane|  0|San Francisco|
|   Mike| 35|      Unknown|
|Unknown| 40|       Boston|
+-------+---+-------------+



### User Defined Functions (UDFs)
UDFs allow you to extend Spark's functionality by registering standard Python functions.

*Note: UDFs can be slower than native Spark functions because they require serializing data between the JVM and Python. Use native functions (like `F.when`, `F.col`) whenever possible.*

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df.show()

# 1. Define a standard Python function
def age_category(age):
    if age is None:
        return "Unknown"
    elif age < 30:
        return "Young"
    else:
        return "Senior"

# 2. Register the function as a Spark UDF, specifying the return type
age_category_udf = udf(age_category, StringType())

# 3. Apply the UDF to a column
df.withColumn("age_category", age_category_udf(df.years_old)).show()

+----+---------+-------------+
|name|years_old|         city|
+----+---------+-------------+
|John|       25|     New York|
|Jane|       30|San Francisco|
|Mike|       35|      Chicago|
+----+---------+-------------+

+----+---------+-------------+------------+
|name|years_old|         city|age_category|
+----+---------+-------------+------------+
|John|       25|     New York|       Young|
|Jane|       30|San Francisco|      Senior|
|Mike|       35|      Chicago|      Senior|
+----+---------+-------------+------------+



## 6. Exporting Data
Writing DataFrames to storage.

```python
# Save as CSV
df.write.csv("path/to/output/csv", header=True)

# Save as JSON
df.write.json("path/to/output/json")

# Save as Parquet (Preserves schema and is compressed)
df.write.parquet("path/to/output/parquet")

# Save to a database via JDBC
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/database") \
    .option("dbtable", "output_table") \
    .option("user", "username") \
    .option("password", "password") \
    .mode("overwrite") \
    .save()
```



## Resources For DataFrames
1.  [Spark By Examples - PySpark Tutorial](https://sparkbyexamples.com/pyspark-tutorial/)
2.  [Official PySpark Documentation](https://spark.apache.org/docs/latest/api/python/index.html)