# Spark Handbook
## Apache Spark: A Comprehensive Guide for Data Engineers

This handbook provides a comprehensive overview of [Apache Spark](#apache-spark), a powerful [distributed data processing framework](#distributed-data-processing-framework) designed for handling [big data](#big-data) workloads with speed, ease of use, and flexibility.

## Table of Contents
- [1. What is Apache Spark?](#1-what-is-apache-spark)
- [2. How Apache Spark Works](#2-how-apache-spark-works)
  - [2.1. Key Concepts in Spark’s Operation](#21-key-concepts-in-sparks-operation)
  - [2.2. Spark’s Execution Workflow](#22-sparks-execution-workflow)
  - [2.3. Performance Optimizations in Spark](#23-performance-optimizations-in-spark)
  - [2.4. Spark Workloads and Ecosystem Components](#24-spark-workloads-and-ecosystem-components)
- [3. Apache Spark Architecture](#3-apache-spark-architecture)
  - [3.1. Driver Program (Master Node)](#31-driver-program-master-node)
  - [3.2. Cluster Manager](#32-cluster-manager)
  - [3.3. Executors (Worker Nodes)](#33-executors-worker-nodes)
  - [3.4. Resilient Distributed Dataset (RDD) - The Core Abstraction](#34-resilient-distributed-dataset-rdd-the-core-abstraction)
  - [3.5. Directed Acyclic Graph (DAG)](#35-directed-acyclic-graph-dag)
  - [3.6. Execution Modes in Spark](#36-execution-modes-in-spark)
    - [3.6.1. Cluster Mode](#361-cluster-mode)
    - [3.6.2. Client Mode](#362-client-mode)
    - [3.6.3. Local Mode](#363-local-mode)
- [4. Main Abstractions of Apache Spark](#4-main-abstractions-of-apache-spark)
  - [4.1. Resilient Distributed Datasets (RDDs)](#41-resilient-distributed-datasets-rdds)
  - [4.2. Directed Acyclic Graph (DAG)](#42-directed-acyclic-graph-dag)
- [5. Spark Core Components and Libraries](#5-spark-core-components-and-libraries)
  - [5.1. Spark SQL](#51-spark-sql)
  - [5.2. MLlib](#52-mllib)
  - [5.3. GraphX](#53-graphx)
- [6. Why Do Data Engineers Need Spark?](#6-why-do-data-engineers-need-spark)
- [7. Typical Use Cases](#7-typical-use-cases)
- [8. RDDs and DataFrames in Apache Spark](#8-rdds-and-dataframes-in-apache-spark)
  - [8.1. Introduction](#81-introduction)
  - [8.2. RDD: Resilient Distributed Dataset](#82-rdd-resilient-distributed-dataset)
    - [8.2.1. What is an RDD?](#821-what-is-an-rdd)
    - [8.2.2. Key Features](#822-key-features)
    - [8.2.3. Creating or Loading Data into an RDD](#823-creating-or-loading-data-into-an-rdd)
    - [8.2.4. RDD Transformation and Actions](#824-rdd-transformation-and-actions)
  - [8.3. DataFrames](#83-dataframes)
    - [8.3.1. What is a DataFrame?](#831-what-is-a-dataframe)
    - [8.3.2. Key Features](#832-key-features)
    - [8.3.3. Creating or Loading Data into a DataFrame](#833-creating-or-loading-data-into-a-dataframe)
    - [8.3.4. Common DataFrame Operations](#834-common-dataframe-operations)
  - [8.4. Conversion Between RDD and DataFrame](#84-conversion-between-rdd-and-dataframe)
  - [8.5. RDD vs. DataFrame - Comparison](#85-rdd-vs-dataframe-comparison)
  - [8.6. Use Case Summary](#86-use-case-summary)
  - [8.7. Conclusion](#87-conclusion)
- [9. Apache Spark Local Setup](#9-apache-spark-local-setup)
  - [9.1. Installing Spark Locally (Native Installation)](#91-installing-spark-locally-native-installation)
    - [9.1.1. Prerequisites](#911-prerequisites)
    - [9.1.2. Download and Install Spark](#912-download-and-install-spark)
    - [9.1.3. Set Environment Variables](#913-set-environment-variables)
    - [9.1.4. Install Required Python Libraries](#914-install-required-python-libraries)
    - [9.1.5. Test Your Installation](#915-test-your-installation)
  - [9.2. Using Docker to Set Up Spark](#92-using-docker-to-set-up-spark)
    - [9.2.1. Prerequisites](#921-prerequisites)
    - [9.2.2. Standalone Setup](#922-standalone-setup)
      - [9.2.2.1. Pull a Spark Docker Image](#9221-pull-a-spark-docker-image)
      - [9.2.2.2. Run a Spark Container](#9222-run-a-spark-container)
    - [9.2.3. Set Up Spark Cluster](#923-set-up-spark-cluster)
      - [9.2.3.1. Start the Cluster](#9231-start-the-cluster)
      - [9.2.3.2. Access the Spark Web UI](#9232-access-the-spark-web-ui)
      - [9.2.3.3. Submit Jobs](#9233-submit-jobs)
      - [9.2.3.4. Setting Up Jupyter Notebook Container for Spark (Optional)](#9234-setting-up-jupyter-notebook-container-for-spark-optional)
      - [9.2.3.5. Test Notebook Code](#9235-test-notebook-code)
- [10. Extracting and Transforming Data with Apache Spark](#10-extracting-and-transforming-data-with-apache-spark)
  - [10.1. Extracting Data with Spark](#101-extracting-data-with-spark)
    - [10.1.1. Overview](#1011-overview)
    - [10.1.2. Supported Data Sources](#1012-supported-data-sources)
    - [10.1.3. Reading Data](#1013-reading-data)
    - [10.1.4. Key Options](#1014-key-options)
    - [10.1.5. Scenario: Extracting Data from JSON, CSV, and Parquet](#1015-scenario-extracting-data-from-json-csv-and-parquet)
  - [10.2. Transforming Data with Spark](#102-transforming-data-with-spark)
    - [10.2.1. Overview](#1021-overview)
    - [10.2.2. Common Transformation Operations](#1022-common-transformation-operations)
    - [10.2.3. Scenario: Joining and Aggregating Sales Data](#1023-scenario-joining-and-aggregating-sales-data)
  - [10.3. Best Practices](#103-best-practices)


## 1. What is Apache Spark?

[Apache Spark](#apache-spark) is an open-source, [distributed analytics engine](#distributed-analytics-engine) designed for large-scale data processing and [machine learning](#machine-learning). It is renowned for its speed, versatility, and ability to scale from a single machine to large clusters of computers. Spark offers APIs in several popular languages, including Python (using PySpark), Scala, Java, and R, making it accessible to a wide audience of data professionals.


## 2. How Apache Spark Works


![How Spark Works](img/how_spark_works.png)

[Apache Spark](#apache-spark) is a [distributed data processing framework](#distributed-data-processing-framework) designed to handle [big data](#big-data) workloads with speed, ease of use, and flexibility. The fundamental principle behind Spark's operation is its [master-slave architecture](#master-slave-architecture), which allows it to execute tasks in parallel across a cluster of machines.

### 2.1. Key Concepts in Spark’s Operation

#### 2.1.1. Driver Program
The [driver](#driver) is the central coordinator and controller of a Spark application. When you start a Spark application, the driver runs your main program. It is responsible for:
- Creating a [SparkContext](#sparkcontext), the entry point to Spark's core functionality (since Spark 2.0 it is usually obtained through a `SparkSession`).
- Converting the user’s code (written in Scala, Python, Java, or R) into a [logical execution plan](#logical-execution-plan).
- Breaking down the application into smaller pieces called [jobs](#jobs) and subsequently into [stages](#stages) and [tasks](#tasks).
- Scheduling tasks on [executors](#executors) and managing their lifecycle.
- Handling [fault tolerance](#fault-tolerance) by retrying failed tasks and reallocating resources.

#### 2.1.2. SparkContext
[SparkContext](#sparkcontext) represents the connection to the computing cluster. It acts as the interface between your Spark application and the [cluster manager](#cluster-manager), letting your program create [distributed collections](#distributed-collections) ([RDDs](#rdds) — Resilient Distributed Datasets), [accumulators](#accumulators), and [broadcast variables](#broadcast-variables).

#### 2.1.3. Executors
[Executors](#executors) are worker processes that run on cluster nodes. Each executor:
- Receives [tasks](#tasks) from the [driver](#driver).
- Executes those tasks concurrently.
- Stores intermediate data and results either in memory or on disk.
- Returns results and task status (success or failure) back to the driver.

An executor's lifetime is tied to the lifecycle of the Spark application that launched it.

#### 2.1.4. Cluster Manager
The [cluster manager](#cluster-manager) is a separate system responsible for managing cluster resources and allocating them to various applications. Spark can operate with several cluster managers:
- [Standalone cluster manager](#standalone-cluster-manager) (provided by Spark itself for simple setups).
- [Apache Mesos](#apache-mesos) (a general-purpose cluster manager; Spark's Mesos support is deprecated as of Spark 3.2).
- [Hadoop YARN](#hadoop-yarn) (resource manager used with Hadoop clusters).
- [Kubernetes](#kubernetes) (for container orchestration).

Depending on the execution mode, the cluster manager launches the [executors](#executors) and, in cluster mode, also the Spark [driver](#driver) on cluster nodes.

### 2.2. Spark’s Execution Workflow
1. When an application starts, the [driver program](#driver) is launched, which creates the [SparkContext](#sparkcontext).
2. The driver creates a [Directed Acyclic Graph (DAG)](#directed-acyclic-graph-dag) representing the computation flow based on user operations ([transformations](#transformations) and [actions](#actions)).
3. The [DAG Scheduler](#dag-scheduler) breaks this DAG into [stages](#stages), grouping tasks based on [shuffle boundaries](#shuffle-boundaries) and data dependencies.
4. The [Task Scheduler](#task-scheduler) then schedules individual [tasks](#tasks) within the stages for execution on the [executors](#executors).
5. Tasks are assigned to executors running on worker nodes.
6. Executors perform computations, cache data as needed (to speed up repeated data processing), and report results and status back to the driver.
7. The driver aggregates results and completes the [job](#jobs).

### 2.3. Performance Optimizations in Spark
- [In-Memory Computation](#in-memory-computation): Spark loads data into memory and performs computations there, minimizing slower disk I/O operations.
- [Data Caching](#data-caching): Frequently used datasets can be cached in memory across iterations to enhance performance, particularly useful for [machine learning](#machine-learning) and iterative algorithms.
- [Stage Pipelining](#stage-pipelining): Multiple operations can be pipelined within a stage if they do not require a shuffle, avoiding unnecessary disk writes.
- [Fault Tolerance](#fault-tolerance): Spark maintains [lineage information](#lineage-information) of [RDDs](#rdds), so it knows how to recompute lost data partitions in case of executor failures.

### 2.4. Spark Workloads and Ecosystem Components
Spark is not just a batch processing engine but a full ecosystem for diverse workloads:
- [Spark Core](#spark-core): Handles basic operations like [job scheduling](#job-scheduling), memory management, [fault recovery](#fault-recovery), and task dispatching.
- [Spark SQL](#spark-sql): Lets you query structured data interactively with SQL or Hive Query Language (HQL), running queries through the same optimized engine as the DataFrame API.
- [Spark Streaming](#spark-streaming): Enables near-real-time processing of live data sources such as [Kafka](#kafka) by splitting the stream into micro-batches.
- [MLlib](#mllib): Spark’s machine learning library providing scalable algorithms including classification, regression, clustering, and collaborative filtering.
- [GraphX](#graphx): Framework for graph processing and computation across [distributed datasets](#distributed-datasets).


## 3. Apache Spark Architecture

[Apache Spark](#apache-spark)’s architecture comprises several key components and follows a modular, layered design optimized for [distributed processing](#distributed-processing).

![Spark Architecture](img/spark_architecture.png)

### 3.1. Driver Program (Master Node)
- Runs your application containing the user’s code.
- Manages [SparkContext](#sparkcontext) and coordinates the execution of [tasks](#tasks).
- Converts user operations into a [DAG](#directed-acyclic-graph-dag).
- Interacts with the [cluster manager](#cluster-manager) for resource allocation.
- Oversees [job scheduling](#job-scheduling) via the [DAG Scheduler](#dag-scheduler) and [Task Scheduler](#task-scheduler).
- Maintains cluster state and tracks job progress and [fault handling](#fault-handling).

The [driver](#driver) is critical because it manages job orchestration and monitors system health and task execution.

### 3.2. Cluster Manager
- A standalone service or integration with other cluster management tools.
- Manages resources across the cluster.
- Launches the [driver](#driver) and [executor](#executors) processes as per the requested resources.
- Monitors node health and manages failures within the cluster.

Supported cluster managers include:
- [Spark Standalone](#standalone-cluster-manager) (simple and easy setup).
- [Apache Mesos](#apache-mesos) (multi-framework support).
- [Hadoop YARN](#hadoop-yarn) (common in Hadoop ecosystems).
- [Kubernetes](#kubernetes) (for containerized Spark deployments).

### 3.3. Executors (Worker Nodes)
- Executor processes run on each worker node in the cluster.
- Perform actual data processing by executing [tasks](#tasks) assigned by the [driver](#driver).
- Cache data in memory or on disk for efficient reuse.
- Handle communication with the driver, sending back task execution results.
- Their number can be configured based on workload and cluster size.

### 3.4. Resilient Distributed Dataset (RDD) - The Core Abstraction
- [RDDs](#rdds) are immutable distributed collections of objects partitioned across the cluster.
- They provide [fault tolerance](#fault-tolerance) by recording [lineage information](#lineage-information), enabling automatic recomputation of lost partitions.
- Users can perform [transformations](#transformations) (lazily evaluated) and [actions](#actions) on RDDs.
- RDD abstractions facilitate parallel computations without explicit data movement handling.

### 3.5. Directed Acyclic Graph (DAG)
- A [DAG](#directed-acyclic-graph-dag) is a directed graph with no cycles; in Spark it represents the dependencies between [transformations](#transformations).
- These dependencies determine how a computation is divided into [stages](#stages) and [tasks](#tasks).
- The [DAG Scheduler](#dag-scheduler) converts the program's DAG into stages for execution optimization.
- Enables [pipeline execution](#pipeline-execution) within stages and minimizes overhead of disk I/O.
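
To make the idea of stage boundaries concrete, here is a toy model in plain Python. It is not Spark's scheduler: it assumes a linear chain of operations and a hypothetical, simplified set of shuffle-inducing operation names (`SHUFFLE_OPS`), and starts a new stage at each of them.

```python
# Toy model only: a simplified, hypothetical set of operations that need a shuffle.
SHUFFLE_OPS = {"reduceByKey", "groupByKey", "join", "sortByKey", "repartition", "distinct"}


def split_into_stages(operations):
    """Group a linear chain of operation names into stages.

    A new stage starts at every operation that needs a shuffle; the
    operations in between are pipelined within one stage.
    """
    stages = []
    current = []
    for op in operations:
        if op in SHUFFLE_OPS and current:
            stages.append(current)
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


pipeline = ["textFile", "flatMap", "map", "reduceByKey", "filter", "sortByKey", "map"]
stages = split_into_stages(pipeline)
for number, stage in enumerate(stages):
    print(f"Stage {number}: {' -> '.join(stage)}")
```

Real DAGs can branch and merge, and whether an operation shuffles depends on how the data is already partitioned, so treat this only as an illustration of why narrow operations end up in the same stage.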

### 3.6. Execution Modes in Spark
Spark supports three main execution modes, which determine where the [driver](#driver) and [executors](#executors) run:

#### 3.6.1. Cluster Mode
- [Driver](#driver) runs inside the cluster on one of the worker nodes.
- [Cluster manager](#cluster-manager) manages driver and all executor processes.
- Suitable for production deployments.

#### 3.6.2. Client Mode
- [Driver](#driver) runs on the client machine from which the job was submitted.
- [Executors](#executors) run on the cluster nodes.
- Useful for interactive debugging or testing.

#### 3.6.3. Local Mode
- Entire Spark application executes on a single machine.
- Parallelism is achieved using multiple threads.
- Mostly used for development, experimentation, and debugging.
- Not recommended for production jobs.


## 4. Main Abstractions of Apache Spark

### 4.1. Resilient Distributed Datasets (RDDs)

[RDDs](#rdds) are the fundamental data structure in Spark. They represent immutable, [distributed collections](#distributed-collections) of objects partitioned across the cluster. RDDs support two types of operations:

- **[Transformations](#transformations):** Lazy operations that define a new RDD from an existing one (e.g., map, filter).
- **[Actions](#actions):** Operations that trigger computation and return results (e.g., collect, count).

RDDs enable [fault tolerance](#fault-tolerance) by tracking [lineage information](#lineage-information), allowing the system to recompute lost data partitions in case of node failure.
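
The lineage idea can be illustrated with a toy class in plain Python. `ToyRDD` is a hypothetical stand-in, not Spark's implementation: each dataset remembers its parent and the function that derives it, so a lost partition can be rebuilt on demand.

```python
class ToyRDD:
    """Toy stand-in for an RDD: remembers its parent and how it is derived from it."""

    def __init__(self, partitions=None, parent=None, fn=None):
        self.source = partitions   # only set for the root dataset
        self.parent = parent       # lineage: where this dataset came from
        self.fn = fn               # lineage: how each element is derived
        self.cache = {}            # partition index -> computed elements

    def map(self, fn):
        # Transformation: returns a new ToyRDD and computes nothing.
        return ToyRDD(parent=self, fn=fn)

    def num_partitions(self):
        return len(self.source) if self.parent is None else self.parent.num_partitions()

    def compute(self, index):
        """Return one partition, rebuilding it from the lineage if it is missing."""
        if self.parent is None:
            return list(self.source[index])
        if index not in self.cache:
            self.cache[index] = [self.fn(x) for x in self.parent.compute(index)]
        return self.cache[index]

    def collect(self):
        # Action: materializes every partition.
        return [x for i in range(self.num_partitions()) for x in self.compute(i)]


base = ToyRDD(partitions=[[1, 2], [3, 4]])
doubled = base.map(lambda x: x * 2)
before = doubled.collect()

del doubled.cache[1]         # simulate losing a partition on a failed node
after = doubled.collect()    # partition 1 is recomputed from its parent
print(before, after)
```

Only the missing partition is recomputed; the surviving one is served from the cache, which mirrors why lineage-based recovery is cheaper than replicating all data.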

### 4.2. Directed Acyclic Graph (DAG)

[Apache Spark](#apache-spark) uses a [DAG](#directed-acyclic-graph-dag) to represent the sequence of [transformations](#transformations) applied to [RDDs](#rdds). When a [job](#jobs) is submitted, Spark’s [DAG Scheduler](#dag-scheduler) breaks the computation into [stages](#stages) of [tasks](#tasks) that can be executed in parallel. This [DAG](#directed-acyclic-graph-dag)-based execution plan enables optimization and efficient [job scheduling](#job-scheduling).


## 5. Spark Core Components and Libraries

### 5.1. Spark SQL

[Spark SQL](#spark-sql) is Spark’s module for working with structured data. It provides:
- Querying with standard SQL.
- Querying with Hive Query Language (HQL).
- Support for numerous data sources, including Hive tables, Parquet, and JSON.

[Spark SQL](#spark-sql) integrates SQL queries with Spark’s programmatic APIs ([RDDs](#rdds), [DataFrames](#dataframes)) in Python, Scala, and Java. This tight integration supports complex analytics and interactive querying within a unified application framework.

### 5.2. MLlib

[MLlib](#mllib) is Spark’s scalable [machine learning](#machine-learning) library. It provides:
- Algorithms for classification, regression, clustering, and collaborative filtering.
- Utilities for model evaluation and data import.
- Low-level primitives such as a generic gradient descent optimization algorithm.

### 5.3. GraphX

GraphX is Spark’s graph processing library, enabling:
- Creation and manipulation of graphs with properties on vertices and edges.
- Graph-parallel computations like PageRank and triangle counting.
- Operators such as subgraph extraction and vertex mapping.


## 6. Why Do Data Engineers Need Spark?

### 6.1. Speed and Performance
- Spark performs in-memory computing, reducing costly disk read/write operations.
- It can be up to 100× faster than Hadoop MapReduce for iterative and interactive workloads.

### 6.2. Scalability
- Spark scales from a single machine to thousands of cluster nodes.
- Handles petabyte-scale data through distributed processing.

### 6.3. Unified Processing Engine
- Supports batch processing, real-time streaming, SQL querying, machine learning, and graph analytics all within one platform.

### 6.4. Language Flexibility and Ease of Use
- Provides APIs in Python, Scala, Java, and R.
- Offers abstractions at several levels, from low-level RDDs to high-level DataFrames and Datasets, that simplify complex data transformations.

### 6.5. Ecosystem and Integration
- Integrates with Hadoop HDFS, Amazon S3, Apache Kafka, and other platforms.
- Supports multiple cluster managers for flexible deployment.

### 6.6. Essential for Modern Workloads
- Enables ETL pipelines, real-time dashboards, machine learning workflows, and large-scale interactive queries.




## 7. Typical Use Cases

[Apache Spark](#apache-spark) is widely used for:
- **ETL Processes**: Extracting, transforming, and loading data from various sources like CSV, JSON, and Parquet.
- **Real-Time Analytics**: Processing streaming data from sources like [Kafka](#kafka) for live dashboards.
- **Machine Learning**: Training models on large datasets using [MLlib](#mllib).
- **Graph Analytics**: Analyzing relationships in data with [GraphX](#graphx).

These use cases are exemplified in the fictional "Global Gadgets" dataset, where Spark integrates customer, product, and order data for business intelligence.


## 8. RDDs and DataFrames in Apache Spark

Dataset: `./data/customers.csv`

### 8.1. Introduction
Apache Spark has two core abstractions for working with distributed data:
- **[RDD (Resilient Distributed Dataset)](#rdds):** The original low-level distributed data structure.
- **[DataFrame](#dataframes):** A high-level abstraction built on top of RDDs, offering a tabular data structure similar to a database table or Pandas DataFrame.

### 8.2. RDD: Resilient Distributed Dataset

#### 8.2.1. What is an RDD?
An [RDD](#rdds) is an immutable [distributed collection](#distributed-collections) of objects that can be processed in parallel.

#### 8.2.2. Key Features
- [Fault-tolerant](#fault-tolerance)
- Lazy evaluation
- Supports [transformations](#transformations) (e.g., `map`, `filter`) and [actions](#actions) (e.g., `collect`, `count`)
- Type-safe (in Scala/Java)
- No built-in schema

#### 8.2.3. Creating or Loading Data into an RDD

##### Creating an RDD (PySpark):


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDDExample").getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3, 4, 5])

##### Loading Data into an RDD


In [None]:
# Load file (skip header)
rdd = sc.textFile("./data/customers.csv")
header = rdd.first()
rdd_data = rdd.filter(lambda line: line != header)

#### 8.2.4. RDD Transformation and Actions


In [None]:
# Split CSV into fields
customers_rdd = rdd_data.map(lambda line: line.split(","))

# Extract customer names
names = customers_rdd.map(lambda x: f"{x[1]} {x[2]}").collect()
print(names)

### 8.3. DataFrames

#### 8.3.1. What is a DataFrame?
A [DataFrame](#dataframes) is a higher-level abstraction built on [RDDs](#rdds), providing a structured API similar to pandas DataFrames, optimized for performance with [Spark SQL](#spark-sql).

#### 8.3.2. Key Features
- Schema-aware
- Optimized with Catalyst optimizer
- Supports SQL-like operations
- Integration with various data sources (CSV, JSON, Parquet)

#### 8.3.3. Creating or Loading Data into a DataFrame


In [None]:
# Load CSV into DataFrame
df = spark.read.option("header", "true").csv("./data/customers.csv")
df.show()

#### 8.3.4. Common DataFrame Operations


In [None]:
# Select specific columns
df.select("first_name", "email").show()

# Filter customers with missing join dates
df.filter(df.join_date.isNull()).show()

# Count customers with a recorded join date
df.filter(df.join_date.isNotNull()).count()

### 8.4. Conversion Between RDD and DataFrame

#### From RDD to DataFrame


In [None]:
from pyspark.sql import Row

# Convert RDD to Row RDD
row_rdd = customers_rdd.map(lambda x: Row(
    customer_id=int(x[0]),
    first_name=x[1],
    last_name=x[2],
    email=x[3],
    join_date=x[4] if x[4] != "" else None
))

df_from_rdd = spark.createDataFrame(row_rdd)
df_from_rdd.show()

#### From DataFrame to RDD


In [None]:
rdd_from_df = df.rdd
rdd_from_df.take(3)

### 8.5. RDD vs. DataFrame - Comparison

| Feature           | RDD                        | DataFrame               |
|-------------------|----------------------------|--------------------------|
| Abstraction Level | Low                        | High                    |
| API Style         | Functional                 | SQL-like                |
| Schema            | Not enforced               | Schema-aware            |
| Performance       | Lower                      | Optimized with Catalyst |
| Best for          | Custom, fine-grained logic | Queries, aggregations   |


### 8.6. Use Case Summary

| Task                                     | Recommended |
|------------------------------------------|-------------|
| Load structured CSV data                 | DataFrame   |
| Filter or select fields efficiently      | DataFrame   |
| Custom parsing, transformation, or logic | RDD         |
| SQL-like querying and grouping           | DataFrame   |


### 8.7. Conclusion

- Use [DataFrames](#dataframes) when working with structured data like CSV, JSON, or Parquet.
- Use [RDDs](#rdds) when you need custom logic, fine-grained control over how data is processed, or low-level transformations.

The examples in this section, all built on `customers.csv`, show how both abstractions work and when to use each.


## 9. Apache Spark Local Setup

In this section, we'll cover two common ways to set up [Apache Spark](#apache-spark) on a local development machine:

1. **Installing Spark Locally (Native Installation)**
2. **Using Docker to Set Up Spark**

### 9.1. Installing Spark Locally (Native Installation)

This method involves manually installing Spark and its dependencies on your machine.

#### 9.1.1. Prerequisites
- **Java (JDK 8, 11, or 17):** [Apache Spark](#apache-spark) runs on the JVM; these are the Java versions supported by Spark 3.4.
- **Python 3.x:** Required for PySpark.

#### 9.1.2. Download and Install Spark
- Download Spark from the [Official Apache Spark website](https://spark.apache.org/downloads.html).
  - Choose a version (e.g., Spark 3.4.1) and a pre-built package for Hadoop (e.g., "Pre-built for Apache Hadoop 3.3 and later").

##### Extract the archive to a directory of your choice

**On Linux:**


In [None]:
tar -xzf spark-3.4.1-bin-hadoop3.tgz -C /path/to/your/directory

**On Windows:**
- Use a tool like 7-Zip or WinRAR.
  - Right-click the downloaded `.tgz` file.
  - Select "Extract Here" or "Extract to spark-3.4.1-bin-hadoop3".
  - Move the extracted folder to your desired location.

#### 9.1.3. Set Environment Variables
Set the following environment variables so your system can find Spark and Java.


In [None]:
# Linux (add to ~/.bashrc or ~/.zshrc)
export SPARK_HOME=/path/to/your/directory/spark-3.4.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin
export JAVA_HOME=/path/to/your/java

On Windows, set environment variables via System Properties > Environment Variables.

#### 9.1.4. Install Required Python Libraries


In [None]:
!pip install pyspark findspark

#### 9.1.5. Test Your Installation
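Start the PySpark shell. Because the shell is interactive, run the command below in a terminal (without the leading `!`) rather than in a notebook cell: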


In [None]:
!pyspark

Or test with a small script:


In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Test").getOrCreate()
print(spark.range(5).collect())

### 9.2. Using Docker to Set Up Spark

An alternative way is to run [Apache Spark](#apache-spark) inside Docker containers. This avoids manual setup and ensures a clean environment.

#### 9.2.1. Prerequisites
- Docker installed on your system ([Install Docker](https://docs.docker.com/get-docker/))

#### 9.2.2. Standalone Setup

##### 9.2.2.1. Pull a Spark Docker Image
You can use an existing image from Docker Hub or customize it using a Dockerfile.


In [None]:
docker pull bitnami/spark

##### 9.2.2.2. Run a Spark Container
Start a Spark standalone container:


In [None]:
docker run -it bitnami/spark pyspark

#### 9.2.3. Set Up Spark Cluster
You can create a local Spark cluster with [`docker-compose.yaml`](./docker-compose.yaml).

##### 9.2.3.1. Start the Cluster
Run the following command to start the cluster:


In [None]:
docker compose up -d

##### 9.2.3.2. Access the Spark Web UI
- Master: [http://localhost:8080](http://localhost:8080)
- Worker: [http://localhost:8081](http://localhost:8081)

##### 9.2.3.3. Submit Jobs
You can submit jobs using the spark-submit tool or run a PySpark shell inside the container:


In [None]:
docker exec -it spark-master pyspark --master spark://spark-master:7077

##### 9.2.3.4. Setting Up Jupyter Notebook Container for Spark (Optional)

Running a Jupyter Notebook container alongside your Spark services is a great way to interactively test Spark code using PySpark.

- Uncomment the `jupyter` service block in the [`docker-compose.yaml`](./docker-compose.yaml) file.
- Ensure the `notebooks` directory exists in the same location as your `docker-compose.yaml`:
    ```bash
    mkdir notebooks
    ```
  This directory will be mounted into the Jupyter container so that your notebooks are saved persistently.

- To start the whole cluster (including Jupyter):
    ```bash
    docker compose up -d
    ```
- To start only the Jupyter container (after the cluster is running):
    ```bash
    docker compose up -d jupyter
    ```
- You can now access the notebook UI at: [http://localhost:8888](http://localhost:8888)

  Use the token shown in the terminal (when the Jupyter container starts) to log in.

##### 9.2.3.5. Test Notebook Code

In a new notebook, run:


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("NotebookSpark") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

spark.range(5).show()

## 10. Extracting and Transforming Data with Apache Spark

This section provides a comprehensive guide to extracting and transforming data using Apache Spark, focusing on Spark SQL and DataFrame APIs. It includes detailed explanations, practical scenarios, code examples, and best practices to help you master these critical aspects of Spark.

### 10.1. Extracting Data with Spark

#### 10.1.1. Overview
Extracting data in Apache Spark involves loading data from various sources into DataFrames for further processing. Spark’s DataFrame API provides a unified interface to read data from file formats like JSON, CSV, and Parquet, as well as databases and cloud storage.

#### 10.1.2. Supported Data Sources
Spark supports a wide range of data sources, including:
- **File Formats**: CSV, JSON, Parquet, ORC, Avro, Text
- **Databases**: JDBC (MySQL, PostgreSQL, SQL Server, etc.)
- **Big Data Systems**: Hadoop HDFS, Apache Hive, Apache HBase
- **Cloud Storage**: AWS S3, Google Cloud Storage, Azure Blob Storage
- **Other**: Kafka, NoSQL databases like Cassandra

#### 10.1.3. Reading Data
Spark provides the `spark.read` API to load data into DataFrames. Common methods include:
- `spark.read.csv(path)`: Reads CSV files
- `spark.read.json(path)`: Reads JSON files
- `spark.read.parquet(path)`: Reads Parquet files
- `spark.read.jdbc(url, table, properties=props)`: Reads from JDBC databases (pass `properties` by keyword, because the third positional parameter is `column`)

#### 10.1.4. Key Options
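- `header=True`: Treats the first row as column names (CSV).
- `inferSchema=True`: Infers column data types automatically, at the cost of an extra pass over the data.
- `schema=...`: Supplies an explicit `StructType` schema and avoids the inference overhead.
- `mode`: Controls how malformed records are handled: `PERMISSIVE` (the default, keeps the row and sets fields it cannot parse to null), `DROPMALFORMED` (drops the row), or `FAILFAST` (raises an error).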

#### 10.1.5. Scenario: Extracting Data from JSON, CSV, and Parquet
**Problem**: Load the `products.json`, `customers.csv`, and `orders.parquet` datasets into Spark DataFrames, ensuring proper schema handling and error management for missing or inconsistent data.

**Solution**: 
- Define explicit schemas to ensure correct data types.
- Handle missing or malformed data during extraction.
- Cache DataFrames for repeated use and save them in Parquet for unified storage.

**Code Example**:

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PySparkHandbook") \
    .getOrCreate()

# Define schema for products.json
products_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True)
])

# Define schema for customers.csv
customers_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("join_date", DateType(), True)
])

# Read products.json
products_df = spark.read \
    .schema(products_schema) \
    .json("./data/products.json")
 

# Read customers.csv
customers_df = spark.read \
    .schema(customers_schema) \
    .option("header", "true") \
    .option("mode", "dropmalformed") \
    .csv("./data/customers.csv")

# Read orders.parquet
orders_df = spark.read.parquet("./data/orders_parquet/orders.parquet")

# Cache DataFrames for performance
products_df.cache()
customers_df.cache()
orders_df.cache()

# Show sample data
print("Products:")
products_df.show(5, truncate=False)
print("Customers:")
customers_df.show(5, truncate=False)
print("Orders:")
orders_df.show(5, truncate=False)

# Save to Parquet for unified storage
products_df.write.mode("overwrite").parquet("./data/cleaned/products_clean.parquet")
customers_df.write.mode("overwrite").parquet("./data/cleaned/customers_clean.parquet")
orders_df.write.mode("overwrite").parquet("./data/cleaned/orders_clean.parquet")

# Stop Spark session
spark.stop()

**Explanation**:
- Defines explicit schemas to ensure correct data types and avoid inference overhead.
- Uses `dropmalformed` mode for the CSV to skip any malformed rows.
- Reads `orders.parquet` directly, as Parquet files include schema metadata.
- Caches DataFrames to improve performance for subsequent transformations.
- Saves cleaned DataFrames to Parquet for efficient storage and querying.

### 10.2. Transforming Data with Spark

#### 10.2.1. Overview
Transforming data in Spark involves manipulating [DataFrames](#dataframes) to clean, enrich, or aggregate data for analysis. Spark's DataFrame API and SQL queries support operations like joins, aggregations, filtering, and window functions, optimized for [distributed execution](#distributed-processing). The provided datasets will be used to demonstrate these transformations in practical scenarios.

#### 10.2.2. Common Transformation Operations

**Joins**: <br>
Joins combine [DataFrames](#dataframes) based on a key. Common types include:
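- **Inner join** (`"inner"`): Returns only rows that have a match in both DataFrames.
- **Left outer join** (`"left_outer"`): Keeps all rows from the left DataFrame, with nulls in the right-side columns where there is no match.
- **Right outer join** (`"right_outer"`): Keeps all rows from the right DataFrame, with nulls in the left-side columns where there is no match.
- **Full outer join** (`"full_outer"`): Keeps all rows from both DataFrames, with nulls wherever one side has no match.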

**Syntax**:
```python 
result_df = df1.join(df2, df1.key == df2.key, "inner")
```

**Aggregations**: <br>
Aggregations summarize data using functions such as `sum`, `avg`, `count`, `min`, and `max`, typically after a `groupBy`.

**Syntax**:
```python 
agg_df = df.groupBy("column").agg({"other_column": "sum"})
```

**Filtering and Selecting**: <br>
- **Filtering**: Select rows with `filter` or its alias `where`.
- **Selecting**: Choose columns with `select`.

**Syntax**:
```python 
from pyspark.sql.functions import col

filtered_df = df.filter(col("column") > value)  # `value` is a placeholder threshold
selected_df = df.select("column1", "column2")
```

**Handling Missing Data**:
- **Drop nulls**: `df.dropna(subset=["column"])`
- **Fill nulls**: `df.fillna(value, subset=["column"])`
- **Replace values**: `df.replace(old_value, new_value, subset=["column"])`

#### 10.2.3. Scenario: Joining and Aggregating Sales Data
**Problem**:
Combine the `products`, `customers`, and `orders` datasets to calculate total sales and average order amount per product category. Standardize the `category` column in `products` and handle missing data in `customers`.

**Solution**:
- Join the three [DataFrames](#dataframes) on appropriate keys.
- Standardize `category` in `products`.
- Handle missing `last_name` and `join_date` in `customers`.
- Aggregate by `category` to compute sales metrics.

**Code Example**:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as sum_, avg, count, coalesce, lit, upper
from pyspark.sql.types import DateType  # needed for the join_date default below

# Initialize Spark session
spark = SparkSession.builder.appName("SparkTransformation").getOrCreate()

# Load data
products_df = spark.read.parquet("./data/cleaned/products_clean.parquet")
customers_df = spark.read.parquet("./data/cleaned/customers_clean.parquet")
orders_df = spark.read.parquet("./data/cleaned/orders_clean.parquet")

# Standardize category in products (convert to uppercase)
products_df = products_df.withColumn("category", upper(col("category")))

# Handle missing data in customers
customers_df = customers_df \
    .withColumn("last_name", coalesce(col("last_name"), lit("Unknown"))) \
    .withColumn("join_date", coalesce(col("join_date"), lit("2023-01-01").cast(DateType())))

# Join DataFrames
joined_df = orders_df \
    .join(customers_df, "customer_id", "left_outer") \
    .join(products_df, "product_id", "inner")

# Print joined dataframe
print("Joined DataFrame:")
joined_df.show(truncate=False)

# Aggregate by category
summary_df = joined_df.groupBy("category") \
    .agg(
        sum_("total_price").alias("total_sales"),
        avg("total_price").alias("avg_order_amount"),
        count("order_id").alias("order_count")
    )

# Format numerical columns
summary_df = summary_df.select(
    col("category"),
    col("total_sales").cast("decimal(10,2)"),
    col("avg_order_amount").cast("decimal(10,2)"),
    col("order_count")
)

# Save and show results
summary_df.write.mode("overwrite").parquet("./data/cleaned/sales_summary.parquet")
print("Sales Summary:")
summary_df.show(truncate=False)

# Stop Spark session
spark.stop()


**Explanation**:
- Standardizes `category` in `products_df` to uppercase to fix any inconsistencies (e.g., “accessories” vs. “Accessories”).
- Fills missing `last_name` with “Unknown” and `join_date` with a default date in `customers_df`.
- Performs a left outer join between `orders` and `customers` so that every order is kept even without a matching customer, then an inner join with `products` so that only orders with a matching product remain.
- Aggregates by `category`, computing total sales, average order amount, and order count.
- Casts the numeric results to `decimal(10,2)` so that they display with two decimal places.
- Saves results to Parquet.

### 10.3. Best Practices

- **Explicit Schemas**: Always define schemas for JSON and CSV to ensure correct data types and avoid inference costs.
- **Handle Inconsistencies**: Standardize case-sensitive fields (e.g., `category`) early in the pipeline.
- **Null Handling**: Address missing data before joins or aggregations to prevent unexpected results.
- **Join Optimization**: Prefer inner joins when unmatched rows are not needed, and use left outer joins when every row of the left DataFrame must be preserved.
- **Columnar Storage**: Use Parquet for intermediate and output data to leverage compression and columnar access.
- **SQL for Readability**: Use [Spark SQL](#spark-sql) for complex transformations when it improves clarity.