

# What is a shuffle in Spark?

## [The main article to reference: Apache Spark - Performance](https://blog.scottlogic.com/2018/03/22/apache-spark-performance.html)

The task today is to process the London Cycle Hire data into <font color='red'>two separate sets, Weekends and Weekdays</font>. 

<font color='red'>Grouping data into smaller subsets for further processing</font> is a common business requirement, and we’re going to see how Spark can help us with the task.

## Partitioning

### Ideal Partitioning

<p style="text-align: center; font-family: 'objektiv-mk1,Helvetica,Arial,sans-serif,Linotte Light';">
    To <font color='#52bd86'>distribute work across the cluster</font> and <font color='#52bd86'>reduce the memory requirements of each node</font>, 
    Spark will <font color='red' size='5px'>split the data into smaller parts</font> called <font color='red' size='5px'>Partitions</font>.
</p>

<table>
    <tr>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Ideal-Partitioning.png' width='500'/>
            <p style="text-align: center; font-family: 'objektiv-mk1';"> Fig: Diagram of Ideal Partitioning </p>
        </td>
    </tr>
</table>

### Imperfect Partitioning - when data is split into partitions

#### Data Skew

<font color='#0091fa'>Often the data is split into partitions based on a key</font>, for instance the first letter of a name. 
If values are <font color='red'>not evenly distributed across this key</font>, more data will be placed in one partition than in another. An example would be:



<table>
    <tr>
        <td style="white-space:pre-wrap; word-wrap:break-word">
            {Adam, Alex, Anja, Beth, Claire}
            -> A: {Adam, Alex, Anja}
            -> B: {Beth}
            -> C: {Claire}
        </td>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Skewed-Partitions.png' width='500'/>
            <p style="text-align: center;"> Fig: Diagram of Skewed Partitioning </p>
        </td>
    </tr>
</table>



- Here the A partition is 3 times larger than the other two, and therefore will take approximately 3 times as long to compute. <font color='#0091fa' size='3px'>As the next stage of processing cannot begin until all three partitions are evaluated, the overall results from the stage will be delayed.</font>
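To make the skew concrete, here is a minimal plain-Python model (not Spark code) of partitioning the names above by their first letter. `partition_by_first_letter` is a hypothetical helper, and the comparison assumes processing time grows linearly with partition size.

```python
from collections import defaultdict


def partition_by_first_letter(names):
    """Toy key-based partitioning: one partition per first letter."""
    partitions = defaultdict(list)
    for name in names:
        partitions[name[0]].append(name)
    return dict(partitions)


names = ["Adam", "Alex", "Anja", "Beth", "Claire"]
partitions = partition_by_first_letter(names)
sizes = {key: len(values) for key, values in partitions.items()}
print(sizes)  # {'A': 3, 'B': 1, 'C': 1}

# If work grows linearly with size, the stage ends only when the biggest partition is done.
largest, smallest = max(sizes.values()), min(sizes.values())
print(f"Largest partition is {largest / smallest:.0f}x the smallest")  # 3x
```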

#### Scheduling

- If there are <font color='red' size='3px'>too few partitions to cover all the available executors</font>, some executors sit idle: in the diagram below, <font color='#0091fa'>Executor</font> 2 is <font color='#0091fa'>idle and unused for half the job time</font>.

<table>
    <tr>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Inefficient-Scheduling.png' width='500'/>
            <p style="text-align: center;"> Fig: Diagram of Badly Scheduled Partitioning </p>
        </td>
    </tr>
</table>


### Solution

The simplest solution to the above two problems is to <font color='#52bd86'>increase the number of partitions</font> used for computations. This reduces the effect of skew on any single partition and also allows tasks to be matched more evenly to the available CPUs.

<font color='#0091fa' size='3px'>A common recommendation is to have 4 partitions per CPU</font>; <font color='red' size='5px'>however, settings related to Spark performance are very case dependent, so this value should be fine-tuned for your given scenario.</font>
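The scheduling problem and the fix can be sketched with a toy model. It assumes equal-sized partitions, one core per executor, no per-task overhead, and a greedy scheduler that hands each task to whichever executor frees up first; `schedule`, `utilisation` and the work figures are hypothetical, and Spark's real scheduler is more involved.

```python
import heapq


def schedule(task_durations, num_executors):
    """Greedy list scheduling: each task goes to the executor that becomes free first.

    Returns (makespan, busy_time_per_executor).
    """
    free_at = [(0.0, executor) for executor in range(num_executors)]  # (time free, executor id)
    heapq.heapify(free_at)
    busy = [0.0] * num_executors
    for duration in task_durations:
        start, executor = heapq.heappop(free_at)
        busy[executor] += duration
        heapq.heappush(free_at, (start + duration, executor))
    makespan = max(time for time, _ in free_at)
    return makespan, busy


def utilisation(task_durations, num_executors):
    """Fraction of total executor time spent doing work during the stage."""
    makespan, busy = schedule(task_durations, num_executors)
    return sum(busy) / (makespan * num_executors)


TOTAL_WORK = 12.0  # hypothetical units of work for the whole stage
EXECUTORS = 2      # assumption: one core per executor

too_few = [TOTAL_WORK / 3] * 3                # 3 equal partitions
per_cpu = [TOTAL_WORK / 8] * (4 * EXECUTORS)  # 4 partitions per CPU -> 8 partitions

print(schedule(too_few, EXECUTORS), utilisation(too_few, EXECUTORS))  # (8.0, [8.0, 4.0]) 0.75
print(schedule(per_cpu, EXECUTORS), utilisation(per_cpu, EXECUTORS))  # (6.0, [6.0, 6.0]) 1.0
```

With 3 partitions, the second executor (index 1 here) works for 4 of the 8 time units, i.e. it is idle for half the job, as in the diagram; with 4 partitions per CPU both executors finish together. In a real cluster every extra task adds scheduling overhead, which is why the value still needs tuning.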

## Shuffling


A <font color='red' size='5px'>shuffle</font> occurs when <font color='red' size='5px'>data is rearranged between partitions.</font>
This is required when a transformation requires information from other partitions, such as summing all the values in a column. <font color='#0091fa'>Spark will gather the required data from each partition and combine it into a new partition, likely on a different executor.</font>


<font color='red' size='5px'>During a shuffle, data is written to disk and transferred across the network, halting Spark’s ability to do processing in-memory and causing a performance bottleneck.</font> Consequently, we want to reduce the number of shuffles being done or the amount of data being shuffled.



<table>
    <tr>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Shuffle-Diagram.png' width='500'/>
            <p style="text-align: center;"> Fig: Diagram of Shuffling Between Executors </p>
        </td>
    </tr>
</table>
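A shuffle for a per-key sum can be modelled in plain Python. This is a sketch under assumptions: the records and partition counts are hypothetical, and `ord(key) % n` stands in for Spark's real hash partitioner. After the shuffle every record for a key sits in one output partition, so the sum can be computed locally, but every record had to be sent.

```python
from collections import defaultdict

# Hypothetical input: 3 partitions of (station, rides) records.
input_partitions = [
    [("A", 1), ("B", 2), ("A", 3)],
    [("B", 4), ("C", 5)],
    [("A", 6), ("C", 7), ("B", 8)],
]


def shuffle(partitions, num_output_partitions):
    """Route every record to output partition ord(key) % n; return outputs and records sent."""
    outputs = [[] for _ in range(num_output_partitions)]
    records_sent = 0
    for partition in partitions:
        for key, value in partition:
            outputs[ord(key) % num_output_partitions].append((key, value))
            records_sent += 1
    return outputs, records_sent


def sum_by_key(partition):
    """Aggregate one partition locally; correct only once all records for a key are together."""
    totals = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)


shuffled, records_sent = shuffle(input_partitions, num_output_partitions=3)
result = {}
for partition in shuffled:
    result.update(sum_by_key(partition))

print(records_sent)  # 8
print(result)        # {'B': 14, 'C': 12, 'A': 10}
```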

### Map-Side Reduction


When aggregating data during a shuffle, rather than passing all the data, it is <font color='#52bd86' size='3px'>preferred to combine the values in the current partition and pass only the result in the shuffle</font>. This process is known as <font color='#52bd86' size='3px'>**Map-Side Reduction**</font> and improves performance by reducing the quantity of data being transferred during a shuffle.


<table>
    <tr>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Map-Side-Reduction.png' width='500'/>
            <p style="text-align: center;"> Fig: Diagram of Map-Side Reduction </p>
        </td>
    </tr>
</table>
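The same per-key sum with map-side reduction: each input partition is pre-aggregated before the shuffle, so at most one record per key per partition is sent. This is again a hypothetical plain-Python model; in Spark's RDD API, `reduceByKey` performs this kind of local combine, whereas `groupByKey` does not.

```python
from collections import Counter

# Hypothetical input: 3 partitions of (station, rides) records with repeated keys.
input_partitions = [
    [("A", 1), ("A", 2), ("A", 3), ("B", 4)],
    [("B", 1), ("B", 2), ("B", 3)],
    [("A", 4), ("B", 5), ("A", 6)],
]


def map_side_combine(partition):
    """Pre-aggregate one partition so only one (key, partial_sum) record per key is shuffled."""
    partial = Counter()
    for key, value in partition:
        partial[key] += value
    return list(partial.items())


def reduce_side_sum(partitions):
    """Final aggregation after the shuffle: add up whatever records arrive for each key."""
    totals = Counter()
    for partition in partitions:
        for key, value in partition:
            totals[key] += value
    return dict(totals)


combined = [map_side_combine(p) for p in input_partitions]
sent_without_combine = sum(len(p) for p in input_partitions)
sent_with_combine = sum(len(p) for p in combined)

print(sent_without_combine, sent_with_combine)  # 10 5
print(reduce_side_sum(combined) == reduce_side_sum(input_partitions))  # True
```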

## In Practice


### Round 1

<font color='#52bd86' size='3px'>When the job is run, we see that the repartition command does a shuffle and produces 200 partitions (the Spark default, set by `spark.sql.shuffle.partitions`)</font>, which should offer excellent levels of parallelisation; let’s look at the execution timeline.

- <font color='#ffb804'>The timeline does not look balanced.</font>

This happened because <font color='#4cf0c2' size='4px'>calling `repartition` moves all values for the same key into the same partition on one Executor.</font> <font color='red' size='3px'>Here our key, `isWeekend`, is a boolean value, meaning that only two partitions will be populated with data. Spark is not able to account for this in its internal optimisation and therefore creates 198 other partitions with no data in them.</font> If we had more than two executors available, any executors beyond the first two would receive only empty partitions and would be idle throughout this process, greatly reducing the total throughput of the cluster.


Grouping in this fashion is also a common source of memory exceptions as, with a large data set, a single partition can easily be given multiple GBs of data and quickly exceed the allocated RAM. Therefore we must consider the likely proportion of data for each key we have chosen and how that correlates to our cluster.


```java
data.repartition(data.col("isWeekend")).write()
        .parquet("cycle-data-results" + Time.now());
```

<table>
    <tr>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Unbalanced-Shuffles.png' width='800'/>
            <p style="text-align: center;"> Fig: 200 Partitions Execution Timeline and Metrics </p>
        </td>
    </tr>
</table>
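Why only two of the 200 partitions receive data can be reproduced with a toy hash partitioner (plain Python, not Spark's Murmur3-based partitioner). Assumptions: hypothetical ride records in which 2 of every 7 days are weekend days, and Python's built-in `hash`, for which `hash(False) == 0` and `hash(True) == 1`. Any deterministic hash sends two distinct keys to at most two partitions, whatever the partition count.

```python
NUM_SHUFFLE_PARTITIONS = 200  # Spark's default value of spark.sql.shuffle.partitions


def hash_partition(records, key_fn, num_partitions):
    """Send each record to partition hash(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for record in records:
        partitions[hash(key_fn(record)) % num_partitions].append(record)
    return partitions


# Hypothetical rides: (ride_id, is_weekend), with 2 of every 7 days being weekend days.
rides = [(ride_id, ride_id % 7 in (5, 6)) for ride_id in range(7000)]
partitions = hash_partition(rides, key_fn=lambda ride: ride[1], num_partitions=NUM_SHUFFLE_PARTITIONS)

non_empty = [len(p) for p in partitions if p]
print(len(non_empty), "non-empty partitions out of", NUM_SHUFFLE_PARTITIONS)  # 2 ... out of 200
print(non_empty)  # [5000, 2000] -> weekday rows in one partition, weekend rows in another
```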


### Round 2

To improve on the above issues, we need to make changes to our query so that it more evenly spreads the data across our partitions and executors.

<font color='red'>Another way of writing the 'repartition query' is to <font color='red' size='5px'>delegate the repartitioning to the write method</font>.</font>

#### Reason

- In the previous case, Spark loaded the CSV files into 69 partitions, split these based on `isWeekend` and shuffled the results into 200 new partitions for writing. 

- In the new solution, Spark still loads the CSVs into 69 partitions; however, it is then able to <font color='red' size='5px'>skip the shuffle stage, realising that it can split the existing partitions based on the key and then write that data directly to parquet files</font>. Looking at the execution timeline, we can see a much healthier spread between the partitions and the nodes, and no shuffle occurring.

```java
data.write().partitionBy("isWeekend")
        .parquet("cycle-data-results" + Time.now());
```

<table>
    <tr>
        <td>
            <img src='https://blog.scottlogic.com/mdebeneducci/assets/Better-Balancing.png' width='800'/>
            <p style="text-align: center;"> Fig: Execution Timeline and Metrics with write().partitionBy() </p>
        </td>
    </tr>
</table>
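The no-shuffle behaviour of the partitioned write can be modelled the same way: each existing partition is split on the key independently, and every (input partition, key value) pair becomes its own output file. `write_partitioned_by` and the data layout (69 partitions, each holding one week of rides) are hypothetical.

```python
from collections import defaultdict


def write_partitioned_by(partitions, key_fn):
    """Model of a partitioned write: split each input partition on the key independently.

    Returns {(input_partition_index, key_value): rows}; each entry stands for one output file.
    No row ever leaves the partition it was read into, so there is no shuffle.
    """
    files = defaultdict(list)
    for index, partition in enumerate(partitions):
        for row in partition:
            files[(index, key_fn(row))].append(row)
    return dict(files)


# Hypothetical rows (partition_id, day_of_week, is_weekend): 69 partitions, one week each.
input_partitions = [[(p, day, day in (5, 6)) for day in range(7)] for p in range(69)]
files = write_partitioned_by(input_partitions, key_fn=lambda row: row[2])

print(len(files))                         # 138 files: 69 partitions x 2 isWeekend values
print(sorted({key for _, key in files}))  # [False, True] -> two isWeekend=... directories
```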

### Conclusion

> A repository with the example code can be found [here](https://github.com/MatdeB-SL/Spark-Performance---Cycle-Hire-Data)

### [repartition() vs partitionBy()](https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/)

- `repartition()` creates a specified number of partitions in memory

- `partitionBy()` will write files to disk for each memory partition and each value of the partition column(s).

---

### [Difference between df.repartition and DataFrameWriter partitionBy?](https://stackoverflow.com/questions/40416357/difference-between-df-repartition-and-dataframewriter-partitionby)

---


The first part of the accepted answer is correct: calling `df`<font color='#40e0d0' size='5px'>.repartition(</font>COL, numPartitions=k<font color='#40e0d0' size='5px'>)</font> will <font color='#40e0d0' size='5px'>create a dataframe with k partitions using a hash-based partitioner</font>. <font color='#ffb804'>COL</font> here <font color='#ffb804'>defines the partitioning key</font>--it can be a single column or a list of columns. <font color='#40e0d0' size='3px'>The hash-based partitioner takes each input row's partition key, hashes it into a space of k partitions</font> via something like `partition = hash(partitionKey) % k`. This guarantees that all rows with the same partition key end up in the same partition. However, rows from multiple partition keys can also end up in the same partition (when a hash collision between the partition keys occurs) and some partitions might be empty.

In summary, the unintuitive aspects of `df.repartition(COL, numPartitions=k)` are that

- partitions will not strictly segregate partition keys

- some of your <font color='#40e0d0' size='5px'>k</font> partitions may be empty, whereas others may contain rows from multiple partition keys
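A minimal sketch of the `hash(partitionKey) % k` idea, using hypothetical integer keys so the result is deterministic (Python's `hash` of a small non-negative int is the int itself). Spark uses a different hash function, so real layouts differ, but the same two effects appear: collisions and empty partitions.

```python
def hash_partitioner(keys, k):
    """Assign each distinct partition key to partition hash(key) % k."""
    partitions = [[] for _ in range(k)]
    for key in keys:
        partitions[hash(key) % k].append(key)
    return partitions


keys = [0, 3, 4, 6, 7]  # hypothetical distinct partition keys
partitions = hash_partitioner(keys, k=4)
print(partitions)  # [[0, 4], [], [6], [3, 7]] -> two collisions and one empty partition
```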


---


The behavior of `df`<font color='red' size='5px'>.write.partitionBy</font> is quite different, in a way that many users won't expect. Let's say that you want your output files to be date-partitioned, and your data spans 7 days. Let's also assume that df has 10 partitions to begin with. When you run `df.write.partitionBy('day')`, how many output files should you expect? The answer is 'it depends'. If each of your starting partitions in df contains data from every day, then the answer is 70. If each of your starting partitions in df contains data from exactly one day, then the answer is 10.

How can we explain this behavior? When you run <font color='red' size='5px'>df.write, each of the original partitions in df is written independently</font>. That is, each of your original 10 partitions is sub-partitioned separately on the 'day' column, and a separate file is written for each sub-partition.

I find this behavior rather annoying and wish there were a way to do a global repartitioning when writing dataframes.
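The file counts in this answer can be checked with a small plain-Python model, assuming each task writes exactly one file per distinct `day` value in its partition (Spark may write more if `maxRecordsPerFile` is set). The data and helpers are hypothetical. The third case models a commonly used workaround for the wish above: hash-repartition on `day` first, as `df.repartition('day')` would, so every day lands in exactly one partition and the partitioned write produces one file per day.

```python
def count_output_files(partitions):
    """Model of a partitioned write: each partition writes one file per distinct day it holds."""
    return sum(len(set(partition)) for partition in partitions)


def hash_repartition(partitions, num_partitions):
    """Model of repartitioning on the day column: route every row to hash(day) % num_partitions."""
    output = [[] for _ in range(num_partitions)]
    for partition in partitions:
        for day in partition:
            output[hash(day) % num_partitions].append(day)
    return output


DAYS = range(7)
# Case 1: each of the 10 starting partitions holds rows (here just day values) from every day.
every_day = [[day for day in DAYS for _ in range(3)] for _ in range(10)]
# Case 2: each of the 10 starting partitions holds rows from exactly one day.
one_day = [[p % 7] * 21 for p in range(10)]

print(count_output_files(every_day))                          # 70
print(count_output_files(one_day))                            # 10
# Case 3: repartition on day first, then write partitioned by day.
print(count_output_files(hash_repartition(every_day, 10)))    # 7
```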


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


spark_mode = 'local'


def spark_session(mode='yarn', machine='hadoop_node'):
    """
    Create (or reuse) a SparkSession.

    mode:    Spark master to run against, one of ['yarn', 'local']
    machine: where the code runs, one of ['hadoop_node', 'mac']
    """
    APP_NAME = 'get_history_vbs_by_batt'
    if machine == 'hadoop_node':
        spark_sess = SparkSession \
            .builder \
            .appName(APP_NAME) \
            .master(mode) \
            .getOrCreate()
    else:
        # add config to bind host and address to your machine ip when testing on Mac locally
        spark_sess = SparkSession \
            .builder \
            .appName(APP_NAME) \
            .master(mode) \
            .config('spark.driver.host', '127.0.0.1') \
            .config('spark.driver.bindAddress', '127.0.0.1') \
            .getOrCreate()
    return spark_sess


spark = spark_session(mode=spark_mode, machine='hadoop_node')
spark.sparkContext.setLogLevel('WARN')  # to reduce unnecessary logs
sc = spark.sparkContext

- Code below adapted from this [reference](https://intellipaat.com/community/16097/pyspark-repartition-vs-partitionby)

In [3]:
pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))

print('pairs.partitionBy(3).glom().collect():', pairs.partitionBy(3).glom().collect())

# [[(3, 3), (6, 6), (6, 6)],
#  [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)],
#  [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]

pairs_2 = sc.parallelize([1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]).map(lambda x: (x, x))
print('pairs_2.repartition(3).glom().collect():', pairs_2.repartition(3).glom().collect())

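# One possible result from the reference; repartition does not group rows by key,
# so the layout varies between runs and setups (compare the actual output below):
# [[(4, 4), (2, 2), (6, 6), (7, 7), (5, 5), (5, 5)],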
#  [(1, 1), (4, 4), (6, 6), (4, 4)],
#  [(2, 2), (3, 3), (1, 1), (5, 5), (7, 7)]]


pairs.partitionBy(3).glom().collect(): [[(3, 3), (6, 6), (6, 6)], [(1, 1), (4, 4), (4, 4), (1, 1), (7, 7), (7, 7), (4, 4)], [(2, 2), (2, 2), (5, 5), (5, 5), (5, 5)]]
pairs_2.repartition(3).glom().collect(): [[], [(1, 1), (2, 2), (3, 3), (4, 4), (2, 2), (4, 4), (1, 1), (5, 5), (6, 6), (7, 7)], [(7, 7), (5, 5), (5, 5), (6, 6), (4, 4)]]
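To see where the `partitionBy(3)` layout comes from: PySpark's `RDD.partitionBy` assigns each pair to `partitionFunc(key) % numPartitions`, with `portable_hash` as the default partition function, and for small non-negative integers that hash is the key itself. The plain-Python model below (a hypothetical helper, assuming input order is preserved within each partition) reproduces the output above; the `repartition(3)` layout, by contrast, is not determined by the key.

```python
values = [1, 2, 3, 4, 2, 4, 1, 5, 6, 7, 7, 5, 5, 6, 4]
local_pairs = [(x, x) for x in values]


def model_partition_by(pairs, num_partitions):
    """Bucket (key, value) pairs by hash(key) % num_partitions, keeping input order."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


print(model_partition_by(local_pairs, 3))
```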


### [Spark Partitions](https://luminousmen.com/post/spark-partitions)

Let us imagine that our input dataset is a single uncompressed text file of about 30 GB (30 × 1024 = 30720 MB) stored on HDFS and distributed across 10 nodes.

When Spark reads a file from HDFS, it creates a single partition for each input split. The input split is set by the Hadoop InputFormat used to read the file. With the default HDFS block size (128 MB) and the default `spark.sql.files.maxPartitionBytes` (128 MB), a 30 GB file is stored in 30720 / 128 = 240 blocks, which means that the DataFrame you read from this file would have 240 partitions.

This partition count comes from the file splits; it is not the same setting as the Spark default parallelism (`spark.default.parallelism`), which governs RDD operations such as `parallelize` when no partition count is given.
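The arithmetic can be checked with a rough estimate that assumes one partition per 128 MB split and ignores Spark's per-file open cost and any other adjustments it makes to the split size; `estimated_input_partitions` is a hypothetical helper.

```python
import math


def estimated_input_partitions(file_size_mb, split_size_mb=128):
    """Rough estimate: one partition per split of split_size_mb (last split may be partial)."""
    return math.ceil(file_size_mb / split_size_mb)


print(estimated_input_partitions(30 * 1024))  # 240
print(estimated_input_partitions(30_000))     # 235
```

Taking 30 GB as 30000 MB gives 235 instead, which is why the 240 figure assumes 30 × 1024 MB.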



# Spark 3

- <https://teepika-r-m.medium.com/apache-spark-3-0-exciting-capabilities-f57132e158f>

# Different Spark execution modes

Cluster mode, client mode, and local mode; for a figure illustrating them, see:
<https://blog.knoldus.com/cluster-vs-client-execution-modes-for-a-spark-application/>