---
<span style="color:#000; font-family: 'Arial'; font-size: 2em;">BIG DATA</span>

<span style="color:#f00; font-family: 'Arial'; font-size: 1.5em;">Unit 7: Apache Spark: Cluster Deployment and Performance Optimization </span>

<h4 style="color:darkblue"> Universidad de Deusto</h4>

<span style="color:#300; font-family: 'Arial'; font-size: 1em;">m.varo@deusto.es</span>

<h5 style="color:black">  April 11, 2025 - Donostia </h5>

---

### 1. Spark Memory Management

It's important to note that Spark does **not** automatically cache input data in memory. A common misconception is that Spark cannot be used effectively unless the input data fits entirely in memory. This is **not true**. Spark is capable of processing terabytes of data even on clusters with limited memory—for example, a cluster with only 5 GB of total memory.

Deciding what data to cache, and when to cache it during a data processing pipeline, is the responsibility of the application developer. In fact, if a Spark application only makes a single pass over the data, caching may not be necessary at all.

Another reason Spark outperforms Hadoop MapReduce is its advanced job execution engine. Like MapReduce, Spark represents jobs as Directed Acyclic Graphs (DAGs) of stages, but it processes these DAGs more efficiently, enabling better performance and reduced execution time.


In [6]:
from pyspark.sql import SparkSession

# Step 1: Start Spark session
spark = SparkSession.builder \
    .appName("CacheExample") \
    .config("spark.ui.showConsoleProgress", "true") \
    .getOrCreate()

In [7]:
# Step 2: Load a sample dataset
df = spark.range(0, 1000000)  # Creates a DataFrame with numbers from 0 to 999999

In [8]:
# Step 3: Cache the DataFrame in memory
df.cache()

DataFrame[id: bigint]

In [9]:
# Step 4: Trigger an action to materialize the cache
df.count()

1000000

In [10]:
# Step 5: Check if it's cached
print(f"Is the DataFrame cached? {df.is_cached}")

Is the DataFrame cached? True


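- `df.cache()` only marks the DataFrame for caching. The data is actually stored when an action such as `df.count()` runs; for DataFrames, Spark keeps it in memory and spills to disk if it does not fit.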

- You can open the Spark UI in your browser at http://localhost:4040 (the default port when running locally). Its "Storage" tab lists cached RDDs/DataFrames and their memory usage.

---
### 3. Optimization in Spark vs. Hadoop MapReduce

Hadoop MapReduce creates a Directed Acyclic Graph (DAG) with exactly two predefined stages—Map and Reduce—for every job. A complex data processing algorithm in MapReduce may require multiple jobs to be executed sequentially, which prevents any optimization across jobs.

In contrast, Spark offers greater flexibility. It does not force the developer to break a complex algorithm into multiple jobs. A Spark DAG can contain any number of stages, allowing both simple jobs with just one stage and more complex jobs with several stages. This ability enables Spark to perform optimizations that are not possible in MapReduce.

Spark executes a complex multi-stage job in a single run and uses its knowledge of all stages to optimize execution. For example, it can minimize disk I/O and data shuffling. A shuffle transfers data across the network, which significantly increases application execution time. By reducing these costly operations, Spark improves overall job performance.
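To make the contrast concrete, here is a toy model in plain Python. It is not Spark or Hadoop code, and the function names `run_as_separate_jobs` and `run_as_single_pipeline` are hypothetical. The first function stores the full result after every step, roughly the way a chain of MapReduce jobs writes output between jobs. The second passes each record through all steps at once, which is a simplified picture of how Spark can pipeline transformations when it knows the whole job in advance.

```python
from typing import Callable, List, Tuple

Step = Callable[[int], int]


def run_as_separate_jobs(data: List[int], steps: List[Step]) -> Tuple[List[int], int]:
    """Run each step as its own 'job' and count the stored intermediate datasets."""
    materialized = 0
    current = list(data)
    for i, step in enumerate(steps):
        current = [step(x) for x in current]
        if i < len(steps) - 1:
            # The output of this job is stored in full before the next job starts.
            materialized += 1
    return current, materialized


def run_as_single_pipeline(data: List[int], steps: List[Step]) -> Tuple[List[int], int]:
    """Push every record through all steps in one pass, storing no intermediates."""
    def pipeline(x: int) -> int:
        for step in steps:
            x = step(x)
        return x

    return [pipeline(x) for x in data], 0


steps: List[Step] = [lambda x: x + 1, lambda x: x * 2, lambda x: x - 3]
data = list(range(5))

jobs_result, jobs_intermediates = run_as_separate_jobs(data, steps)
pipeline_result, pipeline_intermediates = run_as_single_pipeline(data, steps)

print(jobs_result == pipeline_result)
print(jobs_intermediates, pipeline_intermediates)
```

Both approaches produce the same records; only the number of stored intermediate datasets differs. The model ignores shuffles, which still force Spark to exchange data between stages.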


In [13]:
from pyspark.sql.functions import col, when, avg
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum()

# Example: Complex Optimization in Spark vs. Hadoop MapReduce

# Create a small employee DataFrame (this reassigns `df`, replacing the cached DataFrame from the earlier cells)
data = [(1, 'Alice', 1000, 'HR'), (2, 'Bob', 2000, 'IT'), (3, 'Charlie', 3000, 'Finance')]
columns = ['ID', 'Name', 'Salary', 'Department']
df = spark.createDataFrame(data, columns)

# Transformation 1: Add a new column with salary categories
df = df.withColumn(
    'Salary Category',
    when(col('Salary') < 1500, 'Low')
    .when((col('Salary') >= 1500) & (col('Salary') < 2500), 'Medium')
    .otherwise('High')
)

# Transformation 2: Group by Department and Salary Category, and calculate aggregates
aggregated_df = df.groupBy('Department', 'Salary Category').agg(
    avg('Salary').alias('Average Salary'),
    spark_sum('Salary').alias('Total Salary')
)

# Explanation:
# - Spark optimizes the execution by combining transformations into a single DAG.
# - It minimizes intermediate data writes to disk and reduces data shuffling.
# - This approach is significantly more efficient than Hadoop MapReduce, which would require multiple jobs for similar operations.

In [14]:
import time

# Measure the time taken for the Spark operation
start_time = time.time()

# Perform the Spark operation (already defined in the notebook)
aggregated_df.show()

end_time = time.time()
spark_execution_time = end_time - start_time
print(f"Time taken for Spark operation: {spark_execution_time:.2f} seconds")

# Note: To compare with MapReduce, you would need to implement the same logic using a MapReduce framework,
# which is outside the scope of this notebook since it requires a Hadoop setup.

+----------+---------------+--------------+------------+
|Department|Salary Category|Average Salary|Total Salary|
+----------+---------------+--------------+------------+
|        HR|            Low|        1000.0|        1000|
|        IT|         Medium|        2000.0|        2000|
|   Finance|           High|        3000.0|        3000|
+----------+---------------+--------------+------------+

Time taken for Spark operation: 0.30 seconds


---
### 4. Scalability in Spark

Spark is highly scalable, allowing you to increase the data processing capacity of a cluster simply by adding more nodes. You can start with a small cluster and, as your dataset grows, scale your infrastructure by adding more computing resources. This flexibility makes Spark an economical choice for handling growing datasets.

One of Spark's key features is that it automatically handles scaling without requiring any changes to the application code. When you add nodes to a Spark cluster, the application can take advantage of the additional resources without any code modifications, making it easy to scale as needed.

In [16]:
# Example: Demonstrating Spark's scalability by processing a large dataset

# Generate a large dataset with 100 million rows
large_df = spark.range(0, 100000000)

# Perform a simple transformation and action to demonstrate scalability
# Transformation: Add a new column with values doubled
transformed_df = large_df.withColumn("Doubled Value", col("id") * 2)

# Action: Count the number of rows in the transformed DataFrame
row_count = transformed_df.count()

print(f"Number of rows in the transformed DataFrame: {row_count}")

Number of rows in the transformed DataFrame: 100000000


Spark itself does not generate nodes; instead, it utilizes the nodes provided by the cluster manager. The process of adding more nodes to a Spark cluster depends on the cluster manager being used (e.g., Standalone, YARN, or Mesos). Here's how it works:

1. **Cluster Manager**: Spark relies on a cluster manager to allocate resources (CPU, memory, etc.) for its applications. The cluster manager is responsible for managing the worker nodes in the cluster.

2. **Adding Nodes**:
    - **Standalone Mode**: In Spark's standalone mode, you can add more worker nodes by starting additional worker processes on new machines and connecting them to the master node.
    - **YARN**: In a Hadoop YARN cluster, you can add more nodes by adding new machines to the Hadoop cluster and configuring them as YARN NodeManagers.
    - **Mesos**: In a Mesos cluster, you can add nodes by adding new machines to the Mesos cluster and configuring them as Mesos agents.

3. **Dynamic Allocation**: Spark supports dynamic resource allocation, which allows it to request additional executors (processes running on worker nodes) during runtime if the workload increases. This feature is particularly useful in shared clusters where resources are allocated on demand.

4. **Scaling**: When you add nodes to the cluster, the cluster manager makes these resources available to Spark. Spark can then distribute tasks across the additional nodes, increasing parallelism and improving performance.

In summary, Spark does not generate nodes itself; it depends on the cluster manager to provide and manage the nodes. To scale a Spark cluster, you need to add nodes to the underlying cluster infrastructure managed by the cluster manager.
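As a sketch of how these choices reach Spark, the snippet below assembles a `spark-submit` command line in plain Python. The `--master` and `--conf` flags and the `spark.dynamicAllocation.*` keys are standard Spark settings; the helper `build_spark_submit_args`, the script name `my_app.py`, and the executor limits are assumptions chosen for illustration. Depending on the cluster manager and Spark version, dynamic allocation may need further settings (for example an external shuffle service), which are not shown here.

```python
from typing import Dict, List


def build_spark_submit_args(master: str, app_path: str, conf: Dict[str, str]) -> List[str]:
    """Assemble a spark-submit argument list (hypothetical helper, not a Spark API)."""
    args = ["spark-submit", "--master", master]
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app_path)
    return args


# Illustrative values: let Spark scale between 1 and 10 executors at runtime.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "10",
}

# "yarn" selects the YARN cluster manager; a standalone cluster would use a
# master URL of the form spark://<master-host>:7077 instead.
args = build_spark_submit_args("yarn", "my_app.py", dynamic_allocation_conf)
print(" ".join(args))
```

The application script itself does not change when the master URL or the executor limits change, which is the point made above about scaling without code modifications.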

---

### 5. Fault Tolerance in Spark

Spark is designed to be fault-tolerant, ensuring reliable execution even in the face of hardware failures. In a cluster with hundreds of nodes, the probability of a node failure on any given day is significant—whether due to a hard disk crash or other hardware issues. However, Spark automatically handles the failure of a node in the cluster, ensuring that the application continues running.

While the failure of a worker node may cause some performance degradation, because its tasks must be rerun on the remaining nodes, it will not cause the application to crash. This built-in fault tolerance means that application developers do not need to explicitly handle node failures in their code, simplifying the application development process and increasing reliability.
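The idea that a node failure slows an application down without stopping it can be illustrated with a toy scheduler in plain Python. This is a simplified model, not Spark's actual scheduler: the functions `assign_tasks` and `handle_node_failure` and the node names are hypothetical, and real Spark also has to recover any data that was held on the failed node.

```python
from typing import Dict, List


def assign_tasks(tasks: List[int], nodes: List[str]) -> Dict[str, List[int]]:
    """Distribute tasks round-robin over the available nodes."""
    assignment: Dict[str, List[int]] = {node: [] for node in nodes}
    for i, task in enumerate(tasks):
        assignment[nodes[i % len(nodes)]].append(task)
    return assignment


def handle_node_failure(assignment: Dict[str, List[int]], failed_node: str) -> Dict[str, List[int]]:
    """Move the failed node's tasks onto the surviving nodes."""
    survivors = [node for node in assignment if node != failed_node]
    if not survivors:
        raise RuntimeError("no surviving nodes left to run the tasks")
    new_assignment = {node: list(assignment[node]) for node in survivors}
    for i, task in enumerate(assignment[failed_node]):
        new_assignment[survivors[i % len(survivors)]].append(task)
    return new_assignment


tasks = list(range(9))
before = assign_tasks(tasks, ["node-a", "node-b", "node-c"])
after = handle_node_failure(before, "node-b")

print(before)
print(after)
```

Every task still runs after the failure, but the surviving nodes each carry more work, which corresponds to the performance degradation described above.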


---
### 6. Iterative Algorithms in Spark

Iterative algorithms are data processing algorithms that repeatedly iterate over the same data. Applications such as machine learning and graph processing commonly use iterative algorithms, running tens or even hundreds of iterations over the same dataset. Spark is particularly well-suited for these types of applications.

The reason iterative algorithms run efficiently on Spark is its in-memory computing capabilities. Spark allows applications to cache data in memory, so even if an iterative algorithm performs 100 iterations, it only needs to read the data from disk during the first iteration. Subsequent iterations can read the data from memory, which can be up to **100 times faster** than reading from disk. This dramatically speeds up the execution of these applications, often resulting in order-of-magnitude improvements in performance.
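The effect of caching on an iterative algorithm can be sketched with a small model in plain Python. The class `ToyDataset` is hypothetical and only counts how often the "disk" copy is read; it does not measure real I/O time or use any Spark API.

```python
from typing import Iterable, List, Optional


class ToyDataset:
    """Counts simulated disk reads, with optional in-memory caching."""

    def __init__(self, rows: Iterable[int], use_cache: bool) -> None:
        self._rows_on_disk: List[int] = list(rows)
        self._use_cache = use_cache
        self._cached: Optional[List[int]] = None
        self.disk_reads = 0

    def read(self) -> List[int]:
        if self._cached is not None:
            return self._cached  # served from memory, no disk access
        self.disk_reads += 1
        rows = list(self._rows_on_disk)
        if self._use_cache:
            self._cached = rows
        return rows


def run_iterations(dataset: ToyDataset, iterations: int) -> int:
    """Stand-in for an iterative algorithm: scan the full dataset each iteration."""
    total = 0
    for _ in range(iterations):
        total += sum(dataset.read())
    return total


uncached = ToyDataset(range(1000), use_cache=False)
cached = ToyDataset(range(1000), use_cache=True)

uncached_total = run_iterations(uncached, 100)
cached_total = run_iterations(cached, 100)

print(uncached.disk_reads, cached.disk_reads)
```

Both runs compute the same result, but the uncached dataset is read from "disk" once per iteration while the cached one is read only during the first iteration.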

---
### 7. Interactive Data Analysis with Spark

Interactive data analysis involves exploring a dataset in real-time, allowing for quick insights before running long and resource-intensive batch processing jobs. For instance, before executing a time-consuming job that might run for hours, a data analyst might perform summary analysis on a large dataset. Similarly, business analysts often require the ability to interactively analyze data using BI or visualization tools, running multiple queries on the same data. Spark is an ideal platform for such interactive analysis of large datasets.

The key advantage of Spark in interactive analysis is its **in-memory computing capabilities**. When an application caches the data to be interactively analyzed, the first query will read data from disk, but subsequent queries will access the cached data in memory. Since reading from memory is orders of magnitude faster than reading from disk, Spark can dramatically reduce query execution time. A query that would normally take over an hour when reading from disk can often be completed in just a few seconds when the data is cached in memory.

---

### 8. High-level architecture

<div style="text-align: center;">
    <img src="spark_diagram1.png" width="500"/>
</div>


---

### Key Components in Spark

- **Workers**: A worker provides CPU, memory, and storage resources to a Spark application. Workers run Spark applications as distributed processes across a cluster of nodes, enabling parallel computation.

- **Cluster Managers**: Spark uses a cluster manager to acquire and manage cluster resources for executing jobs. A cluster manager, as the name implies, is responsible for managing computing resources across a cluster of worker nodes. It provides low-level scheduling of cluster resources and enables multiple applications to share resources, allowing them to run on the same worker nodes. Spark supports several cluster managers (recent versions also support Kubernetes); this unit covers three:
  - **Standalone**: Spark's native cluster manager.
  - **Mesos**: A general-purpose cluster manager.
  - **YARN**: The Hadoop cluster manager.

  Mesos and YARN allow Spark to run alongside Hadoop applications on the same worker nodes.

- **Driver Programs**: A driver program is an application that uses Spark as a library to process data. The driver provides the data processing code that Spark executes on the worker nodes. It can launch one or more jobs on a Spark cluster.

- **Executors**: An executor is a JVM (Java Virtual Machine) process created by Spark on each worker node for an application. It executes application code concurrently in multiple threads and can also cache data in memory or on disk. The lifespan of an executor is tied to the lifespan of the application. When the Spark application terminates, all executors associated with it are also terminated.

- **Tasks**: A task is the smallest unit of work that Spark sends to an executor. It is executed by a thread in an executor on a worker node. Each task performs computations to either return a result to the driver program or partition its output for shuffling. Spark creates one task per data partition, and an executor runs multiple tasks concurrently. The number of partitions determines the maximum level of parallelism: more partitions allow more tasks to run in parallel, up to the number of cores available to the executors.
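The relationship between partitions, tasks, and executor threads can be sketched in plain Python with a thread pool. This is a simplified model rather than Spark code: `split_into_partitions`, `run_task`, `run_job`, and the parameter `executor_cores` are hypothetical names, and a single thread pool stands in for one executor.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def split_into_partitions(data: List[int], num_partitions: int) -> List[List[int]]:
    """Split the data into num_partitions roughly equal slices."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def run_task(partition: List[int]) -> int:
    """One task processes exactly one partition; here it just sums it."""
    return sum(partition)


def run_job(data: List[int], num_partitions: int, executor_cores: int) -> Tuple[int, int]:
    """Run one task per partition on a pool of executor threads."""
    partitions = split_into_partitions(data, num_partitions)
    # At most executor_cores tasks run at the same time; the rest wait their turn.
    with ThreadPoolExecutor(max_workers=executor_cores) as executor:
        partial_results = list(executor.map(run_task, partitions))
    # The partial results are combined, as the driver would do.
    return sum(partial_results), len(partitions)


total, num_tasks = run_job(list(range(100)), num_partitions=8, executor_cores=4)
print(total, num_tasks)
```

With 8 partitions and 4 threads, the job consists of 8 tasks, of which at most 4 run at once. Adding partitions beyond the available threads creates more tasks but not more simultaneous work.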



---
### 9. Application execution

This section briefly describes how data processing code is executed on a Spark cluster.

#### Terminology

Before we dive into the execution details, let's define some key terms:

- **Shuffle**: A shuffle is the process of redistributing data across the nodes of a cluster. It is an expensive operation because it involves moving data over the network. However, a shuffle does not randomly distribute data; instead, it groups data elements into buckets based on specific criteria. Each bucket forms a new partition.
  
- **Job**: A job is a set of computations that Spark performs to return results to the driver program. Essentially, it represents the execution of a data processing algorithm on a Spark cluster. An application can launch multiple jobs. The specifics of how a job is executed will be covered later in this unit.
  
- **Stage**: A stage is a collection of tasks. Spark splits a job into a Directed Acyclic Graph (DAG) of stages, and stages may depend on one another. For example, a job could be divided into two stages, stage 0 and stage 1, where stage 1 cannot begin until stage 0 has completed. Spark groups tasks into stages using shuffle boundaries: tasks that do not require a shuffle are placed in the same stage.
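The bucketing behavior of a shuffle described above can be sketched in plain Python. This is a minimal model, assuming records are `(key, value)` pairs and that buckets are chosen by hashing the key; the functions `bucket_for` and `shuffle` are hypothetical, and real Spark additionally moves the buckets between nodes over the network.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int]


def bucket_for(key: str, num_partitions: int) -> int:
    """Pick a bucket from a stable hash of the key (same key, same bucket)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(partitions: List[List[Record]], num_partitions: int) -> List[List[Record]]:
    """Regroup records so that each bucket becomes one new partition."""
    buckets: Dict[int, List[Record]] = defaultdict(list)
    for partition in partitions:
        for key, value in partition:
            buckets[bucket_for(key, num_partitions)].append((key, value))
    return [buckets[i] for i in range(num_partitions)]


# Records for the same department start out scattered across partitions.
input_partitions: List[List[Record]] = [
    [("HR", 1000), ("IT", 2000)],
    [("Finance", 3000), ("HR", 1500)],
    [("IT", 2500)],
]

shuffled = shuffle(input_partitions, num_partitions=2)
for index, partition in enumerate(shuffled):
    print(index, partition)
```

After the shuffle, all records that share a key sit in the same partition, which is what allows a following step such as a group-by aggregation to process each key locally.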

---
 
 Reference: 'Big Data Analytics with Spark, Mohammed Guller'
