In [None]:
%%HTML
<link rel="stylesheet" type="text/css" href="https://fonts.googleapis.com/css?family=Quicksand:300,700" />
<link rel="stylesheet" type="text/css" href="https://fonts.googleapis.com/css?family=Fira Code" />
<link rel="stylesheet" type="text/css" href="rise.css">

In [1]:
app_name = "spark-3"
master   = "spark://spark-master:7077"
data_dir = 'data/weather'

import os
import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as st

spark = (
    pyspark.sql.SparkSession.builder
    .appName(app_name)
    .master(master) 
    .getOrCreate()
)
spark

def generate_data(row_count=100):
    counter = 0
    while counter < row_count:
        yield [random.randint(0, 300),''.join(random.choice('abcdefghijklmnopqrstuvwxyz') for i in range(10))]
        counter += 1

def count_rows_in_partitions(df):
    def count_in_a_partition(iterator):
      yield sum(1 for _ in iterator)

    return df.rdd.mapPartitions(count_in_a_partition).collect()

spark

# Spark Advanced


![footer_logo_new](images/logo_new.png)

## Overview
- Partitions
- The anatomy of a Spark job
- Narrow and wide transformation (and why you should care).
- How Spark reads data (from HDFS).
- The Catalyst optimizer.
- Caching and persistence.

## Partitions

In order to allow for distributed computing, the data needs to be split up in some way. We call this partitioning. Spark has some built-in ways of controlling the partitioning:

 - Changing to a fixed number of partitions.
 - Hash-based distribution across partitions.
 - Range-based distribution across partitions.

Before we dive into partitioning, note that we already saw (the effects of) partitioning before.
<img src="images/partitioning.png" width="70%" align="left"/>

Moreover, when reading from data formats like Parquet, you automatically get partitions, as they were stored on disk.

For all the partitioning methods mentioned, if you do not specify a number of partitions, Spark will default to whatever has been configured in `spark.default.parallelism`. This config setting is set to 200 by default.

#### Our Data

In [2]:
import random
sdf1 = spark.createDataFrame(generate_data(row_count=1000), schema=['number', 'letter'])
sdf1.toPandas()

Unnamed: 0,number,letter
0,33,xqgogmhaqo
1,201,jeeqthyxii
2,259,kbcvfrsyca
3,126,dnkdzvurqr
4,193,bgqpbrqiax
...,...,...
995,34,vffmougvrn
996,47,qmdkvvjdzv
997,248,qljsbthqrp
998,273,ebjuygmtww


In [8]:
sdf1.rdd.getNumPartitions()

2

In [10]:
count_rows_in_partitions(sdf1)

[500, 500]

In [11]:
spark.sparkContext.defaultParallelism

2

#### Why do we end up with 2 partitions?

When we conjure up a new DataFrame, Spark uses the `defaultParallelim` setting from the underlying Spark Context, which in turn is configured from the `spark.default.parallelism` setting, whose default depends on the cluster mode. In our mode local mode this means:

In [6]:
spark.sparkContext.master, spark.sparkContext.defaultParallelism

('spark://spark-master:7077', 2)

Some DataFrame transformations implicitly repartition the data. When this happens the value of `spark.sql.shuffle.partitions` (default: 200) determines the number of partitions from that point onwards.

The older RDD-based API had different rules; beware that the documentation and a lot of Internet information was incorrect about how this worked. If you find yourself needing to understand this, test and verify for your situation.

### Controlling Partition Count

In the last module we saw the `.partition()` transformation: this sets a fixed number of partitions from this point onwards until another transformation implictly changes it.

In [15]:
num_part = sdf1.repartition(5)
print(num_part.rdd.getNumPartitions())
print(count_rows_in_partitions(num_part))

5
[200, 200, 200, 200, 200]


Upsides:

+ Very simple.

Downsides:

+ You haven't specified how the data will be distributed across the partitions.
+ It can be hard to choose a good number. (Spark can't choose one for you; it's a hard problem.)

   - Too many partitions, and the overhead of an action goes up: lots of tasks doing not very much.
   - Too few partitions, and the task may have "too much" work to and run out of memory.

### Range Partitioning

Next to the number of partitions, we can control how the data is distributed across them. Range partitioning:

 - Orders the dataframe by the columns given.
 - Allocates the data based on ranges of values.

In [13]:
range_part = sdf1.repartitionByRange(sf.col("number"))
print(range_part.rdd.getNumPartitions())
print(count_rows_in_partitions(range_part))

200
[7, 3, 9, 7, 4, 4, 3, 5, 6, 2, 7, 3, 7, 5, 3, 5, 8, 2, 6, 4, 8, 2, 8, 5, 2, 5, 9, 1, 5, 6, 5, 6, 3, 8, 6, 1, 5, 8, 6, 3, 5, 5, 5, 5, 4, 5, 5, 5, 4, 11, 4, 4, 3, 8, 4, 1, 7, 9, 4, 3, 3, 5, 4, 7, 5, 3, 5, 5, 6, 9, 4, 6, 2, 4, 6, 3, 8, 3, 4, 5, 8, 4, 3, 9, 5, 4, 4, 5, 4, 5, 5, 5, 9, 2, 5, 5, 3, 11, 6, 2, 5, 4, 4, 5, 8, 2, 5, 6, 2, 7, 3, 9, 5, 2, 4, 5, 8, 5, 2, 5, 10, 4, 3, 4, 5, 4, 8, 2, 7, 3, 5, 5, 6, 5, 6, 5, 4, 7, 3, 10, 1, 4, 5, 6, 4, 6, 5, 4, 9, 3, 2, 7, 7, 3, 7, 2, 8, 4, 8, 3, 1, 5, 5, 5, 5, 5, 5, 7, 4, 6, 7, 3, 5, 11, 5, 3, 3, 2, 4, 5, 7, 3, 5, 6, 8, 5, 3, 4, 5, 4, 6, 4, 5, 8, 5, 2, 7, 4, 5, 4]


In [None]:
range_part_letter = sdf1.repartitionByRange(sf.col("letter"))
print(range_part_letter.rdd.getNumPartitions())
print(count_rows_in_partitions(range_part_letter))

Upsides:

+ Relatively easy.

Downsides:

+ You need to know your data quite well.
+ Duplicate keys can cause skew, making 1 range very popular, and thus the partition very large
+ In order to determine the ranges, Spark needs to sample your data. This actually means evaluating the DataFrame/RDD. Therefore, repartitioning by range is actually a transformation __and__ an action.
+ Each record in the dataframe will be shuffled to the right partition based on the range of the key.

### Hash Partitioning

The normal partitioning is hash-based. Hashing column values _hopefully_ produces a uniform distribution without any sampling. This is the default mechanism.

In [21]:
hash_part = sdf1.repartition(sf.col("number"))
print(hash_part.rdd.getNumPartitions())
# print(count_rows_in_partitions(hash_part))

2


In [20]:
hash_part_letter = sdf1.repartition(10, sf.col("letter"))
print(hash_part_letter.rdd.getNumPartitions())
# print(count_rows_in_partitions(hash_part_letter))

10


Upsides:
+ Works out of the box, without complex configuration.

Downsides:
+ You need to pick your partition key wisely, or you may end up with very uneven distribution of data.

### Partition Skew
Each type of partitioning has its own advantages and disadvantages. Dealing with skewed partitions can be very challenging. A few general tips:
+ If you have skewed partitions, try to figure out what in your data is causing it. In some cases, adding an extra column to partition on can help you out.
+ Consider if you can hardcode the number of partitions. Note that this may cause downstream operations to require huge amounts of shuffling as the data will be spread randomly.

### Pro-Tip

_Salting_ is a technique for reducing partition skew:

1. Add a column containing a random number.
2. Include it in your set of columns to repartition.
3. Drop it after the repartition.

In [None]:
hash_salty = sdf1 \
   .withColumn("salt", sf.rand(seed=0)) \
   .repartition(sf.col("salt"), sf.col("number")) \
   .drop("salt")

print(hash_salty.rdd.getNumPartitions())
print(count_rows_in_partitions(hash_salty))

## The Anatomy of a Spark Job
<center><img src="images/spark-job-anatomy.png"></center>

## The Anatomy of a Spark Job
+ __Application__: A `SparkSession` or `SparkContext` object
+ __Job__: Each action creates a Spark *job*
+ __Stage__: Each stage represents one wide transformation (i.e. every wide transformation introduces a new stage
+ __Task__: One parallellizable unit of computation. There is one task per partition per stage.

## Narrow and wide operations (and why you should care)

Let's consider a `filter` operation:

<center>
<img src="images/narrow_op.svg" width="70%" />
</center>

This is called a **narrow** operation, as there is no shuffling between the workers. Spark reserves the right to combine subsequent narrow operations together.

It means that in case we do a `select` afterwards multiplying the second column by 2, Spark would not do this:

<center>
<img src="images/narrow_op_two.svg" width="80%" />
</center>

but this:

<center>
<img src="images/pipelining.svg" width="70%" />
</center>

The above is a **fundamental** difference compared to Hadoop, where subsequent Map (and Reduce) operations always needed to write to disk. Instead, Spark combines the two operations in memory, resulting in much higher efficiency.

If, on the other hand, we have an operation such as `groupBy`, we need to shuffle between machines. This is called a **wide** operation, and Spark cannot combine them together like **narrow** operations. For example (note that we call the two machines on the right **D** and **E** because they could be completely different workers)

<center>
<img src="images/groupBy.svg" width="80%" />
</center>

## How Spark reads data (from HDFS)

First of all we should take a look at how HDFS store the data. In principle it works like this:

![hdfs](images/hdfs.svg)

When you read `Lorem.txt` in Spark, each operation on a **block** becomes a **task**. The larger the file, the more blocks you have, the higher number of Spark tasks you have.

<center>
<img src="images/spark_count.svg"/>
</center>

## The Catalyst optimizer

People could write books about the Catalyst optimizer, but the concept is as follows: Spark can not only chain narrow operations together, but also shuffle the operations order. Let's assume we have the following code (don't worry if you don't grok the API yet):

```python
(
    df.groupBy('name')
      .agg(sf.mean('amount').alias('avg_amount'))
      .filter(sf.col('name') == 'name_1')
)
```



This is a very stupid thing to do, because we should really write this:

```python
(
    df.filter(sf.col('name') == 'name_1')
      .groupBy('name')
      .agg(sf.mean('amount').alias('avg_amount'))
)
```
Luckily the Catalyst optimizer takes care of the above shuffling by itself, resulting in quicker execution.

The two most common things we talk about are:

 - Predicate pushdown;
 - Projection pushdown.

These are both ways of discarding data as early in the process as possible. Spark can even push them down to the data-sources that are reading data:

 - Parquet in particular will only read columns that are needed (projection pushdown);
 - The latest Avro source will skip the rest of a record if it determines it is being filtered (predicate pushdown).

Many more optimisations are supported, and these also end up combination with specialized execution strategies for common combinations of operation.

The exact optimizations are outside the scope of this training, but if you look into it you'll discover that you can implement and plug in your own optimizations.

![catalyst](images/catalyst.png)

Cost-based optimization relies on table/column statistics, which aren't always available. So more often you only see rule-based optimizations. Note that sometimes you may wish to disable the cost-based optimizer: it can be unpredictable and in production sometimes slower but more predictable is what you want.

## SparkUI
Detailed overview of your running job
- Console prints the URL when starting up Spark.
- Each spark-shell/spark application will start a new SparkUI with a higher port number.
- The default (first) SparkUI http://localhost:4040

![SparkUI](images/sparkUI-jobs.png)

![SparkUI](images/SparkUI-stages.png)
- Spark action are executed through multiple stages.
- Track the progress of the stages.
- View your accumulators by clicking on the description.

![SparkUI](images/SparkUI-storage.png)


![SparkUI](images/SparkUI-executors.png)
- Spark can run on multiple executors.
- All executors are listed here.
- You find the logs of the executors here.

## Caching and Persistence

What happens when you call `count`, then `filter`, and then `count` again?



Calling `count` again will execute all the transformations (including reading from file) twice!!

Persisting a DataFrame to memory will cache the data to avoid recomputing.

This also helps in algorithms which execute functions iterative on a DataFrame like KMeans for example.


![persistence](images/persistence.svg)

### `cache()`: Persists DataFrame with the default storage level

Tell Spark to (temporarily) store the results so it doesn't have to recompute.

Note:
- You'll keep all resources allocated to you so far.
You could permanently hog all the resources if you share a cluster.
- Keeping stuff in memory means less room for computations.
You may get weird Spark errors.

In [None]:
sdf1 = spark.createDataFrame([[1, 'a'], [2, 'b'], [2,  'c'], [3, 'd']],
                             schema=['number', 'letter'])
sdf1.cache()

Stop! Go check the Spark UI, tab *Storage*. I'll wait here! What do you see?

Wait, Spark is lying to us! We asked it to `cache` but there's nothing in storage! The reason is that `cache` is not an action, and Spark is lazy!

Try invoking `count()`:

In [None]:
sdf1.count()

Go back to the Spark UI. What do you see now?

It is important to note that **you need to trigger an action, i.e. a computation** in order to cache your DataFrame.

<center>
<img src="images/caching.svg" width="80%" />
</center>

But where are we caching? By default calling `.cache` will cache a DataFrame in memory. There are other options accessible via the `.persist` method, namely:

- `MEMORY_ONLY`
- `DISK_ONLY`
- `MEMORY_AND_DISK`
- `MEMORY_ONLY_SER`
- `MEMORY_AND_DISK_SER`
- `MEMORY_ONLY_2`
- `DISK_ONLY_2`
- `MEMORY_AND_DISK_2`
- `MEMORY_ONLY_SER_2`
- `MEMORY_AND_DISK_SER_2`
- `OFF_HEAP`

- `MEMORY_ONLY` is the default;
- `.cache()` is the same as `persist(MEMORY_ONLY)`
- `_SER` means store in Serialized (binary) form;
- `_2` means replicate the storage over 2 executor nodes;
- `OFF_HEAP` is experimental feature.

# Summary

In this chapter we learned about:
- What components are in a Spark Job
- Narrow and wide operations (and why you should care);
- How Spark reads data (from HDFS);
- The Catalyst optimizer;
- Caching and persistence.

## Exercise - Catalyst advanced

For more detail on how Catalyst works, read the following blog on a deep dive into the Catalyst optimizer:
https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html. 

If you're feeling more adventurous, you can also read the accompanying paper: http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf.

After reading the material, see if you can answer the following questions:

1. How does Catalyst represent your computation interally? What is the main advantage of this representation?
2. Can you draw this representation for the Heroes computation from the previous exercise?
3. Can you explain what the below optimizations do?
    * Constant folding
    * Predicate pushdown
    * Projection pruning
    * Null propagation
    * Boolean expression simplification
4. Which of these optimizations were used by Spark to improve performance in the Heroes example?
5. Which abstract rules do these represent? (Try to define the rule in terms of operations on the Catalyst computation structure.)