# *Using Spark Efficiently*

*This notebook focuses on Spark constructs that can make programs more efficient. 
In general, this means minimizing the amount of data transferred across nodes, since this is usually the bottleneck for big data analysis problems.*

* *Shared variables*
    * *Accumulators*
    * *Broadcast variables*
* *DataFrames*
* *Partitioning and the Spark shuffle*
* *Piping to external programs*

In [1]:
import numpy as np
import string

In [2]:
sc

<pyspark.context.SparkContext at 0x7fdd50499780>

## Accumulators

*Spark functions such as __map__ can use variables defined in the driver program, but they work on local copies of each variable, and changes to those copies are not passed back to the driver program. **Accumulators** are shared variables that allow results from the workers to be aggregated back to the driver program, for example, as an event counter. Suppose we want to count the number of rows of data with missing information. An efficient way to do this is to use an accumulator.*
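The local-copy behaviour described above can be imitated without a cluster. The sketch below is plain Python, not Spark: `pickle` stands in for shipping a closure's variables to a worker, and the helper name `run_on_worker` is hypothetical.

```python
import pickle

def run_on_worker(state, lines):
    # Stand-in for a Spark task: the worker receives a serialized copy of the
    # driver's variables, never the original objects.
    local = pickle.loads(pickle.dumps(state))
    for line in lines:
        if line:
            local["non_empty"] += 1
    return local

driver_state = {"non_empty": 0}
worker_state = run_on_worker(driver_state, ["", "a line", "", "another line"])

print(driver_state["non_empty"])  # 0: the driver's variable is untouched
print(worker_state["non_empty"])  # 2: only the worker's copy was updated
```

An accumulator closes this gap: each worker's increments are sent back to the driver and merged into a single value.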

In [5]:
ulysses = sc.textFile('/resources/data/Datasets/Ulysses.txt')
ulysses.take(10)

['',
 '',
 '',
 'The Project Gutenberg EBook of Ulysses, by James Joyce',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever. You may copy it, give it away or re-use',
 'it under the terms of the Project Gutenberg License included with this',
 'eBook or online at www.gutenberg.org',
 '']

### Event counting

In [27]:
# Notice that the text has some empty lines. We want to count the number of non-empty lines.
num_lines = sc.accumulator(0)

# Translation table that deletes punctuation; built once and reused for every line.
remove_punc_map = dict.fromkeys(map(ord, string.punctuation), None)

def tokenize(line):
    """Strip punctuation, lowercase, and split a line into words."""
    return line.translate(remove_punc_map).lower().strip().split()

def tokenize_count(line):
    """Tokenize a line and increment the accumulator if the line is non-empty."""
    global num_lines  # needed because += rebinds the name
    if line:
        num_lines += 1
    return tokenize(line)

counter = ulysses.flatMap(tokenize_count).countByValue()

In [29]:
num_lines

Accumulator<id=3, value=25302>

In [8]:
num_lines.value

25302

In [14]:
counter['circle']

20

## Broadcast Variables

*Sometimes we need to send a large read-only variable to all workers. For example, we might want to share a large feature matrix with all workers as part of a machine learning application. Without a broadcast variable, this same variable is sent separately for each parallel operation. In addition, the default variable-passing mechanism is optimized for small variables and can be slow when the variable is large.*
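To get a feel for the cost of resending, the sketch below (plain Python, no Spark) compares the bytes serialized when a lookup table is shipped to every node for each operation against shipping it to each node once. The table, the number of operations, and the number of nodes are made-up values for illustration.

```python
import pickle

# Hypothetical read-only lookup table; the size is arbitrary.
lookup = {i: float(i) for i in range(10_000)}
payload = len(pickle.dumps(lookup))

n_operations = 50  # assumed number of parallel operations that use the table
n_nodes = 5        # assumed number of worker nodes

# Without broadcasting: every operation ships the table to every node.
resend_total = payload * n_operations * n_nodes
# With broadcasting: each node receives the table once and reuses it.
broadcast_total = payload * n_nodes

print(resend_total // broadcast_total)  # 50
```

In this simplified model the saving grows linearly with the number of operations that reuse the variable.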

In [None]:
# Let's broadcast a variable that holds a dictionary mapping every ASCII letter to an index.

In [42]:
string.ascii_letters

'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

In [44]:
from itertools import count

mapper = dict(zip(string.ascii_letters, count()))
print(mapper)

{'i': 8, 'd': 3, 't': 19, 'L': 37, 'S': 44, 's': 18, 'K': 36, 'c': 2, 'X': 49, 'V': 47, 'D': 29, 'E': 30, 'R': 43, 'B': 27, 'P': 41, 'z': 25, 'A': 26, 'e': 4, 'o': 14, 'g': 6, 'y': 24, 'I': 34, 'j': 9, 'p': 15, 'F': 31, 'l': 11, 'r': 17, 'q': 16, 'a': 0, 'm': 12, 'Q': 42, 'x': 23, 'Y': 50, 'J': 35, 'f': 5, 'h': 7, 'n': 13, 'O': 40, 'Z': 51, 'M': 38, 'T': 45, 'v': 21, 'k': 10, 'w': 22, 'W': 48, 'u': 20, 'U': 46, 'C': 28, 'G': 32, 'H': 33, 'N': 39, 'b': 1}


In [45]:
# Let's pass the mapper variable to the Spark context
bcv_mapper = sc.broadcast(mapper)

In [46]:
# Wherever you want to use the broadcast variable, use variable.value
# The broadcast variable is sent once to each node and can be reused many times
# Let's write a function that sums the mapped index of each word's first letter
def weight_first_bc(line, bcv_mapper):
    words = tokenize(line)
    return sum(bcv_mapper.value.get(word[0], 0) for word in words if word.isalpha())

ulysses.map(lambda line: weight_first_bc(line, bcv_mapper)).sum()

2867999

Although it looks like "bcv_mapper" is being passed to each function, all that is passed is a path to the variable. The worker checks if the path has been cached and uses the cache instead of loading from the path.

When a Spark job is submitted, it is divided into stages, and each stage is divided into tasks. The tasks carry out the transformations and actions on the worker nodes. The driver's submitTask() serializes the functions, together with metadata about the broadcast variable, and sends them to all nodes.

__Anatomy of how broadcast works__

The driver creates a local directory to store the data to be broadcast and launches an HttpServer with access to that directory. The data is written into the directory when broadcast is called (val bdata = sc.broadcast(data)). At the same time, the data is also written into the driver's block manager with a memory-plus-disk StorageLevel. The block manager allocates a blockId (of type BroadcastBlockId) for the data.

The actual data is transferred only when an executor deserializes a task it has received. Along with the task, the executor gets the broadcast variable's metadata in the form of a Broadcast object (the bdata variable) and calls that object's readObject() method. This method first checks the local block manager to see if there is already a local copy. If not, the data is fetched from the driver. Once fetched, the data is stored in the local block manager for subsequent use.
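The lookup order described above (local block manager first, driver second) can be sketched in plain Python. This is a toy model, not Spark's implementation: `Driver`, `Executor`, and `read_broadcast` are hypothetical names, and dictionaries stand in for the block managers.

```python
class Driver:
    """Toy driver that owns the broadcast data, keyed by block id."""
    def __init__(self):
        self.blocks = {}
        self.fetch_count = 0

    def broadcast(self, block_id, data):
        self.blocks[block_id] = data
        return block_id  # only this small handle travels with each task

    def fetch(self, block_id):
        self.fetch_count += 1
        return self.blocks[block_id]


class Executor:
    """Toy executor with a local cache standing in for its block manager."""
    def __init__(self, driver):
        self.driver = driver
        self.local_blocks = {}

    def read_broadcast(self, block_id):
        # Check the local cache first; fetch from the driver only on a miss.
        if block_id not in self.local_blocks:
            self.local_blocks[block_id] = self.driver.fetch(block_id)
        return self.local_blocks[block_id]


driver = Driver()
handle = driver.broadcast("broadcast_0", {"a": 0, "b": 1})
executor = Executor(driver)

# Three tasks on the same executor read the same broadcast variable.
values = [executor.read_broadcast(handle)["b"] for _ in range(3)]
print(values, driver.fetch_count)  # [1, 1, 1] 1
```

Only the first read triggers a fetch from the driver; the later reads are served from the executor's local copy.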

# The Spark Shuffle and Partitioning

### Random data generation

In [59]:
list(map(''.join, np.random.choice(list(string.ascii_lowercase), (3, 4))))

['bkxk', 'nxam', 'zauu']

In [72]:
def fake_data(n, val):
    users = list(map(''.join, np.random.choice(list(string.ascii_lowercase), (n,2))))
    comments = [val]*n
    return tuple(zip(users, comments))

In [73]:
data = fake_data(10000, 'lkndnlasdwn')
list(data)[:10]

[('dq', 'lkndnlasdwn'),
 ('tp', 'lkndnlasdwn'),
 ('ig', 'lkndnlasdwn'),
 ('aw', 'lkndnlasdwn'),
 ('hu', 'lkndnlasdwn'),
 ('cv', 'lkndnlasdwn'),
 ('su', 'lkndnlasdwn'),
 ('vf', 'lkndnlasdwn'),
 ('bf', 'lkndnlasdwn'),
 ('vv', 'lkndnlasdwn')]

In [74]:
rdd = sc.parallelize(data).reduceByKey(lambda x, y: x+y)

In [75]:
new_data = fake_data(1000,  'newcomment')
list(new_data)[:10]

[('jv', 'newcomment'),
 ('bd', 'newcomment'),
 ('my', 'newcomment'),
 ('fh', 'newcomment'),
 ('nj', 'newcomment'),
 ('kx', 'newcomment'),
 ('mf', 'newcomment'),
 ('td', 'newcomment'),
 ('dj', 'newcomment'),
 ('uk', 'newcomment')]

In [76]:
rdd_new = sc.parallelize(new_data).reduceByKey(lambda x, y: x+y).cache()

In [77]:
rdd_updated = rdd.join(rdd_new)

In [79]:
rdd_updated.take(3)

[('kt',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcomment')),
 ('wg',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcommentnewcommentnewcomment')),
 ('zw',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcommentnewcomment'))]

### Using partitionBy

The join operation hashes all the keys of both rdd and rdd_new and sends keys with the same hash to the same node, where the actual join takes place. This causes a lot of unnecessary data transfer. Since rdd is a much larger data set than rdd_new, we can instead fix the partitioning of rdd and transfer only the records of rdd_new. This is done with rdd.partitionBy(numPartitions); as a rule of thumb, numPartitions should be at least twice the number of cores.
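A rough way to see the saving is to count how many records have to be routed during the join. The sketch below is plain Python rather than Spark: `partition_of` is a hypothetical stand-in for a hash partitioner (it uses `zlib.crc32` only so that results are deterministic), and the data set sizes are made up.

```python
import zlib

def partition_of(key, num_partitions):
    # Deterministic stand-in for a hash partitioner (an assumption, not
    # Spark's own hash function).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

num_partitions = 10
large = [("k%d" % i, "old") for i in range(600)]
small = [("k%d" % i, "new") for i in range(0, 600, 10)]

# Place the large data set once, as partitionBy followed by cache would.
placed = {p: {} for p in range(num_partitions)}
for key, value in large:
    placed[partition_of(key, num_partitions)][key] = value

# Join: only the records of the small data set are routed to a partition.
moved = 0
joined = []
for key, value in small:
    p = partition_of(key, num_partitions)
    moved += 1
    if key in placed[p]:
        joined.append((key, (placed[p][key], value)))

print(moved, len(joined))  # 60 60
```

Here 60 records move instead of 660, because the 600 records of the larger data set stay where they were first placed.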

In [80]:
rdd2 = sc.parallelize(data).reduceByKey(lambda x, y: x+y)
rdd2 = rdd2.partitionBy(10).cache()

In [81]:
rdd2_updated = rdd2.join(rdd_new)

In [82]:
rdd2_updated.take(10)

[('zw',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcommentnewcomment')),
 ('tp',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcomment')),
 ('ey',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcommentnewcommentnewcomment')),
 ('uy',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwn',
   'newcomment')),
 ('yq',
  ('lkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlasdwnlkndnlas

# Piping to External Programs