# Optimizing and Tuning Spark Applications
## Optimizing and Tuning Spark for Efficiency
This chapter covers only a handful of the most important and commonly tuned configurations. For a comprehensive list grouped by functional themes, you can peruse the <a href="https://spark.apache.org/docs/latest/configuration.html">documentation</a>.

### Viewing and Setting Apache Spark Configurations
There are three ways you can get and set Spark properties. The first is through a set of configuration files. In your deployment’s `$SPARK_HOME` directory, there are a number of config files: `conf/spark-defaults.conf.template`, `conf/log4j.properties.template`, and `conf/spark-env.sh.template`. Changing the default values in these files and saving them without the `.template` suffix instructs Spark to use these new values.

The second way is to specify Spark configurations directly in your Spark application or on the command line when submitting the application with `spark-submit`, using the `--conf` flag.

Here’s how you would do this in the Spark application itself:

In [1]:
import findspark

# If you know the Spark path, you can pass it to init() as a parameter
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession
            .builder
            .appName("SparkOptimizingTuning")
            .getOrCreate())

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.app.startTime', '1671362818086'),
 ('spark.driver.host', 'yamen'),
 ('spark.executor.id', 'driver'),
 ('spark.app.submitTime', '1671362817924'),
 ('spark.app.id', 'local-1671362820512'),
 ('spark.rdd.compress', 'True'),
 ('s

In [4]:
spark.conf.set("spark.app.name", "SparkOptimizingAndTuning")

In [5]:
spark.conf.get("spark.app.name")

'SparkOptimizingAndTuning'

Among all the ways that you can set Spark properties, an order of precedence determines which values are honored. Any values or flags defined in `spark-defaults.conf` will be read first, followed by those supplied on the command line with `spark-submit`, and finally those set via `SparkSession` in the Spark application. All these properties will be merged, with any duplicate properties reset in the Spark application taking precedence. Likewise, values supplied on the command line will supersede settings in the configuration file, provided they are not overwritten in the application itself.
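The precedence rules can be pictured as a series of dictionary merges, where later sources overwrite earlier ones. The sketch below is a plain-Python model of that idea, not Spark's actual implementation; the function name `resolve_conf` and the sample values are hypothetical.

```python
def resolve_conf(defaults_file, submit_flags, app_settings):
    """Merge three configuration sources, lowest precedence first."""
    merged = {}
    merged.update(defaults_file)  # read first, lowest precedence
    merged.update(submit_flags)   # overrides the configuration file
    merged.update(app_settings)   # set in the application, highest precedence
    return merged


# Hypothetical values, chosen only to show which source wins
defaults_file = {"spark.executor.memory": "2g", "spark.sql.shuffle.partitions": "200"}
submit_flags = {"spark.executor.memory": "4g"}
app_settings = {"spark.sql.shuffle.partitions": "8"}

effective = resolve_conf(defaults_file, submit_flags, app_settings)
print(effective)
```

Here the command-line value wins for `spark.executor.memory`, while the value set in the application wins for `spark.sql.shuffle.partitions`.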

### Scaling Spark for Large Workloads
Large Spark workloads are often batch jobs: some run on a nightly basis, while others are scheduled at regular intervals during the day. In either case, these jobs may process tens of terabytes of data or more. To avoid job failures due to resource starvation or gradual performance degradation, there are a handful of Spark configurations that you can enable or alter. These configurations affect three Spark components: the Spark driver, the executor, and the shuffle service running on the executor.

The Spark driver’s responsibility is to coordinate with the cluster manager to launch executors in a cluster and schedule Spark tasks on them. With large workloads, you may have hundreds of tasks. This section explains a few configurations you can tweak or enable to optimize your resource utilization, parallelize tasks, and avoid bottlenecks for large numbers of tasks.

#### Static versus dynamic resource allocation
When you specify compute resources as command-line arguments to `spark-submit`, you cap the limit. This means that if more resources are needed later as tasks queue up in the driver due to a larger-than-anticipated workload, Spark cannot accommodate or allocate extra resources.

If instead you use Spark’s dynamic resource allocation configuration, the Spark driver can request more or fewer compute resources as the demand of large workloads flows and ebbs. In scenarios where your workloads are dynamic—that is, they vary in their demand for compute capacity—using dynamic allocation helps to accommodate sudden peaks.

One use case where this can be helpful is streaming, where the data flow volume may be uneven. Another is on-demand data analytics, where you might have a high volume of SQL queries during peak hours. Enabling dynamic resource allocation allows Spark to achieve better utilization of resources, freeing executors when not in use and acquiring new ones when needed.

To enable and configure dynamic allocation, you can use settings like the following. Note that the numbers here are arbitrary; the appropriate settings depend on the nature of your workload and should be adjusted accordingly. Some of these configs cannot be set inside a Spark REPL (shell), so you will have to set them programmatically:

spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min

By default, `spark.dynamicAllocation.enabled` is set to `false`. When enabled with the settings shown here, the Spark driver will request that the cluster manager create two executors to start with, as a minimum (`spark.dynamicAllocation.minExecutors`). As the task queue backlog increases, new executors will be requested each time the backlog timeout (`spark.dynamicAllocation.schedulerBacklogTimeout`) is exceeded. In this case, whenever there are pending tasks that have not been scheduled for over 1 minute, the driver will request that a new executor be launched to schedule backlogged tasks, up to a maximum of 20 (`spark.dynamicAllocation.maxExecutors`). By contrast, if an executor finishes a task and is idle for 2 minutes (`spark.dynamicAllocation.executorIdleTimeout`), the Spark driver will terminate it.
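One way to keep such settings together is to hold them in a dictionary and render them as `--conf` arguments for `spark-submit`. This is only a sketch: the helper `to_submit_args` is hypothetical, and the values simply repeat the arbitrary numbers above.

```python
dynamic_allocation = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "2",
    "spark.dynamicAllocation.schedulerBacklogTimeout": "1m",
    "spark.dynamicAllocation.maxExecutors": "20",
    "spark.dynamicAllocation.executorIdleTimeout": "2min",
}


def to_submit_args(settings):
    """Render a settings dict as a flat list of --conf key=value arguments."""
    args = []
    for key, value in settings.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


submit_args = to_submit_args(dynamic_allocation)
print(" ".join(["spark-submit"] + submit_args))
```

The same dictionary could also be applied inside the application by calling `SparkSession.builder.config(key, value)` for each entry before `getOrCreate()`.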

#### Configuring Spark executors’ memory and the shuffle service
Simply enabling dynamic resource allocation is not sufficient. You also have to understand how executor memory is laid out and used by Spark so that executors are not starved of memory or troubled by JVM garbage collection.

The amount of memory available to each executor is controlled by `spark.executor.memory`. Spark sets aside 300 MB of it as reserved memory, to safeguard against OOM errors, and then dedicates a fraction of the remainder (`spark.memory.fraction`, 0.6 by default) to a unified region shared by execution memory and storage memory; the rest is left for user data structures and Spark's internal metadata. Within the unified region, `spark.memory.storageFraction` (0.5 by default) sets the share of storage memory that is protected from eviction.
The Spark documentation advises that these defaults will work for most cases, but you can adjust both fractions. When storage memory is not being used, Spark can borrow it for execution, and vice versa.
Execution memory is used for Spark shuffles, joins, sorts, and aggregations. Since different queries may require different amounts of memory, the right fraction can be tricky to tune, but it is easy to adjust. By contrast, storage memory is primarily used for caching user data structures and partitions derived from DataFrames.
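As a rough worked example of the arithmetic, the sketch below follows the unified memory model described in Spark's tuning guide: 300 MB is reserved, `spark.memory.fraction` (default 0.6) of the remainder is shared by execution and storage, and `spark.memory.storageFraction` (default 0.5) marks the part of that shared region where cached blocks are protected from eviction. The function name and the 4 GB executor size are assumptions for illustration, and the figures are approximate because the JVM reports slightly less heap than the configured value.

```python
RESERVED_MB = 300  # fixed amount Spark sets aside


def memory_regions_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate the on-heap memory regions of one executor, in MB."""
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction    # shared by execution and storage
    storage = unified * storage_fraction  # storage share protected from eviction
    execution = unified - storage         # can grow by borrowing unused storage
    user = usable - unified               # user data structures, internal metadata
    return {"unified": unified, "storage": storage, "execution": execution, "user": user}


regions = memory_regions_mb(4096)  # hypothetical 4 GB executor
for name, size in regions.items():
    print(f"{name}: {size:.1f} MB")
```

With these defaults, a 4 GB executor ends up with a little under 2.3 GB for execution and storage combined.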

The table below captures a few recommended configurations to adjust so that the map, split, and merge processes during these operations are not encumbered by inefficient I/O, and so that these operations can use buffer memory before writing the final shuffle partitions to disk. Tuning the shuffle service running on each executor can also help increase overall performance for large Spark workloads.

| Configuration      | Default value, recommendation, and description |
| :----:      |    :----   |
| spark.driver.memory      | Default is 1g (1 GB). This is the amount of memory allocated to the Spark driver to receive data from executors. This is often changed during `spark-submit` with `--driver-memory`. Only change this if you expect the driver to receive large amounts of data back from operations like `collect()`, or if you run out of driver memory.       |
| spark.shuffle.file.buffer   | Default is 32 KB. Recommended is 1 MB. This allows Spark to do more buffering before writing final map results to disk.        |
| spark.file.transferTo   | Default is true. Setting it to false will force Spark to use the file buffer to transfer files before finally writing to disk; this will decrease the I/O activity.        |
| spark.shuffle.unsafe.file.output.buffer   | Default is 32 KB. This controls the amount of buffering possible when merging files during shuffle operations. In general, large values (e.g., 1 MB) are more appropriate for larger workloads, whereas the default can work for smaller workloads.        |
| spark.io.compression.lz4.blockSize   | Default is 32 KB. Increase to 512 KB. You can decrease the size of the shuffle file by increasing the compressed size of the block.        |
| spark.shuffle.service.index.cache.size   | Default is 100m. Cache entries are limited to the specified memory footprint in bytes.        |
| spark.shuffle.registration.timeout   | Default is 5000 ms. Increase to 120000 ms.        |
| spark.shuffle.registration.maxAttempts   | Default is 3. Increase to 5 if needed.        |

#### Maximizing Spark parallelism
Much of Spark’s efficiency is due to its ability to run multiple tasks in parallel at scale. To understand how you can maximize parallelism (i.e., read and process as much data in parallel as possible), you have to look into how Spark reads data into memory from storage and what partitions mean to Spark.
In data management parlance, a partition is a way to arrange data into a subset of configurable and readable chunks or blocks of contiguous data on disk. These subsets of data can be read or processed independently and in parallel, if necessary, by more than a single thread in a process. This independence matters because it allows for massive parallelism of data processing.

##### How partitions are created
Spark’s tasks process data as partitions read from disk into memory. Data on disk is laid out in chunks or contiguous file blocks, depending on the store. By default, file blocks on data stores range in size from 64 MB to 128 MB. For example, on HDFS and S3 the default size is 128 MB (this is configurable). A contiguous collection of these blocks constitutes a partition. The size of a partition in Spark is dictated by `spark.sql.files.maxPartitionBytes`. The default is 128 MB.
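To get a feel for how the partition size limit translates into a partition count, the sketch below divides a file size by the limit and rounds up. It is a back-of-the-envelope estimate only: it assumes a single splittable file and ignores other factors Spark takes into account, such as `spark.sql.files.openCostInBytes` and the number of available cores. The function name `estimate_partitions` is hypothetical.

```python
import math

MB = 1024 * 1024


def estimate_partitions(file_size_bytes, max_partition_bytes=128 * MB):
    """Rough estimate: one partition per max_partition_bytes chunk of the file."""
    return max(1, math.ceil(file_size_bytes / max_partition_bytes))


print(estimate_partitions(1024 * MB))                               # 8
print(estimate_partitions(1024 * MB, max_partition_bytes=64 * MB))  # 16
```

Halving the limit doubles the estimated number of partitions, and with it the number of tasks that can read the file in parallel.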

Partitions are also created when you explicitly use certain methods of the DataFrame API. For example, while creating a large DataFrame or reading a large file from disk, you can explicitly instruct Spark to create a certain number of partitions:

In [6]:
numDF = spark.range(1000 * 1000).repartition(8)
numDF.rdd.getNumPartitions()

8

Finally, shuffle partitions are created during the shuffle stage. By default, the number of shuffle partitions is set to 200 in `spark.sql.shuffle.partitions`. You can adjust this number depending on the size of the data set you have, to reduce the number of small partitions being sent across the network to executors’ tasks.

<b>Note!</b> The default value for `spark.sql.shuffle.partitions` is too high for smaller or streaming workloads; you may want to reduce it to a lower value such as the number of cores on the executors or less.

Shuffle partitions are created during operations like `groupBy()` or `join()`, also known as wide transformations, and consume both network and disk I/O resources. During these operations, the shuffle will spill results to executors’ local disks at the location specified in `spark.local.dir`. Having performant SSD disks for this operation will boost the performance.

There is no magic formula for the number of shuffle partitions to set for the shuffle stage; the number may vary depending on your use case, data set, number of cores, and the amount of executor memory available—it’s a trial-and-error approach.

<b>Note!</b> Check the following two links for more information: <a href="https://www.youtube.com/watch?v=5dga0UT4RI8&ab_channel=Databricks">Tuning Apache Spark for Large Scale Workloads</a>, <a href="https://www.youtube.com/watch?v=6BD-Vv-ViBw&t=645s&ab_channel=Databricks">Hive Bucketing in Apache Spark</a>.

## Caching and Persistence of Data
What is the difference between caching and persistence? In Spark they are synonymous. Two API calls, `cache()` and `persist()`, offer these capabilities. The latter provides more control over how and where your data is stored: in memory or on disk, serialized or unserialized. Both contribute to better performance for frequently accessed DataFrames or tables.

### DataFrame.cache()
`cache()` will store as many of the partitions read in memory across Spark executors as memory allows. While a DataFrame may be fractionally cached, partitions cannot be fractionally cached (e.g., if you have 8 partitions but only 4.5 partitions can fit in memory, only 4 will be cached). However, if not all your partitions are cached, when you want to access the data again, the partitions that are not cached will have to be recomputed, slowing down your Spark job.
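The whole-partition rule can be illustrated with a small calculation. This is a simplified model for intuition, not how Spark's block manager actually accounts for memory; the function name `cached_partitions` and the sizes are hypothetical.

```python
import math


def cached_partitions(num_partitions, partition_size_mb, available_memory_mb):
    """Count the whole partitions that fit; a partial partition is never cached."""
    fit = math.floor(available_memory_mb / partition_size_mb)
    return min(num_partitions, fit)


# 8 partitions of 100 MB each, with room for 4.5 of them
cached = cached_partitions(8, 100, 450)
print(cached, "cached,", 8 - cached, "recomputed on next access")
```

In this model the remaining four partitions are the ones that would have to be recomputed on the next access.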

Let’s look at an example of how caching a large DataFrame improves performance when accessing a DataFrame:

In [7]:
from pyspark.sql.functions import col

df = spark.range(1 * 10000000).toDF("id").withColumn("square", col("id") * col("id"))
df.cache() # Cache the data
df.count() # Materialize the cache

10000000

In [8]:
df.count()

10000000

<b>Note!</b> When you use `cache()` or `persist()`, the DataFrame is not fully cached until you invoke an action that goes through every record (e.g., `count()`). If you use an action like `take(1)`, only one partition will be cached because Catalyst realizes that you do not need to compute all the partitions just to retrieve one record.

### DataFrame.persist()
`persist(StorageLevel.LEVEL)` is more nuanced, providing control over how your data is cached via `StorageLevel`. The table below summarizes the different storage levels. Data on disk is always serialized using either Java or Kryo serialization.

| StorageLevel      | Description |
| :----      |    :----   |
| MEMORY_ONLY      | Data is stored directly as objects and stored only in memory.       |
| MEMORY_ONLY_SER   | Data is serialized as a compact byte array representation and stored only in memory. To use it, it has to be deserialized at a cost.        |
| MEMORY_AND_DISK   | Data is stored directly as objects in memory, but if there’s insufficient memory the rest is serialized and stored on disk.        |
| DISK_ONLY   | Data is serialized and stored on disk.        |
| OFF_HEAP   | Data is stored off-heap. Off-heap memory is used in Spark for storage and query execution.        |
| MEMORY_AND_DISK_SER   | Like MEMORY_AND_DISK, but data is serialized when stored in memory. (Data is always serialized when stored on disk.)        |

In [9]:
from pyspark.storagelevel import StorageLevel

df2 = spark.range(1 * 20000000).toDF("id").withColumn("square", col("id") * col("id"))
df2.persist(StorageLevel.DISK_ONLY) # Serialize the data and cache it on disk
df2.count() # Materialize the cache

20000000

In [10]:
df2.count() # Now get it from the cache

20000000

<b>Note!</b> As you can see in the Spark UI under Storage, the data is persisted on disk, not in memory. To unpersist your cached data, just call `DataFrame.unpersist()`.

You can also cache tables or views derived from DataFrames:

In [11]:
df.createOrReplaceTempView("dfTable")

spark.sql("CACHE TABLE dfTable")

spark.sql("SELECT count(*) FROM dfTable").show()

+--------+
|count(1)|
+--------+
|10000000|
+--------+



### When to Cache and Persist
Common use cases for caching are scenarios where you will want to access a large data set repeatedly for queries or transformations. Some examples include:
- DataFrames commonly used during iterative machine learning training
- DataFrames accessed commonly for doing frequent transformations during ETL or building data pipelines

### When Not to Cache and Persist
Not all use cases dictate the need to cache. Some scenarios that may not warrant caching your DataFrames include:
- DataFrames that are too big to fit in memory
- An inexpensive transformation on a DataFrame not requiring frequent use, regardless of size

As a general rule you should use memory caching judiciously, as it can incur resource costs in serializing and deserializing, depending on the StorageLevel used.