# SparkContext - number of workers and lazy evaluation

## Checking the impact of number of workers
While initializing the `SparkContext`, we can specify the number of workers through the master URL: `local[N]` runs Spark locally with N worker threads. A common recommendation is one worker per core of the machine, but the number can be smaller or larger. In the following code, we examine the impact of the number of workers on a parallelized operation.

In [1]:
import os
os.environ["JAVA_HOME"] = "/opt/sdkman/candidates/java/current"

In [2]:
from time import time
from pyspark import SparkContext

In [3]:
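for j in range(1, 5):
    # local[j] runs Spark locally with j worker threads
    sc = SparkContext(master=f"local[{j}]")
    t0 = time()
    for i in range(10):
        sc.parallelize([1, 2] * 10000).reduce(lambda x, y: x + y)
    print(f"{j} executors, time = {time()-t0}")
    # stop this context before the next iteration creates a new one
    sc.stop()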

1 executors, time = 1.6398680210113525
2 executors, time = 1.1656532287597656
3 executors, time = 1.1847617626190186
4 executors, time = 1.1492693424224854


#### We observe that the run with 1 worker is the slowest (about 1.6 s versus about 1.2 s, roughly 40% longer), and that the time flattens out for 2, 3 and 4 workers. This is because the code ran on a Linux virtual machine using only 2 cores from the host machine. If you run this code on a machine with 4 cores, you should see a benefit up to 4 workers before the time flattens out. It also becomes clear that using more than one worker per core is not beneficial: the extra workers only cause context switching and do not speed up the parallel computation.
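As a small illustration of the "one worker per core" rule, the sketch below builds the local master string from the core count reported by the operating system. The helper name `local_master` is hypothetical and not part of PySpark; it only produces the string that would be passed as `master`.

```python
import os


def local_master(n_threads=None):
    """Build a Spark local-mode master URL (hypothetical helper, not a PySpark API).

    With no argument, use one worker thread per logical core reported by the OS.
    """
    if n_threads is None:
        n_threads = os.cpu_count() or 1  # os.cpu_count() may return None
    if n_threads < 1:
        raise ValueError("n_threads must be at least 1")
    return f"local[{n_threads}]"


print(local_master(2))   # explicit thread count
print(local_master())    # one thread per logical core on this machine
```

Spark also accepts `local[*]` as a master URL, which asks for as many worker threads as there are logical cores, so a helper like this is mainly useful when you want to log or cap the number explicitly.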

## Showing the essence of _lazy_ evaluation
![](https://raw.githubusercontent.com/rubensa/spark-with-python/master/images/lazy.jpg)

In [4]:
sc = SparkContext(master="local[2]")

### Make an RDD with 1 million elements

In [5]:
%%time
rdd1 = sc.parallelize(range(1000000))

CPU times: user 1.46 ms, sys: 542 µs, total: 2 ms
Wall time: 3.69 ms


### Some computing function - `taketime`

In [6]:
from math import cos
def taketime(x):
    [cos(j) for j in range(100)]
    return cos(x)

### Check how much time is taken by `taketime` function

In [7]:
%%time
taketime(2)

CPU times: user 42 µs, sys: 15 µs, total: 57 µs
Wall time: 63.2 µs


-0.4161468365471424

### Now do the `map` operation on the function

In [8]:
%%time
interim = rdd1.map(lambda x: taketime(x))

CPU times: user 22 µs, sys: 8 µs, total: 30 µs
Wall time: 35.5 µs


#### How come a single `taketime` call takes about 63 µs, yet the `map` operation over a 1 million element RDD took a similar time (about 36 µs)?<br><br>Because of _lazy_ evaluation: nothing was computed in the previous step; only a plan of execution was made. The variable `interim` does not point to a data structure. Instead, it points to a plan of execution, expressed as a dependency graph, which defines how RDDs are computed from each other.
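The idea of "a plan instead of data" can be sketched in plain Python. The class `LazySeq` below is a toy analogy written for this note, not how Spark implements RDDs: `map` only records the function, and the work happens when `reduce` is called. A counter shows when the mapped function actually runs.

```python
from functools import reduce as fold
from math import cos

counter = {"calls": 0}  # how many times the mapped function has really run


def counted_cos(x):
    counter["calls"] += 1
    return cos(x)


class LazySeq:
    """Toy stand-in for an RDD: source data plus a tuple of pending functions."""

    def __init__(self, data, steps=()):
        self.data = data
        self.steps = tuple(steps)

    def map(self, fn):
        # Transformation: extend the plan only, the data is not touched
        return LazySeq(self.data, self.steps + (fn,))

    def reduce(self, fn):
        # Action: run every recorded step over every element, then combine
        def apply_steps(x):
            for step in self.steps:
                x = step(x)
            return x

        return fold(fn, (apply_steps(x) for x in self.data))


plan = LazySeq(range(1000)).map(counted_cos)
calls_after_map = counter["calls"]      # nothing has been computed yet
total = plan.reduce(lambda a, b: a + b)
calls_after_reduce = counter["calls"]   # the action triggered all the work
print(calls_after_map, calls_after_reduce)
```

In this sketch the `map` call returns immediately regardless of the data size, which mirrors why the notebook's `map` cell finishes in microseconds.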

### Let's see the "Dependency Graph" using the `toDebugString` method

In [9]:
print(interim.toDebugString().decode())

(2) PythonRDD[1] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 []


![](https://raw.githubusercontent.com/rubensa/spark-with-python/master/images/rdd_dependency_graph.png)

### The actual execution by the `reduce` method

In [10]:
%%time
print('output =',interim.reduce(lambda x,y:x+y))

output = -0.28870546796843666
CPU times: user 8.95 ms, sys: 4.46 ms, total: 13.4 ms
Wall time: 9.64 s


In [11]:
1000000*31e-6

31.0

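#### The estimate above assumes about 31 µs per `taketime` call, i.e. about 31 s for 1 million calls run one after another. The measured wall time of 9.64 s is less than that. Part of the difference comes from the parallel operation of 2 cores; the per-call figure is also only a rough estimate, since timing a single call includes measurement overhead.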
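The comparison can be written out as a short calculation. The per-call cost of 31 µs is the figure assumed in the estimate above, and 9.64 s is the wall time reported for the `reduce`; the variable names are chosen for this sketch only.

```python
n_elements = 1_000_000
per_call_s = 31e-6        # per-call cost assumed in the estimate above
measured_wall_s = 9.64    # wall time reported for the reduce cell

serial_estimate_s = n_elements * per_call_s
ratio = serial_estimate_s / measured_wall_s

print(f"serial estimate: {serial_estimate_s:.1f} s")
print(f"estimate / measured: {ratio:.2f}x")
```

A ratio above 2 with only 2 worker threads suggests that the serial estimate is on the high side, so the numbers are best read as an order-of-magnitude check rather than a precise speedup.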

### We have not saved (materialized) any intermediate results in `interim`, so another simple operation (e.g. counting elements > 0) will take almost the same time

In [12]:
%%time
print(interim.filter(lambda x:x>0).count())

500000
CPU times: user 13.7 ms, sys: 136 µs, total: 13.8 ms
Wall time: 9.55 s


## Caching to reduce computation time on similar operations (at the cost of memory)

### Run the same computation as before with the `cache` method to tell the dependency graph to plan for caching

In [13]:
%%time
interim = rdd1.map(lambda x: taketime(x)).cache()

CPU times: user 5.89 ms, sys: 1.2 ms, total: 7.09 ms
Wall time: 17.7 ms


In [14]:
print(interim.toDebugString().decode())

(2) PythonRDD[4] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195 [Memory Serialized 1x Replicated]


In [15]:
%%time
print('output =',interim.reduce(lambda x,y:x+y))

output = -0.28870546796843666
CPU times: user 5.78 ms, sys: 3.32 ms, total: 9.11 ms
Wall time: 9.15 s


### Now run the same `filter` method with the help of the cached result

In [16]:
%%time
print(interim.filter(lambda x:x>0).count())

500000
CPU times: user 9.29 ms, sys: 0 ns, total: 9.29 ms
Wall time: 311 ms


#### This time it took much less time (311 ms versus 9.55 s) because of the cached result: the stored values could be compared to 0 and counted directly, without running `taketime` again.
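The effect of caching on repeated actions can be mimicked in plain Python by counting calls to the expensive function. The class `LazyMapped` below is a toy analogy made up for this note, not Spark's caching mechanism: `cache()` only sets a flag, the first action fills the cache, and later actions reuse it.

```python
from math import cos

counter = {"calls": 0}  # how many times the expensive function has run


def expensive(x):
    counter["calls"] += 1
    return cos(x)


class LazyMapped:
    """Toy stand-in for a mapped RDD with an optional in-memory cache."""

    def __init__(self, data, fn):
        self.data = data
        self.fn = fn
        self._use_cache = False
        self._cached = None

    def cache(self):
        # Only mark the result for caching; nothing is computed here
        self._use_cache = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached          # reuse the stored values
        result = [self.fn(x) for x in self.data]
        if self._use_cache:
            self._cached = result        # keep them for later actions
        return result

    def total(self):
        return sum(self._compute())

    def count_positive(self):
        return sum(1 for v in self._compute() if v > 0)


uncached = LazyMapped(range(1000), expensive)
uncached.total()
uncached.count_positive()
calls_uncached = counter["calls"]        # both actions recomputed the map

counter["calls"] = 0
cached = LazyMapped(range(1000), expensive).cache()
calls_after_cache = counter["calls"]     # cache() alone computes nothing
cached.total()
cached.count_positive()
calls_cached = counter["calls"]          # only the first action did the work
print(calls_uncached, calls_after_cache, calls_cached)
```

As in the notebook, the first action after `cache()` still pays the full cost, and the saving only shows up on the second action, in exchange for holding the results in memory.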