### [Spark Tuning - Level of Parallelism](http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism)

In [11]:
# Configure the necessary Spark environment
import os
import sys
import time

# SPARK_HOME must be set before running this notebook.
spark_home = os.environ.get('SPARK_HOME')
if spark_home is None:
    raise EnvironmentError("SPARK_HOME is not set")
sys.path.insert(0, os.path.join(spark_home, "python"))

# Add the py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.9-src.zip'))

from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel

### Level of Parallelism: Calculation of Pi example

#### Tip: Use repartition if data is not distributed well

In [12]:
from random import random

In [14]:
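def sampleAndSum(p):
    # Draw p random points in the unit square and count those inside the quarter circle.
    points = [(random(),random()) for i in xrange(p)]
    return sum([1 for (a,b) in points if a*a + b*b < 1])

def calculate_pi(sc, NUM_SAMPLES):
    # One list element per task, each holding an equal integer share of the samples.
    tasks = sc.defaultParallelism
    count = sc.parallelize([NUM_SAMPLES // tasks] * tasks) \
            .map(sampleAndSum) \
            .reduce(lambda a, b: a + b)
    # The fraction of points inside the quarter circle approximates pi/4.
    return 4.0 * count / NUM_SAMPLES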
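
`calculate_pi` gives every task the same integer share of the samples, which silently drops the remainder when the sample count is not a multiple of the task count. Below is a minimal sketch of one way to spread the remainder. The helper `split_samples` is hypothetical and not part of the original notebook.

```python
def split_samples(num_samples, tasks):
    """Split num_samples into `tasks` integer shares that sum to num_samples.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    base, extra = divmod(num_samples, tasks)
    # The first `extra` tasks each take one additional sample.
    return [base + 1 if i < extra else base for i in range(tasks)]


shares = split_samples(10, 4)
```

Passing `split_samples(NUM_SAMPLES, tasks)` to `sc.parallelize` in place of the fixed-share list would keep the total at exactly `NUM_SAMPLES`. For the runs in this notebook the counts divide evenly, so the timings are unaffected.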

#### Using 2 tasks on 2 cores

In [15]:
conf = SparkConf()
conf.setMaster("local[2]")
conf.set("spark.default.parallelism","2")
sc = SparkContext(conf=conf)

In [16]:
n=10000000
start_time = time.time()
calculate_pi(sc, n)
print "Time taken:", time.time() - start_time

Time taken: 3.72293496132


In [17]:
sc.stop()

#### Using 4 tasks on 4 cores

In [18]:
conf = SparkConf()
conf.setMaster("local[4]")
conf.set("spark.default.parallelism","4")
sc = SparkContext(conf=conf)

In [19]:
start_time = time.time()
calculate_pi(sc, n)
print "Time taken:", time.time() - start_time

Time taken: 2.11446905136


In [20]:
sc.stop()

#### Using 8 tasks on 4 cores

In [21]:
conf = SparkConf()
conf.setMaster("local[4]")
conf.set("spark.default.parallelism","8")
sc = SparkContext(conf=conf)

In [22]:
start_time = time.time()
calculate_pi(sc, n)
print "Time taken:", time.time() - start_time

Time taken: 2.1797208786


In [23]:
sc.stop()

#### Using 4 cores instead of 2 cuts the run time from about 3.7 s to about 2.1 s. With 4 cores, however, raising the number of tasks from 4 to 8 does not help (about 2.2 s), because only 4 tasks can run at once and the extra tasks add no parallelism
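
The timings printed by the three runs above can be turned into a speedup figure. The sketch below is plain arithmetic on those measurements. The `waves` helper is a hypothetical simplification that assumes all tasks take equal time and that each core runs one task at a time.

```python
import math

# Wall-clock times (seconds) printed by the three runs above.
t_2tasks_2cores = 3.72293496132
t_4tasks_4cores = 2.11446905136
t_8tasks_4cores = 2.1797208786

# Speedup from doubling both the cores and the tasks.
speedup = t_2tasks_2cores / t_4tasks_4cores

# Parallel efficiency: fraction of the ideal 2x speedup that was achieved.
efficiency = speedup / (4 / 2.0)


def waves(tasks, cores):
    """Scheduling rounds needed when each core runs one task at a time."""
    return int(math.ceil(tasks / float(cores)))


# 8 tasks on 4 cores run in 2 waves of half-sized tasks, so the work per core
# is the same as with 4 tasks in 1 wave.
print("speedup: %.2f, efficiency: %.2f" % (speedup, efficiency))
print("waves for 4 tasks: %d, for 8 tasks: %d" % (waves(4, 4), waves(8, 4)))
```

Under this simplified model, extra tasks only pay off when they even out uneven task durations, which is not the case here because every task draws the same number of samples.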

### Memory usage of Tasks

Sometimes you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc.) build a hash table within each task to perform the grouping, which can often be large. The simplest fix is to increase the level of parallelism so that each task’s input set is smaller. Alternatively, increase the memory available to the executors.
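
To see why more parallelism shrinks each task's working set, the sketch below assigns keys to partitions with a simple modulo hash and reports the largest partition. It is plain Python, not Spark. `largest_partition` and the integer keys are assumptions made for illustration, and Spark's own partitioner may distribute keys differently.

```python
from collections import Counter


def largest_partition(keys, num_partitions):
    """Size of the biggest partition under a simple modulo hash partitioner.

    Illustrative stand-in for a shuffle; not Spark's actual partitioner.
    """
    sizes = Counter(hash(k) % num_partitions for k in keys)
    return max(sizes.values())


keys = list(range(1000))  # hypothetical record keys
for n in (2, 4, 8):
    # Each task holds roughly len(keys) / n keys in its hash table.
    print("%d partitions -> largest holds %d keys" % (n, largest_partition(keys, n)))
```

In Spark the same effect comes from a larger `spark.default.parallelism` or from passing a partition count to the shuffle operation, for example `reduceByKey(add, 8)`.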

In [24]:
# These settings apply only to a SparkContext created from this conf afterwards.
conf.set("spark.executor.memory","2g")

<pyspark.conf.SparkConf at 0x1087ba4d0>

In [25]:
conf.set("spark.driver.memory","2g")

<pyspark.conf.SparkConf at 0x1087ba4d0>

### Broadcasting

If your tasks use a large object from the driver program (e.g. a static lookup table), consider turning it into a broadcast variable. A broadcast variable is cached read-only on each machine rather than shipped as a copy with the tasks.
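
A rough way to judge whether a lookup table is worth broadcasting is to compare how many bytes would be shipped in each case. The sketch below is a back-of-the-envelope model, not a measurement of Spark. It assumes the table is serialized once per task when captured in a closure and once per executor when broadcast. The table contents, task count, and executor count are made up for illustration.

```python
import pickle

# Hypothetical lookup table: 100,000 user ids mapped to one of 7 keys.
table = {str(i): i % 7 for i in range(100000)}
table_bytes = len(pickle.dumps(table, protocol=2))

num_tasks = 200      # assumed number of tasks in the job
num_executors = 4    # assumed number of executors

# Closure capture: assume every task carries its own serialized copy.
closure_total = table_bytes * num_tasks
# Broadcast: assume each executor receives the table once and reuses it.
broadcast_total = table_bytes * num_executors

print("table size: %d bytes" % table_bytes)
print("shipped via closures: %d bytes" % closure_total)
print("shipped via broadcast: %d bytes" % broadcast_total)
```

Under these assumptions the broadcast ships 50 times fewer bytes. The real saving depends on how Spark schedules and serializes tasks in a given deployment.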

In [26]:
import cPickle as cp
from operator import add

In [27]:
# Load the pickled user-to-key mapping and close the file afterwards.
with open("users-partition.pickle", "rb") as f:
    partition = cp.load(f)

In [41]:
partition.items()[:5]  # a mapping of users to keys.

[(u'583105596', 6),
 (u'1490767454', 1),
 (u'1974217567', 4),
 (u'426902926', 4),
 (u'615496505', 3)]

In [42]:
tweets = [(i,"random text") for i in xrange(1000)]  # i refers to the user; the second element is dummy text.

In [43]:
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)

In [44]:
tweets = sc.parallelize(tweets)  # distribute the tweets

In [45]:
start_time = time.time()
counts = tweets.map(lambda (user, text): (partition.get(user, 7), 1)) \
               .reduceByKey(add) \
               .sortByKey().collect()
print "Time taken:", time.time() - start_time

Time taken: 5.70145916939


#### Now broadcast first

In [32]:
partition_broadcasted = sc.broadcast(partition)  # send the table to all of the executors.

In [33]:
start_time = time.time()
counts = tweets.map(lambda (user, text): (partition_broadcasted.value.get(user, 7), 1)) \
               .reduceByKey(add) \
               .sortByKey().collect()
print "Time taken:", time.time() - start_time

Time taken: 0.75391292572


In [34]:
sc.stop()