In [1]:
import pyspark
import os
java8_location= '/usr/lib/jvm/java-8-openjdk-amd64' # Set your own
os.environ['JAVA_HOME'] = java8_location

from pyspark import SparkContext
sc = SparkContext()

# 45

Sometimes we need to share some data with all of our executors -> broadcast variables

In [2]:
important_data = 21

In [3]:
rdd = sc.parallelize(range(5))

In [4]:
rdd.map(lambda x: x + important_data).collect()

[21, 22, 23, 24, 25]

In [5]:
rdd.map(lambda x: x + important_data).repartition(2).map(
    lambda x: x + important_data).collect()
# in this case we created duplicates of important_data variable and sent it for processing at two stages.
# This can be an issue if important_data was a huge file. Broadcasting variables deal with it.
# The are used for efficient moving of data across executors and have them remained between stages.
# It sends you data just once and not once per stage!

[42, 44, 45, 46, 43]

In [6]:
important_data = sc.broadcast(21)

In [7]:
important_data 

<pyspark.broadcast.Broadcast at 0x7f2488e87da0>

In [8]:
important_data.value

21

In [10]:
rdd.map(lambda x: x + important_data.value).collect()
# We don't want to pass broadcast.value, because it will defeat all the benefits of it.
# Instead we want to pass the broadcast variable itself and ask for the value at the last possible second.

[21, 22, 23, 24, 25]

Broadcast variables are read-only. You cannot change it at one place and expect it to change everywhere else, because executors are not syncing every time we make a change.

Don't change broadcast variables at the executors.

In [11]:
important_data.unpersist()
# deleting cached copies of the variable and releasing the memory (call after executors no longer use it).

# 46

Sometimes we want the job to output the processed data itself and some additional metadata (maybe, counters, debug info, timestamps). 

Accumulators -> type of data we can add to everywhere, but we can look into it only in the main driver.

In [25]:
evens = sc.accumulator(0) # starts with 0

In [26]:
numbers = sc.parallelize(range(10))

In [27]:
def inc_and_report(x):
    global evens
    if x % 2 == 0:
        evens += 1
    return x + 8 # some data processing on the element

In [28]:
numbers.map(inc_and_report).collect()

[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

In [29]:
evens.value

5

In [30]:
numbers.map(inc_and_report).collect()

[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

In [31]:
evens.value # it didn't get reset
# This might be an issue because sometimes Spark can internally rerun the map function, if the failure occurs. ->
# -> Counting inside a transformation is NOT guaranteed to be called just once.
# Counting inside an action is guaranteed to be called just once.
# foraech is often used to guarantee that everything was pushed through an accumulator.

10

In [32]:
evens = sc.accumulator(0)

In [33]:
numbers.map(inc_and_report)

PythonRDD[18] at RDD at PythonRDD.scala:53

In [34]:
evens.value # because transformations are lazy, noone forces the transformation above to run!

0

# 47

Built-in accumulators allow you to store only numeric data

In [36]:
ids = sc.parallelize(range(20))

In [37]:
ids.distinct().collect()
# how can we return it as a result of our job?

[0, 4, 8, 12, 16, 1, 5, 9, 13, 17, 2, 6, 10, 14, 18, 3, 7, 11, 15, 19]

Custom accumulators need to know about:
- zero valu
- add in place

In [38]:
from pyspark import AccumulatorParam

In [41]:
class SetAccumulatorParam(AccumulatorParam):
    
    def zero(self, initialValue):
        s = set()
        s.add(initialValue)
        return s
    
    def addInPlace(self, v1, v2):
        return v1.union(v2)

In [48]:
ids_seen = sc.accumulator(None, # -> initialValue
                          SetAccumulatorParam())

In [49]:
ids_seen

Accumulator<id=6, value=None>

In [50]:
def inc_and_note(x):
    global ids_seen
    ids_seen += x
    return x + 1

In [51]:
ids.map(inc_and_note).collect()
# we are getting an error because we define a custom class only in this REPL, and pickle is not able to find it
# in the list of classes, there is no such file to load from

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 51, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-50-8264dbbf73dd>", line 3, in inc_and_note
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/accumulators.py", line 169, in __iadd__
    self.add(term)
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/accumulators.py", line 165, in add
    self._value = self.accum_param.addInPlace(self._value, term)
  File "<ipython-input-41-87fdea121689>", line 9, in addInPlace
TypeError: 'int' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-50-8264dbbf73dd>", line 3, in inc_and_note
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/accumulators.py", line 169, in __iadd__
    self.add(term)
  File "/home/victor/Desktop/Spark/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/accumulators.py", line 165, in add
    self._value = self.accum_param.addInPlace(self._value, term)
  File "<ipython-input-41-87fdea121689>", line 9, in addInPlace
TypeError: 'int' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# 48

In [52]:
rdd = sc.parallelize(range(10), numSlices=3)

In [53]:
rdd.glom().collect() # don't do that on the big data -> can run out of memory

[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

We want our data to be evenly distributed across executors. 

If we have both small and large values, all executors should have approx. equal amount of each for max performance.

Paying attention to repartitions is important, because number of shuffles affect the runtime.

You can move your data across the executors, but then use the same partition schema to distribute it back to avoid repartitioning.

Rules for shuffling and partitioning

1) RDDs remember if they have already been partitioned.

2) A shuffle will see if RDD have already been partitioned aand reuse that partitioning whenever possible.

3) A shuffle of two RDDs will use the partition of one of them assuming they have one.

In [54]:
# Thus, it is fruitful when 2 RDDs are partitioned in a similar way.

In [None]:
RDD A
RDD B
(key-valu pairs RDDs)

In [None]:
# Example 1 (1 shuffle)
a.join(b).reduceByKey(...)
# 1 shuffle when a and b are joined
# the follow-up reduceByKey will see, that our data is already partitioned by key and run without shuffle

In [None]:
# Example 2 (2 shuffles)
a.join(b).map(...).reduceByKey(...)
# 1 shuffle when a and b are joined
# then we map a transformation across resulting RDD -> losing the shuffle information, 
# because map could cause keys to change which would invalidate the partition
# another shffle is needed when performing reduceByKey

In [None]:
# Example 3 (1 shuffle)
a.join(b).map(..., preservePartitioning=True).reduceByKey(...)
# in this case we say to Spark that in map function we are not doing anything to keys and thus the partition
# will remain valid

In [None]:
# Example 4 (1 shuffle)
a.join(b).mapValues(...).reduceByKey(...)
# mapValues applies function only to values and thus keys will remain unchanged -> 
# -> reduceByKey doesn't trigger another shuffle