## Viewing and Setting Apache Spark Configurations

There are three ways you can get and set Spark properties.

1. The first is through a set of
configuration files. In your deployment’s `$SPARK_HOME` directory (where you installed
Spark), there are a number of config files: `conf/spark-defaults.conf.template`,
`conf/log4j.properties.template`, and `conf/spark-env.sh.template`. Changing the default values
in these files and saving them without the `.template` suffix instructs Spark to use these
new values.

2. The second way is to specify Spark configurations directly in your Spark application
or on the command line when submitting the application with `spark-submit`, using
the `--conf` flag:
```
spark-submit --conf spark.sql.shuffle.partitions=5 \
  --conf "spark.executor.memory=2g" \
  --class main.scala.chapter7.SparkConfig_7_1 \
  jars/main-scala-chapter7_2.12-1.0.jar
```

Here’s how you would do this in the Spark application itself:

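```
// In Scala
import org.apache.spark.sql.SparkSession

object SparkConfig_7_1 {
  // Helper: print every config currently set on the session
  def printConfigs(session: SparkSession): Unit = {
    val mconf = session.conf.getAll
    for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}") }
  }

  def main(args: Array[String]): Unit = {
    // Create a session
    val spark = SparkSession.builder
      .config("spark.sql.shuffle.partitions", 5)
      .config("spark.executor.memory", "2g")
      .master("local[*]")
      .appName("SparkConfig")
      .getOrCreate()

    printConfigs(spark)
    // Change a modifiable Spark SQL config at runtime
    spark.conf.set("spark.sql.shuffle.partitions",
      spark.sparkContext.defaultParallelism)
    println(" ****** Setting Shuffle Partitions to Default Parallelism")
    printConfigs(spark)
  }
}
```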

3. The third option is through a programmatic interface via the Spark shell. As with
everything else in Spark, APIs are the primary method of interaction. Through the
SparkSession object, you can access most Spark config settings.

You can also view only the Spark SQL–specific Spark configs:

```
// In Scala
spark.sql("SET -v").select("key", "value").show(5, false)
```

```
# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)
```

|key|value|
|----|----|
|spark.sql.adaptive.enabled|false|
|spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin|0.2|
|spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled|true|
|spark.sql.adaptive.shuffle.localShuffleReader.enabled|true|
|spark.sql.adaptive.shuffle.maxNumPostShufflePartitions|`<undefined>`|


Alternatively, you can view Spark’s current configuration as read-only values in the
Spark UI’s Environment tab, which we discuss later in this chapter.

To set or modify an existing configuration programmatically, first check if the property
is modifiable. `spark.conf.isModifiable("<config_name>")` will return `true` or
`false`. All modifiable configs can be set to new values using the API:
```
# In Python
>>> spark.conf.isModifiable("spark.sql.shuffle.partitions")
True
>>> spark.conf.get("spark.sql.shuffle.partitions")
'200'
>>> spark.conf.set("spark.sql.shuffle.partitions", 5)
>>> spark.conf.get("spark.sql.shuffle.partitions")
'5'
```

## Spark properties, an order of precedence
Among all the ways that you can set Spark properties, an order of precedence determines
which values are honored. Any values or flags defined in spark-defaults.conf
will be read first, followed by those supplied on the command line with spark-submit,
and finally those set via SparkSession in the Spark application. 

All these properties will be merged, with any duplicate properties reset in the Spark application
taking precedence. Likewise, values supplied on the command line will supersede settings
in the configuration file, provided they are not overwritten in the application itself.
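As a toy model (not Spark’s actual implementation), the precedence rules behave like merging three key-value maps in order, with later sources overwriting duplicate keys from earlier ones. The sketch assumes a simplified `spark-defaults.conf` format (whitespace-separated `key value` lines and `#` comments); the helper names and sample values are hypothetical.

```python
# Toy model of Spark's configuration precedence (not Spark's own code).
# Sources are applied in increasing order of precedence:
# spark-defaults.conf < spark-submit --conf < SparkSession settings.


def parse_spark_defaults(text):
    """Parse simplified spark-defaults.conf text: 'key value' lines, '#' comments."""
    confs = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)  # key, then the rest of the line as the value
        confs[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return confs


def effective_conf(file_confs, submit_confs, app_confs):
    """Later sources overwrite duplicate keys from earlier ones."""
    merged = {}
    for source in (file_confs, submit_confs, app_confs):
        merged.update(source)
    return merged


# Hypothetical values for each source
defaults_text = """
# conf/spark-defaults.conf
spark.executor.memory         1g
spark.sql.shuffle.partitions  200
spark.eventLog.enabled        true
"""
submit_confs = {"spark.executor.memory": "2g", "spark.sql.shuffle.partitions": "5"}
app_confs = {"spark.sql.shuffle.partitions": "8"}

conf = effective_conf(parse_spark_defaults(defaults_text), submit_confs, app_confs)
print(conf["spark.sql.shuffle.partitions"])  # 8, set in the application
print(conf["spark.executor.memory"])         # 2g, from spark-submit
print(conf["spark.eventLog.enabled"])        # true, only in the file
```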


## Scaling Spark for Large Workloads
To scale Spark for large workloads, there are a handful of Spark configurations that
you can enable or alter. These configurations affect three Spark components: the
**Spark driver, the executor, and the shuffle service** running on the executor.

### Static versus dynamic resource allocation
When you specify compute resources as command-line arguments to spark-submit,
as we did earlier, you cap the limit. This means that if more resources are needed later
as tasks queue up in the driver due to a larger than anticipated workload, Spark cannot
accommodate or allocate extra resources.

If instead you use Spark’s dynamic resource allocation configuration, the Spark driver
can request more or fewer compute resources as the demand of large workloads flows
and ebbs.

One use case where this can be helpful is streaming, where the data flow volume may
be uneven. Another is on-demand data analytics, where you might have a high volume
of SQL queries during peak hours. Enabling dynamic resource allocation allows
Spark to achieve better utilization of resources, freeing executors when not in use and
acquiring new ones when needed.

To enable and configure dynamic allocation, you can use settings like the following.
Note that the numbers here are arbitrary; the appropriate settings will depend on the
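nature of your workload and they should be adjusted accordingly. Some of these
configs cannot be changed from a Spark REPL once the session is running (for example,
with `spark.conf.set()`), so you will have to set them when the application starts: in
`spark-defaults.conf`, with `spark-submit --conf`, or on the `SparkSession` builder before
`getOrCreate()`. The settings look like this: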

```
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
```

By default, `spark.dynamicAllocation.enabled` is set to `false`. When enabled with
the settings shown here, the Spark driver will request that the cluster manager create
two executors to start with, as a minimum (`spark.dynamicAllocation.minExecutors`).
As the task queue backlog increases, new executors will be requested each time
the backlog timeout (`spark.dynamicAllocation.schedulerBacklogTimeout`) is
exceeded. In this case, whenever there are pending tasks that have not been scheduled
for over 1 minute, the driver will request that new executors be launched to schedule
backlogged tasks, up to a maximum of 20 (`spark.dynamicAllocation.maxExecutors`).
By contrast, if an executor finishes a task and is idle for 2 minutes
(`spark.dynamicAllocation.executorIdleTimeout`), the Spark driver will terminate
it.
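The Spark job-scheduling documentation describes the ramp-up in a bit more detail: executors are requested in rounds, the first round fires after `schedulerBacklogTimeout`, later rounds fire every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` while the backlog persists, and each round asks for twice as many executors as the previous one. The toy model below (a hypothetical helper that ignores timing and the cap set by the number of pending tasks) shows how the executor target could grow with the settings above. Note also that dynamic allocation needs a way to preserve shuffle files when executors are removed, such as the external shuffle service (`spark.shuffle.service.enabled`) or, from Spark 3.0, shuffle tracking (`spark.dynamicAllocation.shuffleTracking.enabled`).

```python
# Toy model of dynamic allocation's exponential ramp-up (not Spark's code).
# Ignores timing details and the limit imposed by the number of pending tasks.


def ramp_up(min_executors, max_executors, backlog_rounds):
    """Return the executor target after each round in which the backlog persists."""
    target = min_executors
    to_add = 1
    history = []
    for _ in range(backlog_rounds):
        target = min(max_executors, target + to_add)  # never exceed maxExecutors
        history.append(target)
        to_add *= 2  # each round requests twice as many as the previous one
    return history


print(ramp_up(min_executors=2, max_executors=20, backlog_rounds=5))
```

With a minimum of 2 and a maximum of 20, the target grows to 3, 5, 9, and 17 executors and is then capped at 20 in the fifth round.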

### Configuring Spark executors’ memory and the shuffle service 

Simply enabling dynamic resource allocation is not sufficient. You also have to understand
how executor memory is laid out and used by Spark so that executors are not
starved of memory or troubled by JVM garbage collection.

The amount of memory available to each executor is controlled by
`spark.executor.memory`. Spark manages three sections of it, as depicted in
Figure 7-2: execution memory, storage memory, and reserved memory. Spark first sets
aside 300 MB of reserved memory to safeguard against OOM errors. Of the remaining
heap, a fraction set by `spark.memory.fraction` (0.6 by default) is shared by execution
and storage memory, and `spark.memory.storageFraction` (0.5 by default) sets the
baseline share of that shared region for storage; the remaining 40% is left for user
data structures and Spark’s internal metadata. The Spark documentation advises that
the defaults will work for most cases, but you can adjust these fractions. When storage
memory is not being used, Spark can acquire it for execution purposes, and vice versa.

Execution memory is used for Spark shuffles, joins, sorts, and aggregations. Since different
queries may require different amounts of memory, the fraction of the available memory to
dedicate to execution and storage (`spark.memory.fraction`, 0.6 by default) can be
tricky to tune, but it’s easy to adjust. By contrast, storage memory is primarily used for
caching data, such as partitions derived from DataFrames, and for broadcast data.
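To make the fractions concrete, here is a back-of-the-envelope calculation based on the unified memory model in the Spark tuning guide: 300 MB is reserved, `spark.memory.fraction` (default 0.6) of the remaining heap is shared by execution and storage, and `spark.memory.storageFraction` (default 0.5) of that shared region is the storage share protected from eviction. It assumes the whole `spark.executor.memory` value is usable JVM heap, so the numbers you see in the Spark UI will differ slightly.

```python
# Back-of-the-envelope executor heap layout under Spark's unified memory model.
# Assumption: the full spark.executor.memory value is usable heap (JVM overhead ignored).

RESERVED_MB = 300  # fixed reserved memory


def heap_layout(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (in MB) into reserved, user, execution, and storage."""
    usable = executor_memory_mb - RESERVED_MB
    unified = usable * memory_fraction      # shared by execution and storage
    storage = unified * storage_fraction    # storage share protected from eviction
    return {
        "reserved": RESERVED_MB,
        "user": usable - unified,           # user data structures, Spark metadata
        "execution": unified - storage,     # can also borrow unused storage memory
        "storage": storage,                 # can also borrow unused execution memory
    }


for region, mb in heap_layout(4096).items():
    print(f"{region:>9}: {mb:8.1f} MB")
```

For a 4 GB (4,096 MB) executor, this gives about 1,518.4 MB of user memory and baselines of about 1,138.8 MB each for execution and storage.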

During map and shuffle operations, Spark writes to and reads from the local disk’s
shuffle files, so there is heavy I/O activity. This can result in a bottleneck, because the
default configurations are suboptimal for large-scale Spark jobs. Knowing what configurations
to tweak can mitigate this risk during this phase of a Spark job.

In Table 7-1, we capture a few recommended configurations to adjust so that the map,
spill, and merge processes during these operations are not encumbered by inefficient
I/O and to enable these operations to employ buffer memory before writing the final
shuffle partitions to disk. Tuning the shuffle service running on each executor can
also aid in increasing overall performance for large Spark workloads.

**Table 7-1.** Spark configurations to tweak for I/O during map and shuffle operations

|Configuration|Default value, recommendation, and description|
|-----|------|
|`spark.driver.memory`|Default is 1g (1 GB). This is the amount of memory allocated to the Spark driver to receive data from executors. This is often changed during `spark-submit` with `--driver-memory`. Only change this if you expect the driver to receive large amounts of data back from operations like `collect()`, or if you run out of driver memory.|
|`spark.shuffle.file.buffer`|Default is 32 KB. Recommended is 1 MB. This allows Spark to do more buffering before writing final map results to disk.|
|`spark.file.transferTo`|Default is `true`. Setting it to `false` will force Spark to use the file buffer to transfer files before finally writing to disk; this will decrease the I/O activity.|
|`spark.shuffle.unsafe.file.output.buffer`|Default is 32 KB. This controls the amount of buffering possible when merging files during shuffle operations. In general, large values (e.g., 1 MB) are more appropriate for larger workloads, whereas the default can work for smaller workloads.|
|`spark.io.compression.lz4.blockSize`|Default is 32 KB. Increase to 512 KB. You can decrease the size of the shuffle file by increasing the size of the block used for compression.|
|`spark.shuffle.service.index.cache.size`|Default is 100m. Cache entries are limited to the specified memory footprint, in bytes unless otherwise specified.|
|`spark.shuffle.registration.timeout`|Default is 5000 ms. Increase to 120000 ms.|
|`spark.shuffle.registration.maxAttempts`|Default is 3. Increase to 5 if needed.|
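One way to apply these recommendations per job is to pass them as `--conf` flags to `spark-submit`. The hypothetical helper below only builds the command line (it does not launch anything); `app.jar` is a placeholder, and the values mirror the recommendations in Table 7-1.

```python
# Hypothetical helper: render Table 7-1 style settings as spark-submit --conf flags.
import shlex

shuffle_io_confs = {
    "spark.shuffle.file.buffer": "1m",
    "spark.shuffle.unsafe.file.output.buffer": "1m",
    "spark.io.compression.lz4.blockSize": "512k",
    "spark.shuffle.registration.timeout": "120000",  # milliseconds
    "spark.shuffle.registration.maxAttempts": "5",
}


def to_conf_flags(confs):
    """Turn {key: value} into ['--conf', 'key=value', ...]."""
    flags = []
    for key, value in confs.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


args = ["spark-submit", *to_conf_flags(shuffle_io_confs), "app.jar"]
print(shlex.join(args))  # shell-quoted command line (Python 3.8+)
```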

### Maximizing Spark parallelism
Spark will at best schedule a thread per task per
core, and each task will process a distinct partition. To optimize resource utilization
and maximize parallelism, **the ideal is at least as many partitions as there are cores on
the executor**.
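A quick way to see why the partition count should at least match the number of cores is to count scheduling rounds, or "waves", of tasks. This sketch assumes one task runs per core at a time and every task takes the same time; the cluster size is hypothetical.

```python
import math


def task_waves(num_partitions, executors, cores_per_executor):
    """Number of scheduling rounds needed to run one task per partition."""
    slots = executors * cores_per_executor  # one task per core at a time
    return math.ceil(num_partitions / slots)


def idle_slot_fraction(num_partitions, executors, cores_per_executor):
    """Share of core time left idle across all waves (equal task durations assumed)."""
    slots = executors * cores_per_executor
    waves = task_waves(num_partitions, executors, cores_per_executor)
    return 1 - num_partitions / (waves * slots)


# Hypothetical cluster: 4 executors x 4 cores = 16 task slots
for partitions in (8, 16, 17):
    print(partitions, task_waves(partitions, 4, 4), idle_slot_fraction(partitions, 4, 4))
```

With 16 task slots, 8 partitions leave half the cores idle, 16 keep every core busy, and 17 need a second wave in which only one task runs.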

### How partitions are created
As mentioned previously, Spark’s tasks process data as partitions
read from disk into memory. Data on disk is laid out in chunks or contiguous
file blocks, depending on the store. By default, file blocks on data stores range in size
from 64 MB to 128 MB. For example, on HDFS and S3 the default size is 128 MB
(this is configurable). A contiguous collection of these blocks constitutes a partition.
The size of a partition in Spark is dictated by `spark.sql.files.maxPartitionBytes`.
The default is 128 MB. You can decrease the size, but that may result in what’s known
as the “small file problem”—many small partition files, introducing an inordinate
amount of disk I/O and performance degradation thanks to filesystem operations
such as opening, closing, and listing directories, which on a distributed filesystem can
be slow.
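When Spark reads files with the DataFrame API, `spark.sql.files.maxPartitionBytes` is an upper bound rather than the exact split size. Based on our reading of Spark 3.x’s `FilePartition.maxSplitBytes`, the target split size also depends on `spark.sql.files.openCostInBytes` (4 MB by default) and the default parallelism, roughly as sketched below. This is a simplified model that assumes a single splittable file; treat it as an approximation, not a specification.

```python
import math

MiB = 1024 * 1024


def max_split_bytes(file_sizes, parallelism,
                    max_partition_bytes=128 * MiB, open_cost_in_bytes=4 * MiB):
    """Approximate target split size when Spark reads files (simplified model)."""
    # Each file is charged an extra "open cost", as if it were that many bytes larger
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))


def splits_for_single_file(file_size, parallelism, **kwargs):
    """Number of byte ranges a single splittable file is cut into."""
    split = max_split_bytes([file_size], parallelism, **kwargs)
    return math.ceil(file_size / split)


print(splits_for_single_file(1024 * MiB, parallelism=8))
print(splits_for_single_file(1024 * MiB, parallelism=8, max_partition_bytes=32 * MiB))
```

For a 1 GB file and a default parallelism of 8, this gives 8 splits of 128 MB; lowering `maxPartitionBytes` to 32 MB yields 32 smaller splits and therefore more tasks.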

Partitions are also created when you explicitly use certain methods of the DataFrame
API. For example, while creating a large DataFrame or reading a large file from disk,
you can explicitly instruct Spark to create a certain number of partitions:

```
// In Scala
val ds = spark.read.textFile("../README.md").repartition(16)
ds: org.apache.spark.sql.Dataset[String] = [value: string]

ds.rdd.getNumPartitions
res5: Int = 16

val numDF = spark.range(1000L * 1000 * 1000).repartition(16)
numDF: org.apache.spark.sql.Dataset[Long] = [id: bigint]

numDF.rdd.getNumPartitions
res12: Int = 16
```

Finally, shuffle partitions are created during the shuffle stage. By default, the number
of shuffle partitions is set to 200 in spark.sql.shuffle.partitions. You can adjust
this number depending on the size of the data set you have, to reduce the number of
small partitions being sent across the network to executors’ tasks.

**The default value for spark.sql.shuffle.partitions is too high
for smaller or streaming workloads; you may want to reduce it to a
lower value such as the number of cores on the executors or less.**

Created during operations like groupBy() or join(), also known as wide transformations,
shuffle partitions consume both network and disk I/O resources. During these
operations, the shuffle will spill results to executors’ local disks at the location specified
in `spark.local.dir`. Having performant SSD disks for this operation will
boost the performance.

There is no magic formula for the number of shuffle partitions to set for the shuffle
stage; the number may vary depending on your use case, data set, number of cores,
and the amount of executor memory available. Finding it is a trial-and-error process.
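Since there is no formula, one common rule of thumb (our assumption, not a Spark default) is to aim for shuffle partitions of roughly 100 to 200 MB, use at least one partition per core, and round up to a multiple of the total core count so the last wave of tasks is full. The hypothetical helper below encodes that starting point, which you would then refine by trial and error.

```python
import math

GiB = 1024 ** 3


def suggest_shuffle_partitions(shuffle_bytes, total_cores,
                               target_partition_bytes=128 * 1024 ** 2):
    """Starting point for spark.sql.shuffle.partitions (a rule of thumb, not a Spark formula)."""
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)  # ~128 MB per partition
    partitions = max(total_cores, by_size)                        # at least one per core
    return math.ceil(partitions / total_cores) * total_cores      # fill the last wave


print(suggest_shuffle_partitions(50 * GiB, total_cores=64))
print(suggest_shuffle_partitions(1 * GiB, total_cores=64))
```

For example, 50 GB of shuffle data on 64 cores gives 448 partitions, while 1 GB on the same cluster falls back to 64, one per core. You could then apply the result with `spark.conf.set()`.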

In addition to scaling Spark for large workloads, to boost your performance you’ll
want to consider caching or persisting your frequently accessed DataFrames or tables.
We explore various caching and persistence options in the next section.

## Caching and Persistence of Data
What is the difference between caching and persistence? In Spark they are synonymous.
Two API calls, `cache()` and `persist()`, offer these capabilities. The latter provides
more control over how and where your data is stored—in memory and on disk,
serialized and unserialized. Both contribute to better performance for frequently
accessed DataFrames or tables.

#### DataFrame.cache()
cache() will store as many of the partitions read in memory across Spark executors
as memory allows (see Figure 7-2). While a DataFrame may be fractionally cached,
partitions cannot be fractionally cached (e.g., if you have 8 partitions but only 4.5
partitions can fit in memory, only 4 will be cached in memory). For DataFrames, `cache()`
uses the `MEMORY_AND_DISK` storage level by default, so the partitions that do not fit
in memory are written to local disk. With a memory-only level such as `MEMORY_ONLY`
(the default for `RDD.cache()`), the partitions that are not cached will have to be
recomputed when you want to access the data again, slowing down your Spark job.
Let’s look at an example of how caching a large DataFrame improves performance
when accessing a DataFrame:

```
// In Scala
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache() // Cache the data
df.count() // Materialize the cache
res3: Long = 10000000
Command took 5.11 seconds
df.count() // Now get it from the cache
res4: Long = 10000000
Command took 0.44 seconds
```
The first `count()` materializes the cache, whereas the second one accesses the cache,
resulting in close to 12 times faster access time for this data set.

note: *When you use `cache()` or `persist()`, the DataFrame is not fully
cached until you invoke an action that goes through every record
(e.g., `count()`). If you use an action like `take(1)`, only one partition
will be cached because Catalyst realizes that you do not need
to compute all the partitions just to retrieve one record.*

Observing how a DataFrame is stored across one executor on a local host, as displayed
in Figure 7-4, we can see that its partitions all fit in memory (recall that at a low level
DataFrames are backed by RDDs).

#### DataFrame.persist()
persist(StorageLevel.LEVEL) is nuanced, providing control over how your data is
cached via StorageLevel. Table 7-2 summarizes the different storage levels. Data on
disk is always serialized using either Java or Kryo serialization.


**Table 7-2.** StorageLevels

|StorageLevel|Description|
|----|----|
|MEMORY_ONLY|Data is stored directly as objects and stored only in memory.|
|MEMORY_ONLY_SER|Data is serialized as compact byte array representation and stored only in memory. To use it, it has to be deserialized at a cost.|
|MEMORY_AND_DISK|Data is stored directly as objects in memory, but if there’s insufficient memory the rest is serialized and stored on disk.|
|DISK_ONLY|Data is serialized and stored on disk.|
|OFF_HEAP|Data is stored off-heap. Off-heap memory is used in Spark for storage and query execution; see “Configuring Spark executors’ memory and the shuffle service”.|
|MEMORY_AND_DISK_SER|Like MEMORY_AND_DISK, but data is serialized when stored in memory. (Data is always serialized when stored on disk.)|

note: *Each StorageLevel (except OFF_HEAP) has an equivalent LEVEL_NAME_2, which means replicate twice on two different Spark executors: MEMORY_ONLY_2, MEMORY_AND_DISK_SER_2, etc. While this option is expensive, it allows data locality in two places, providing fault tolerance and giving Spark the option to schedule a task local to a copy of the data.*

Let’s look at the same example as in the previous section, but using the persist()
method:
```
// In Scala
import org.apache.spark.storage.StorageLevel
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.persist(StorageLevel.DISK_ONLY) // Serialize the data and cache it on disk
df.count() // Materialize the cache
res2: Long = 10000000
Command took 2.08 seconds
df.count() // Now get it from the cache
res3: Long = 10000000
Command took 0.38 seconds
```
As you can see from Figure 7-5, the data is persisted on disk, not in memory. To
unpersist your cached data, just call `DataFrame.unpersist()`.

Finally, not only can you cache DataFrames, but you can also cache the tables or
views derived from DataFrames. This gives them more readable names in the Spark
UI. For example:
```
// In Scala
df.createOrReplaceTempView("dfTable")
spark.sql("CACHE TABLE dfTable")
spark.sql("SELECT count(*) FROM dfTable").show()
+--------+
|count(1)|
+--------+
|10000000|
+--------+
Command took 0.56 seconds
```

#### When to Cache and Persist
Common use cases for caching are scenarios where you will want to access a large
data set repeatedly for queries or transformations. Some examples include:
• DataFrames commonly used during iterative machine learning training
• DataFrames accessed commonly for doing frequent transformations during ETL
or building data pipelines

#### When Not to Cache and Persist
Not all use cases dictate the need to cache. Some scenarios that may not warrant caching
your DataFrames include:
• DataFrames that are too big to fit in memory
• An inexpensive transformation on a DataFrame not requiring frequent use,
regardless of size

As a general rule, you should use memory caching judiciously, as it can incur resource
costs in serializing and deserializing, depending on the StorageLevel used.

## A Family of Spark Joins

Spark has five distinct join strategies by which it exchanges, moves, sorts, groups, and
merges data across executors: the broadcast hash join (BHJ), shuffle hash join (SHJ),
shuffle sort merge join (SMJ), broadcast nested loop join (BNLJ), and
shuffle-and-replicate nested loop join (a.k.a. Cartesian product join). We’ll focus on
only two of these here (BHJ and SMJ), because they’re the most common ones you’ll encounter.


### Broadcast Hash Join
Also known as a map-side-only join, the broadcast hash join is employed when two
data sets, one small (fitting in the driver’s and executor’s memory) and another large
enough to ideally be spared from movement, need to be joined over certain conditions
or columns. Using a Spark broadcast variable, the smaller data set is broadcasted
by the driver to all Spark executors, as shown in Figure 7-6, and subsequently
joined with the larger data set on each executor. This strategy avoids the large
exchange.

By default Spark will use a broadcast join if the smaller data set is less than 10 MB.
This configuration is set in `spark.sql.autoBroadcastJoinThreshold`; you can
decrease or increase the size depending on how much memory you have on each
executor and in the driver. If you are confident that you have enough memory you
can use a broadcast join with DataFrames larger than 10 MB (even up to 100 MB).

For example, consider a simple case where you have a large data set of soccer
players around the world, playersDF, and a smaller data set of soccer clubs they play
for, clubsDF, and you wish to join them over a common key:
```
// In Scala
import org.apache.spark.sql.functions.broadcast
// key1 and key2 are the join columns in playersDF and clubsDF, respectively
val joinedDF = playersDF.join(broadcast(clubsDF), playersDF("key1") === clubsDF("key2"))
```
note: *In this code we are forcing Spark to do a broadcast join, but it will
resort to this type of join by default if the size of the smaller data set
is below the spark.sql.autoBroadcastJoinThreshold.*

The BHJ is the easiest and fastest join Spark offers, since it does not involve any shuffle
of the data set; all the data is available locally to the executor after a broadcast. You
just have to be sure that you have enough memory both on the Spark driver’s and the
executors’ side to hold the smaller data set in memory.
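At any time after the operation, you can see in the physical plan what join operation
was performed by executing:

`joinedDF.explain(mode)`

where `mode` is one of `"simple"`, `"extended"`, `"codegen"`, `"cost"`, or `"formatted"`
(available since Spark 3.0).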
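To illustrate the mechanics, here is a toy, single-process model of a broadcast hash join (an inner equi-join). Lists of rows stand in for DataFrame partitions, and the player and club data are hypothetical. The small side is turned into a hash table once and reused by every partition of the large side, which is why the large side never needs to be shuffled.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner equi-join of a partitioned large side with a small, broadcast side."""
    # Build a hash table from the small data set once (the "broadcast" copy)
    table = defaultdict(list)
    for row in small_rows:
        table[row[small_key]].append(row)
    # Every partition of the large side probes the same table locally,
    # so rows of the large side never move between partitions
    joined = []
    for partition in large_partitions:
        for row in partition:
            for match in table.get(row[large_key], []):
                joined.append({**row, **match})
    return joined


# Hypothetical data: two partitions of players and a small clubs table
players = [
    [{"player": "Ana", "club_id": 1}, {"player": "Ben", "club_id": 2}],
    [{"player": "Cai", "club_id": 1}, {"player": "Dee", "club_id": 9}],
]
clubs = [{"club_id": 1, "club": "Reds"}, {"club_id": 2, "club": "Blues"}]

for row in broadcast_hash_join(players, clubs, "club_id", "club_id"):
    print(row)
```

Dee has no matching club, so the inner join drops that row.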

#### When to use a broadcast hash join

Use this type of join under the following conditions for maximum benefit:
• When each key within the smaller and larger data sets is hashed to the same partition
by Spark
• When one data set is much smaller than the other (and within the default config
of 10 MB, or more if you have sufficient memory)
• When you only want to perform an equi-join, to combine two data sets based on
matching unsorted keys
• When you are not worried by excessive network bandwidth usage or OOM
errors, because the smaller data set will be broadcast to all Spark executors

Setting `spark.sql.autoBroadcastJoinThreshold` to `-1` disables automatic broadcast joins
(an explicit `broadcast()` hint is still honored), so for equi-joins Spark will instead
resort to a shuffle sort merge join, which we discuss in the next section.


### Shuffle Sort Merge Join
The sort-merge algorithm is an efficient way to merge two large data sets over a common
key that is sortable and can be assigned to or stored in the same partition,
that is, two data sets with a common hashable key that end up being on the
same partition. From Spark’s perspective, this means that all rows within each data set
with the same key are hashed to the same partition on the same executor. Obviously,
this means data has to be colocated or exchanged between executors.
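The two phases can be sketched as a toy, single-process model: an exchange that hash-partitions both sides on the join key, followed by a sort and a single merge pass within each partition. Python’s built-in `hash()` stands in for Spark’s partitioning hash, and the data is hypothetical.

```python
def shuffle(rows, key, num_partitions):
    """Exchange: route each row to a partition by hashing its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def merge_join(left, right, lkey, rkey):
    """Sort both sides on the key, then walk them together in one pass."""
    left = sorted(left, key=lambda r: r[lkey])
    right = sorted(right, key=lambda r: r[rkey])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][lkey], right[j][rkey]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row that has the same key
            j_end = j
            while j_end < len(right) and right[j_end][rkey] == lk:
                j_end += 1
            while i < len(left) and left[i][lkey] == lk:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def sort_merge_join(left_rows, right_rows, lkey, rkey, num_partitions=4):
    """Shuffle both sides on the key, then merge-join each pair of partitions."""
    left_parts = shuffle(left_rows, lkey, num_partitions)
    right_parts = shuffle(right_rows, rkey, num_partitions)
    # Equal keys hash to the same partition index on both sides
    return [row
            for lp, rp in zip(left_parts, right_parts)
            for row in merge_join(lp, rp, lkey, rkey)]


# Hypothetical data
orders = [{"oid": 1, "cust": 7}, {"oid": 2, "cust": 3}, {"oid": 3, "cust": 7}]
customers = [{"cust": 3, "name": "Lee"}, {"cust": 7, "name": "Kim"}]
for row in sort_merge_join(orders, customers, "cust", "cust"):
    print(row)
```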

#### Optimizing the shuffle sort merge join
We can eliminate the Exchange step from this scheme if we create partitioned buckets
for common sorted keys or columns on which we want to perform frequent equi-joins.
That is, we can create an explicit number of buckets to store specific sorted columns
(all rows with the same key value land in the same bucket). Presorting and reorganizing
data in this way boosts performance, as it allows us to skip the expensive Exchange
operation and go straight to WholeStageCodegen.
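Continuing the toy model, bucketing moves the hashing and sorting to write time: both tables are bucketed on the join key with the same number of buckets, so at query time bucket *i* is merged only with bucket *i* and no exchange is needed. In Spark itself this is done with `bucketBy()` and `sortBy()` on `DataFrameWriter`, followed by `saveAsTable()`, which use Spark’s own hash function; the Python `hash()`, bucket count, and data below are stand-ins.

```python
from itertools import groupby

NUM_BUCKETS = 4  # hypothetical; both tables must use the same bucket count


def bucket_by(rows, key, num_buckets=NUM_BUCKETS):
    """Write time: hash rows into buckets and presort each bucket by the key."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return [sorted(b, key=lambda r: r[key]) for b in buckets]


def merge_sorted(left, right, lkey, rkey):
    """Merge two lists already sorted on the key; no sorting happens here."""
    lgroups = [(k, list(g)) for k, g in groupby(left, key=lambda r: r[lkey])]
    rgroups = [(k, list(g)) for k, g in groupby(right, key=lambda r: r[rkey])]
    out, i, j = [], 0, 0
    while i < len(lgroups) and j < len(rgroups):
        if lgroups[i][0] < rgroups[j][0]:
            i += 1
        elif lgroups[i][0] > rgroups[j][0]:
            j += 1
        else:
            out.extend({**lr, **rr} for lr in lgroups[i][1] for rr in rgroups[j][1])
            i += 1
            j += 1
    return out


def join_bucketed(left_buckets, right_buckets, lkey, rkey):
    """Query time: merge bucket i with bucket i only, so there is no exchange."""
    return [row
            for lb, rb in zip(left_buckets, right_buckets)
            for row in merge_sorted(lb, rb, lkey, rkey)]


# Hypothetical tables, bucketed once when they are written
sales = bucket_by([{"item": 1, "qty": 5}, {"item": 2, "qty": 1}, {"item": 1, "qty": 2}], "item")
items = bucket_by([{"item": 1, "name": "pen"}, {"item": 2, "name": "ink"}], "item")
for row in join_bucketed(sales, items, "item", "item"):
    print(row)
```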

#### When to use a shuffle sort merge join
Use this type of join under the following conditions for maximum benefit:
• When each key within two large data sets can be sorted and hashed to the same
partition by Spark
• When you want to perform only equi-joins to combine two data sets based on
matching sorted keys
• When you want to prevent Exchange and Sort operations to save large shuffles
across the network

So far we have covered operational aspects related to tuning and optimizing Spark,
and how Spark exchanges data during two common join operations. We also described
how you can boost the performance of a shuffle sort merge join operation by
using bucketing to avoid large exchanges of data.

