# Spark partitioning demo

## Initialize the Spark application

In [None]:
%load_ext sparkmagic.magics

In [None]:
import os
from IPython import get_ipython

# The application name is set to "<your_gaspar_id>-week7" in the config below
username = os.environ['RENKU_USERNAME']
server = "http://iccluster029.iccluster.epfl.ch:8998"

get_ipython().run_cell_magic(
    'spark',
    line='config', 
    cell="""{{ "name": "{0}-week7", "executorMemory": "2G", "executorCores": 2, "numExecutors": 4, "driverMemory": "2G"}}""".format(username)
)
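
The `cell` argument above is a JSON document, and `str.format` forces every literal brace to be doubled. A minimal alternative sketch builds the same string with `json.dumps`. It assumes the session settings shown above, and `build_session_config` is a hypothetical helper name, not part of sparkmagic.

```python
import json
import os


def build_session_config(username, executor_memory="2G", executor_cores=2,
                         num_executors=4, driver_memory="2G"):
    """Return the JSON body for `%spark config` (hypothetical helper)."""
    config = {
        "name": f"{username}-week7",
        "executorMemory": executor_memory,
        "executorCores": executor_cores,
        "numExecutors": num_executors,
        "driverMemory": driver_memory,
    }
    # json.dumps handles quoting and braces, so no "{{ }}" escaping is needed
    return json.dumps(config)


# Falls back to a placeholder name when RENKU_USERNAME is not set
cell = build_session_config(os.environ.get("RENKU_USERNAME", "your_gaspar_id"))
print(cell)
```

The resulting string can then be passed as `cell=cell` to the same `run_cell_magic('spark', line='config', ...)` call.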

In [None]:
get_ipython().run_line_magic(
    "spark", "add -s {0}-week7 -l python -u {1} -k".format(username, server)
)

## Default parallelization

In [None]:
%%spark
rdd = sc.parallelize(range(1,5))

In [None]:
%%spark
sc.defaultParallelism

In [None]:
%%spark
rdd.getNumPartitions()

In [None]:
%%spark
rdd.glom().collect()

Not ideal: several partitions are empty.
Can we do better?

We should reduce the number of partitions so that each partition holds at least one element.
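
To see why extra partitions end up empty, here is a pure-Python sketch of the even-split rule that, as far as we know, PySpark applies when it slices a `range`: slice `i` starts at element `int(i * len / numSlices)`. `slice_range` is a hypothetical local helper that runs without Spark; it only models the boundaries.

```python
def slice_range(values, num_slices):
    """Split a range into num_slices contiguous chunks, mimicking sc.parallelize."""
    size = len(values)
    # bounds[i] is the index of the first element that belongs to slice i
    bounds = [int(i * size / num_slices) for i in range(num_slices + 1)]
    return [list(values[bounds[i]:bounds[i + 1]]) for i in range(num_slices)]


print(slice_range(range(1, 5), 8))  # [[], [1], [], [2], [], [3], [], [4]]
print(slice_range(range(1, 5), 4))  # [[1], [2], [3], [4]]
```

With more slices than elements, some slices necessarily get no elements. Asking for exactly as many slices as elements gives one element per partition.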

In [None]:
%%spark
# Ask for exactly 4 partitions, one per element of range(1, 5)
rdd2 = sc.parallelize(range(1, 5), 4)

In [None]:
%%spark
rdd2.getNumPartitions()

In [None]:
%%spark
rdd2.glom().collect()

That's much better!

## Repartition and coalesce

Now let's create a DataFrame; don't forget to give it a schema (here, just the column names).

In [None]:
%%spark
df1 = sc.parallelize([[1,2,3], [4,5,6]]).toDF(("a", "b", "c"))

In [None]:
%%spark
df1.rdd.getNumPartitions()

In [None]:
%%spark
df1.rdd.glom().collect()

### Repartition

`repartition` uses a full shuffle to redistribute the data. Here, the rows are hash-partitioned by column `a`.
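
Conceptually, `repartition("a")` sends each row to partition `hash(a) mod N`. When no count is given, `N` comes from `spark.sql.shuffle.partitions`, which defaults to 200. The pure-Python model below uses Python's built-in `hash` as a stand-in for the Murmur3 hash that Spark SQL actually applies. The partition indices it produces will therefore differ from Spark's; only the routing idea carries over. `hash_repartition` is a hypothetical helper.

```python
def hash_repartition(rows, key, num_partitions=200):
    """Model of DataFrame.repartition(key): route each row by hash(key) % N."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        # Python's hash() stands in for Spark SQL's Murmur3 hash in this sketch
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


rows = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]
parts = hash_repartition(rows, "a")
print(len(parts), [i for i, p in enumerate(parts) if p])
```

With only two rows, almost all of the 200 partitions stay empty. For tiny data, passing an explicit count such as `df1.repartition(2, "a")` is often the better choice.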

In [None]:
%%spark
df2 = df1.repartition("a")

Now let's check how many partitions were created.

In [None]:
%%spark
df2.rdd.getNumPartitions()

`200` is the default number of partitions for shuffle operations in Spark SQL. It is controlled by the `spark.sql.shuffle.partitions` configuration.

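### Coalesce

`coalesce` reduces the number of partitions by merging existing ones, so unlike `repartition` it avoids a full shuffle.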

In [None]:
%%spark
df3 = df1.coalesce(2)

In [None]:
%%spark
df3.rdd.glom().collect()
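
To see why `coalesce` can skip the shuffle, here is a simplified pure-Python model. Each input partition is moved whole into one output partition, and no records are split or rehashed. This is an illustration only: Spark's real coalescer also takes data locality into account, so its grouping can differ. The input layout and the `coalesce_partitions` name are hypothetical.

```python
def coalesce_partitions(partitions, num_partitions):
    """Simplified model of coalesce(): merge neighbouring partitions, no shuffle."""
    if num_partitions >= len(partitions):
        # Without a shuffle, coalesce cannot increase the number of partitions
        return [list(p) for p in partitions]
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        # Each input partition lands, unchanged, in exactly one output partition
        merged[i * num_partitions // len(partitions)].extend(part)
    return merged


before = [[], [(1, 2, 3)], [], [(4, 5, 6)]]  # hypothetical layout of df1
after = coalesce_partitions(before, 2)
print(after)  # [[(1, 2, 3)], [(4, 5, 6)]]
```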

## Different types of partitioning

Different types of partitioning can be applied by using `partitionBy`. The data must be in the form of key-value pairs.
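
The key-value requirement exists because `partitionBy` looks only at the key. It computes `partitionFunc(key) % numPartitions` to choose a target partition. For integer keys, PySpark's default `portable_hash` returns the same value as Python's built-in `hash`, so the local model below uses `hash` directly. `partition_by` is a hypothetical helper that runs without Spark.

```python
def partition_by(pairs, num_partitions, partition_func=hash):
    """Local model of RDD.partitionBy: bucket (key, value) by partition_func(key) % N."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


pairs = [(x, x) for x in range(10)]
buckets = partition_by(pairs, 2)
print(buckets)
```

With two partitions, even keys go to partition 0 and odd keys to partition 1, which is the split to expect from the hash-partitioning cells below. The order of records inside each Spark partition may differ after the shuffle.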

First, we initialize our RDD.

In [None]:
%%spark
new_data = range(100)

In [None]:
%%spark
new_rdd = sc.parallelize(new_data).map(lambda x: (x, x))

Let's check the partitions: there are 40 because that is the default parallelism (`sc.defaultParallelism`) of our Spark context.

In [None]:
%%spark
new_rdd.glom().collect()

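### HashPartitioner

When only a number of partitions is given, `partitionBy` applies PySpark's default `portable_hash` to each key and places the record in partition `hash(key) % numPartitions`. This is the Python counterpart of Scala's `HashPartitioner`.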

In [None]:
%%spark
partitioned_rdd = new_rdd.partitionBy(2)

In [None]:
%%spark
partitioned_rdd.getNumPartitions()

In [None]:
%%spark
partitioned_rdd.glom().collect()

### Custom partitioner

Sometimes we need a different partitioner to gain more control over how the data is spread across the cluster.

In [None]:
%%spark
nobel_prizes = [
    {'name': 'Michel Mayor', 'field': 'Physics', 'year': 2019, 'country': 'Switzerland'},
    {'name': 'Tomas Lindahl', 'field': 'Chemistry', 'year': 2015, 'country': 'Sweden'},
    {'name': 'Didier Queloz', 'field': 'Physics', 'year': 2019, 'country': 'Switzerland'},
    {'name': 'Michael Levitt', 'field': 'Chemistry', 'year': 2013, 'country': 'South Africa'},
    {'name': 'Jacques Dubochet', 'field': 'Chemistry', 'year': 2017, 'country': 'Switzerland'},
    {'name': 'Tomas Tranströmer', 'field': 'Literature', 'year': 2011, 'country': 'Sweden'},
    {'name': 'Mario Vargas Llosa', 'field': 'Literature', 'year': 2010, 'country': 'Spain'},
]

In [None]:
%%spark
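def string_partitioner(string_value):
    # partitionBy sends each record to partition string_partitioner(key) % numPartitions.
    # For string keys this returns the same value as the default portable_hash.
    return hash(string_value)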

In [None]:
%%spark
nobel_prizes_rdd = sc.parallelize(nobel_prizes).map(lambda x: (x['country'], x)).partitionBy(4, string_partitioner)

In [None]:
%%spark
nobel_prizes_rdd.glom().collect()

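Pay attention to data skew! With only four distinct countries hashed into four partitions, several keys can collide in the same partition while other partitions stay empty.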
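
One way to quantify skew is to summarize partition sizes from `glom().collect()` output. The sketch below is a hypothetical helper, and the example layout is made up for illustration. The helper reports each partition size, the number of empty partitions, and the ratio of the largest partition to the average.

```python
def skew_report(glommed):
    """Summarize partition sizes from the output of rdd.glom().collect()."""
    sizes = [len(partition) for partition in glommed]
    mean = sum(sizes) / len(sizes) if sizes else 0.0
    return {
        "sizes": sizes,
        "empty": sizes.count(0),
        # Largest partition relative to the average; 1.0 means perfectly even
        "max_over_mean": max(sizes) / mean if mean else 0.0,
    }


# Hypothetical glom() output: two keys collided, two partitions are empty
example = [[("Sweden", "a"), ("Sweden", "b")], [], [("Spain", "c")], []]
print(skew_report(example))
```

To use it on the cluster, define `skew_report` in a `%%spark` cell and call `skew_report(nobel_prizes_rdd.glom().collect())`. For larger RDDs, `rdd.glom().map(len).collect()` returns only the sizes instead of the data.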

In [None]:
%%spark
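# Same country keys with the default partitioner (portable_hash), for comparison
nobel_prizes_default_rdd = sc.parallelize(nobel_prizes).map(lambda x: (x['country'], x)).partitionBy(4)
nobel_prizes_default_rdd.glom().collect()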

In [None]:
%%spark
nobel_prizes_rdd2 = sc.parallelize(nobel_prizes).map(lambda x: (x['field'], x)).partitionBy(3, string_partitioner)

In [None]:
%%spark
nobel_prizes_rdd2.glom().collect()
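
Hashing three field names into three partitions may still produce collisions. If the set of keys is small and known in advance, one option is an explicit key-to-index mapping. `FIELD_TO_PARTITION` and `field_partitioner` are hypothetical names. The check below runs locally in plain Python on the fields of the seven laureates above, reproducing the `partition_func(key) % numPartitions` step of `partitionBy`.

```python
from collections import Counter

# Hypothetical explicit mapping: each field gets its own partition
FIELD_TO_PARTITION = {"Physics": 0, "Chemistry": 1, "Literature": 2}


def field_partitioner(field):
    """Return a partition index for a field; unknown fields fall back to 0."""
    return FIELD_TO_PARTITION.get(field, 0)


# Fields of the laureates in nobel_prizes, in the same order
fields = ["Physics", "Chemistry", "Physics", "Chemistry",
          "Chemistry", "Literature", "Literature"]
# partitionBy applies partition_func(key) % numPartitions; reproduce that locally
sizes = Counter(field_partitioner(f) % 3 for f in fields)
print(sorted(sizes.items()))  # [(0, 2), (1, 3), (2, 2)]
```

On the cluster, the same function could be passed as `partitionBy(3, field_partitioner)`. The trade-off is flexibility: every field missing from the mapping ends up in partition 0, so this only suits small, stable key sets.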