## Partitioning

When an RDD is created, you can specify the number of partitions.
<br>The default is the number of workers defined when you set up the `SparkContext`.

In [1]:
from pyspark import SparkContext

### Creating `SparkContext` with 2 workers

In [2]:
sc = SparkContext(master="local[2]")

In [3]:
A = sc.parallelize(range(1000000))

### Use `getNumPartitions` to retrieve the number of partitions created

In [4]:
print(A.getNumPartitions())

2


### We can repartition _A_ into any number of partitions we want

In [5]:
D = A.repartition(10)

In [6]:
print(D.getNumPartitions())

10


### We can also set the number of partitions while creating the RDD with the `numSlices` argument

In [7]:
A = sc.parallelize(range(1000000),numSlices=8)

In [8]:
print(A.getNumPartitions())

8


### Why are partitions important?

* They define the unit the executor works on
* You should have at least as many partitions as the number of worker nodes
* Smaller partitions may allow more parallelization

## Repartitioning for Load Balancing

Suppose we start with 10 partitions, all with exactly the same number of elements.

In [9]:
A=sc.parallelize(range(1000000)).map(lambda x:(x,x)).partitionBy(10)
print(A.glom().map(len).collect())

[100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]


Suppose we want to use **`filter()`** to select some of the elements in A.<br>
Some partitions might have more elements remaining than others.

In [10]:
# Select 20% of the entries.
# A bad filter: keep only the numbers divisible by 5
B=A.filter(lambda pair: pair[0]%5==0)
# get the number of elements in each partition
print(B.glom().map(len).collect())

[100000, 0, 0, 0, 0, 100000, 0, 0, 0, 0]


Future operations on B will use only two workers.<br>
The other workers will do nothing, because their partitions are empty.
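
The imbalance follows from the arithmetic: `A` was partitioned on integer keys, so key `x` sits in partition `x % 10`, and a multiple of 5 can only leave a remainder of 0 or 5. The sketch below replays this in plain Python, without Spark. The smaller size `n` and the list-of-lists layout are assumptions made for illustration.

```python
# Plain-Python model of the two cells above (no Spark needed).
# Assumption: an integer key lands in partition `key % k`.
k = 10
n = 1000  # much smaller than the notebook's 1,000,000, to keep the sketch fast

# Model of A: (x, x) pairs spread over k partitions by key
partitions = [[] for _ in range(k)]
for x in range(n):
    partitions[x % k].append((x, x))

# Model of B: apply the same filter inside every partition
filtered = [[pair for pair in part if pair[0] % 5 == 0] for part in partitions]
sizes = [len(part) for part in filtered]
print(sizes)  # [100, 0, 0, 0, 0, 100, 0, 0, 0, 0]
```

Partitions 0 and 5 keep all of their elements, and every other partition is emptied.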

### To fix the situation we need to repartition the unbalanced RDD. <br>One way to do that is to assign a new key and repartition with the method `partitionBy()`

* The method **`.partitionBy(k)`** expects a **`(key,value)`** RDD; in this example the keys are integers.
* It partitions the RDD into **`k`** partitions.
* By default, the element **`(key,value)`** is placed into partition no. **`hash(key) % k`**, which for the non-negative integer keys used here is **`key % k`**.

In [11]:
# Use integer division (//): in Python 3, / would produce float keys
C=B.map(lambda pair:(pair[1]//10,pair[1])).partitionBy(10)
print(C.glom().map(len).collect())

[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]


Note how **`C`** contains the same 200,000 elements as the unbalanced **`B`**, but redistributes them into equal partitions of 20,000 elements each.
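
To see why the choice of key matters, the following plain-Python sketch counts how many values land in each partition under `key % k` placement, once with the value itself as the key and once with the integer key `value // 10`. The helper `partition_sizes` and the scaled-down range are hypothetical, introduced only for this illustration.

```python
def partition_sizes(values, key_func, k):
    """Count how many values land in each of k partitions under `key % k` placement."""
    sizes = [0] * k
    for v in values:
        sizes[key_func(v) % k] += 1
    return sizes

# Scaled-down stand-in for the values left in B: the multiples of 5 below 1000
values = range(0, 1000, 5)

unbalanced = partition_sizes(values, lambda v: v, 10)      # key = value
balanced = partition_sizes(values, lambda v: v // 10, 10)  # key = value // 10

print(unbalanced)  # [100, 0, 0, 0, 0, 100, 0, 0, 0, 0]
print(balanced)    # [20, 20, 20, 20, 20, 20, 20, 20, 20, 20]
```

Dividing by 10 maps the values to the keys 0, 0, 1, 1, 2, 2, and so on, so consecutive keys cycle through all ten remainders.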

### Another approach is random partitioning with **`repartition(k)`**
* An **advantage** of random partitioning is that it does not require defining a key.
* A **disadvantage** of random partitioning is that you have no control over the partitioning, i.e., over which elements go to which partition.

In [12]:
C=B.repartition(10)
print(C.glom().map(len).collect())

[20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000, 20000]


## `glom()`
* In general, Spark does not allow the worker to refer to specific elements of the RDD.
* This keeps the language clean, but can be a major limitation.

#### `glom()` transforms each partition into a list of elements.<br> It creates an RDD of lists, one list per partition. <br>Workers can refer to elements of the partition by index, but changing a list does not alter the source RDD, which is still immutable.

* Consider the command used above to count the number of elements in each partition: `print(C.glom().map(len).collect())`
* We used `glom()` to make each partition into a list.
* We used `len` on each partition to get the length of the list, i.e. the size of the partition.
* We `collect`ed the results to print them out.
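
The same three steps can be mimicked in plain Python. This is only a rough model: the nested list `partitions` is a hypothetical stand-in for an RDD, and the list comprehensions play the roles of `glom()` and `map(len)`.

```python
# Hypothetical stand-in for an RDD: a list of partitions, each holding its elements
partitions = [[0, 10, 20], [], [5, 15]]

# Models glom(): one list per partition
glommed = [list(part) for part in partitions]

# Models .map(len).collect(): the size of every partition
sizes = [len(part) for part in glommed]

# Once a partition is a list, its elements can be reached by index
first_elements = [part[0] if part else None for part in glommed]

print(sizes)           # [3, 0, 2]
print(first_elements)  # [0, None, 5]
```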

### A more elaborate example
There are many things that you can do using `glom()`.
<br>
For example, suppose we want to get the key of the first element, the number of elements, and the sum of the absolute differences between consecutive elements for each of the unbalanced partitions of `B` that we made from `A`. If a partition has fewer than two elements we just return `None`.

In [14]:
def getPartitionInfo(G):
    """Return (first key, number of elements, sum of gaps) for one glommed partition."""
    d=0
    if len(G)>1:
        for i in range(len(G)-1):
            # G is the glommed partition (a list), so its elements can be accessed by index
            d+=abs(G[i+1][1]-G[i][1])
        return (G[0][0],len(G),d)
    else:
        return None

output=B.glom().map(getPartitionInfo).collect()
print(output)

[(0, 100000, 999990), None, None, None, None, (5, 100000, 999990), None, None, None, None]
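
The numbers in this output can be checked by hand on a scaled-down copy of the data. The sketch below is plain Python rather than Spark. The function name `partition_info`, the size `n`, and the assumption that elements sit in each partition in increasing order are all introduced here for illustration.

```python
def partition_info(part):
    """Return (first key, size, sum of absolute gaps between consecutive values), or None."""
    if len(part) <= 1:
        return None
    gaps = sum(abs(b[1] - a[1]) for a, b in zip(part, part[1:]))
    return (part[0][0], len(part), gaps)

# Scaled-down model of B: multiples of 5 below n, placed in partition key % k
k, n = 10, 1000
partitions = [[] for _ in range(k)]
for x in range(0, n, 5):
    partitions[x % k].append((x, x))

result = [partition_info(part) for part in partitions]
print(result)
# [(0, 100, 990), None, None, None, None, (5, 100, 990), None, None, None, None]
```

When the values are in increasing order, the gaps add up to the last value minus the first. That matches the 999990 reported for both non-empty partitions in the full-size run (999990 - 0 and 999995 - 5).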
