# Lab Sheet 9, Spark Partitioning and Caching

Work on these tasks in the lab session and during the week.

We'll work with partitions, partitioning functions and persistence.



The usual preparations.

In [None]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

#install spark
%cd
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar -xzf "/content/drive/My Drive/Big_Data/data/spark/spark-3.5.0-bin-hadoop3.tgz"
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/root/spark-3.5.0-bin-hadoop3"
%cd /content
import findspark
findspark.init()

import pyspark
# get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# and a spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark)

## 1) Standard Partitioning

We'll start by setting up an RDD with text as we did a few times before.

In [None]:
rddLines = sc.textFile("/content/drive/My Drive/Big_Data/data/hamlet.txt")
rddLines.take(3)

In [None]:
import re
rddWords = rddLines.flatMap(lambda c: re.split(r'\W+', c)).filter(lambda w: len(w) > 0)  # raw string for the regex
rddWords.take(3)

In [None]:
rddWordsW1 = rddWords.map(lambda w: (w,1))
rddWordsW1.take(3)

### a) find out the number of partitions in the rddWords and rddLines. Repartition to 10 partitions. 


In [None]:
print(rddWords ... ) >>> your code here, get the number of partitions
rddWordsRp = >>> repartition to 10 partitions 
print(rddWordsRp ... ) >>> get the number of partitions again

In [None]:
print(rddLines ...) >>> same procedure as above.
>>>
>>>

### b) print the size of each partition using `glom` and a `map` with a lambda (and `collect`).

In [None]:
rddWordsRp >>> use glom and map with a lambda

In [None]:
rddLines >>> like above

### c) now use [mapPartitions](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitions.html#pyspark.RDD.mapPartitions) to print the sizes of the partitions.

You need to provide a mapping function to `mapPartitions` (it can be a lambda). The argument that the mapping function receives is an iterator. Iterators don't have a function for counting their content. The fastest way to count the items in an iterator `iter` is to cast it to a tuple with `tuple(iter)` and apply `len`. The safest return type for the mapping function is a list `[]` wrapped around the value.
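
The counting idiom described above can be tried out without Spark. The following is a minimal sketch in plain Python, where ordinary iterators stand in for partitions; `count_items` and the sample `partitions` list are hypothetical names used for illustration only.

```python
# Plain-Python sketch: each iterator stands in for one partition (an assumption
# for illustration; in Spark, mapPartitions supplies the iterator).
def count_items(iterator):
    """Return the number of items in the iterator, wrapped in a list."""
    return [len(tuple(iterator))]

# Hypothetical sample data: three "partitions" of words, the last one empty.
partitions = [iter(["to", "be", "or"]), iter(["not"]), iter([])]

sizes = []
for part in partitions:
    sizes.extend(count_items(part))  # flatten the one-element lists

print(sizes)  # [3, 1, 0]
```

If memory is a concern, `sum(1 for _ in iterator)` counts the items without building a tuple first.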

In [None]:
rddWords >>> your code here, see instructions above

### d) use [mapPartitionsWithIndex](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitionsWithIndex.html) to list the partition lengths together with the index for the partition. 

The solution is similar to 1c), but you need to accept two arguments in the mapping function: the index and the partition iterator. Put the index and the partition length together in a tuple, which still needs to be wrapped in a list.

In [None]:
rddWords <<< your code here

### e) for fun: see what `coalesce` does (it's not supposed to be used for increasing partition numbers). 

In [None]:
rddLinesRp = rddLines.coalesce(10)
rddLinesRp.glom().map(lambda l: len(l)).collect()

## 2) Custom Partitioning

### a) Create a partitioning sorted by the length of the words.

This is a relevant use case for training deep networks with text for NLP applications.
There, strings of similar lengths should be kept together, to make batch training efficient. 

You start by mapping the (w,1) tuples to the length of `w`. Then you calculate the minimal and maximal length, which enables you to determine the number of partitions you need (one per length value, i.e. max - min + 1).

Then you can map the length to partitions in a lambda and apply it with [`partitionBy`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.partitionBy.html).
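
To see how a partition function turns keys into partition indexes, here is a minimal plain-Python sketch. It assumes that the result of the partition function is taken modulo the number of partitions, which is how PySpark's `partitionBy` computes the index; `assign_partition` and the sample `words` are hypothetical and not part of the lab data.

```python
from collections import Counter

def assign_partition(key, num_partitions, partition_func):
    """Mimic the index computation: partition_func(key) modulo num_partitions."""
    return partition_func(key) % num_partitions

words = ["to", "be", "or", "not", "that", "question"]  # hypothetical sample
lengths = [len(w) for w in words]
offset = min(lengths)                  # shortest length maps to index 0
num_part = max(lengths) - offset + 1   # one partition per length value

sizes = Counter(
    assign_partition(w, num_part, lambda k: len(k) - offset) for w in words
)
print([sizes.get(i, 0) for i in range(num_part)])  # [3, 1, 1, 0, 0, 0, 1]
```

Because of the modulo, using `max - min` without the `+ 1` would send the longest words to index 0, where they would share a partition with the shortest ones.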

In [None]:
rddWordsW1 >>> map to (w,1) structure 
rddWordsWl >>> map to word length
>>> get min (offset) and max, calculate `numPart` as the difference plus one.
print("offset: ",offset, " numPart: ", numPart)
rddWordsPb >>> use offset and numPart in partitionBy
rddWordsPb >>> show partition sizes

### b)
You should see some skew in the partitioning (i.e. different partition sizes). Here, a more specialised mapping of length values to partition indexes can help to counterbalance. Try to define and implement a mapping that reduces skew.
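
One possible way to think about such a mapping is sketched below in plain Python: walk through the lengths in ascending order and move on to the next partition once the partitions used so far hold their share of the data. This keeps similar lengths together. The function `contiguous_bins` and the `counts` values are made up for illustration; this is only one of several reasonable approaches.

```python
def contiguous_bins(counts, num_partitions):
    """Map each length to a partition index, filling partitions in length order.

    counts: dict mapping word length -> number of words of that length.
    """
    total = sum(counts.values())
    target = total / num_partitions   # ideal number of words per partition
    mapping, index, filled = {}, 0, 0
    for length in sorted(counts):
        mapping[length] = index
        filled += counts[length]
        # Move on once the partitions used so far hold their share of the data.
        if filled >= target * (index + 1) and index < num_partitions - 1:
            index += 1
    return mapping

# Made-up length histogram (not measured on hamlet.txt).
counts = {1: 50, 2: 400, 3: 500, 4: 450, 5: 200, 6: 100, 7: 60, 8: 40}
mapping = contiguous_bins(counts, 4)

loads = [0] * 4
for length, n in counts.items():
    loads[mapping[length]] += n
print(mapping)
print(loads)  # [450, 500, 450, 400]
```

In Spark, the histogram could be obtained with `countByValue()` on the RDD of lengths, and the lookup could be used inside the partition function, e.g. `lambda w: mapping[len(w)]`. Both are suggestions, not the only intended solution.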

In [None]:
>>> create your counterbalancing partitioning function
def lp(i):
>>>    
rddWordsPb = rddWordsW1.partitionBy(...) >>>
rddWordsPb >>> show partition sizes

### c) Create a partitioning by the first letter of a word, lower case, with [`partitionBy`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.partitionBy.html).

For this partitioning, we first remove all strings starting with non-letter characters. To do so, use `s[0]` to get the first character of a string (which, interestingly, is still a string) and filter with [`isalpha`](https://docs.python.org/3/library/stdtypes.html?highlight=isalpha#str.isalpha).

Then, we need to identify the range of the characters. For that, we map all (string, 1) items to the first character of the string. Then apply [`lower`](https://docs.python.org/3/library/stdtypes.html?highlight=lower#str.lower) and get the character encoding (i.e. the Unicode code point) with [`ord`](https://docs.python.org/3/library/functions.html?highlight=ord#ord).

Then we calculate the min and max of the Unicode numbers. With these values, we can map the initial characters to the 26 letters of the Latin alphabet (a-z) in our partition function and apply `partitionBy`.
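
The mapping from initial letters to indexes can be checked in plain Python first. The sketch below assumes ASCII initials; `letter_index` is a hypothetical helper, and the offset is hard-coded to `ord("a")` here, whereas the task derives it from the data.

```python
def letter_index(word, offset=ord("a")):
    """Return 0 for words starting with 'a' or 'A', up to 25 for 'z' or 'Z'."""
    return ord(word[0].lower()) - offset

num_part = ord("z") - ord("a") + 1   # number of letters from a to z
print(num_part)  # 26
print([letter_index(w) for w in ["Hamlet", "apple", "Zone"]])  # [7, 0, 25]
```

Note that `isalpha` also accepts non-ASCII letters such as `é`, so the min and max computed from real data can span a wider range than a-z.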

In [None]:
rddWordsW1 = rddWords ... >>> filter out strings starting with non-letters
rddWordsWl = rddWordsW1 >>> get unicode numbers
>>> get min (offset) and max, calculate `numPart` as the difference plus one.
print("offset: ",offset, " numPart: ", numPart)
rddWordsPb = rddWordsW1.partitionBy( ... ) <<< apply
rddWordsPb >>> show sizes

### d) Show an example for each partition. 

We reuse the approach from 1d) here. The goal is to not just show the partition index and size, but also the letter with which the words in the partitions begin (or the whole word). 

You will need to write a named mapping function (i.e. defined with `def`, not as a lambda), which takes the partition index and the iterator, extracts an example word and the partition length, and then returns the index, length and word together in a tuple wrapped in a list.
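
One point to watch is that an iterator can only be consumed once, so the example item and the length have to come from the same pass. The plain-Python sketch below illustrates this with iterators standing in for partitions; `describe_partition` and the sample data are hypothetical, and empty partitions are handled by returning `None` as the example.

```python
def describe_partition(index, iterator):
    """Return [(index, size, example)]; example is None for an empty partition."""
    items = list(iterator)   # materialise once, since the iterator is single-use
    example = items[0] if items else None
    return [(index, len(items), example)]

# Hypothetical stand-ins for partitions of (word, 1) pairs.
partitions = [iter([("and", 1), ("a", 1)]), iter([]), iter([("cat", 1)])]

result = []
for i, part in enumerate(partitions):
    result.extend(describe_partition(i, part))

print(result)  # [(0, 2, ('and', 1)), (1, 0, None), (2, 1, ('cat', 1))]
```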

In [None]:
#>>> define your mapping function
def f(i,p):
>>>
rddWordsPb.mapPartitionsWithIndex(f).collect() # apply f

## 3) Optimize with caching

### a)
Take the code from 2c) and identify which RDD should be cached to improve efficiency. Explain why and how caching could help. Add the relevant code.
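
As a toy analogy for why caching can help, the following plain-Python sketch models a lazily evaluated dataset that recomputes its transformation for every action unless it has been cached. `LazyPipeline` is a hypothetical class written for this illustration; it is not how Spark is implemented.

```python
class LazyPipeline:
    """Toy stand-in for an RDD: recomputes on every action unless cached."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.calls = 0          # how often func has been applied
        self._use_cache = False
        self._cache = None

    def cache(self):
        self._use_cache = True
        return self

    def _compute(self):
        if self._use_cache and self._cache is not None:
            return self._cache  # reuse the stored result
        data = []
        for x in self.source:
            self.calls += 1
            data.append(self.func(x))
        if self._use_cache:
            self._cache = data
        return data

    def min(self):
        return min(self._compute())

    def max(self):
        return max(self._compute())


words = ["to", "be", "or", "not"]   # hypothetical sample
plain = LazyPipeline(words, len)
plain.min(); plain.max()            # two actions, two full recomputations
cached = LazyPipeline(words, len).cache()
cached.min(); cached.max()          # the second action reuses the cache
print(plain.calls, cached.calls)    # 8 4
```

The same reasoning applies to any dataset that feeds more than one action.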

### b) 
Time the code with and without caching using the `%%time` magic. The gains are not massive in this example, but you can try with more data to see more effect. 

In [None]:
#>>> apply caching, take the time
rddWordsW1 = rddWords.filter(lambda w: w[0].isalpha()).map(lambda w: (w,1))
rddWordsWl = rddWordsW1.map(lambda w1: ord(w1[0][0].lower()))
offset = rddWordsWl.min()
numPart = rddWordsWl.max() - offset + 1  # +1 so that the first and the last letter get separate partitions
rddWordsPb = rddWordsW1.partitionBy(numPart,lambda w: ord(w[0].lower())-offset)
rddWordsPb.glom().map(lambda l: len(l)).collect()

# Extra Tasks

## 4) Implement a Bloom filter in plain Python. 

In [None]:
import random
import re

# The size of the array 
vSize = 10000
v = [0]*vSize # the array for the Bloom filter

kmax=100 # maximal number of hash-functions
# The random parameters for universal hashing
a = []
b = []
# use a for-loop to append kmax random numbers to a and b 
>>> for ... 
>>> 	... random.randint(1,vSize)

def uhash(x,j):
	""" provides a hash of argument x based on the parameters in 'a' and 'b', 
	 as identified by j, based on universal hashing 
	"""
>>>	 implement universal hashing here with a[j] as 'a', b[j] as 'b', 433494437 as 'N'
>>>	 mod is '%' in Python


def fillArray(lst): 
	""" fills the Bloom filter array with a list of items to hash """
	for x in lst: # for all items
>>>		for ... : # iterate through all k hash functions
			idx = uhash(hash(x),i) % vSize # get the hashed position
>>>			v.... # and set position idx in v to 1


def checkItem(itm):
	""" checks whether an item is in the set, only true if all hash addresses have '1's. 
		may produce false positives, because of collisions, though.
	"""
	for i in range(k):
		idx = uhash(hash(itm),i) % vSize
>>>		if ... : # if array 'v' does not have a 1 at position 'idx'
			return False
	return True # return true only if all positions had a 1


k=2 # number of hash functions used, you can experiment with the effect
print('Using ',k,' hash function(s)')
# read the list of items to look for, e.g. the stopwords list.
with open("/data/BigDataMaterials/stopwords2.txt") as f:
	stpWrdLst = f.readlines()
	# ... and build the array
stpWrdLst = [w.strip() for w in stpWrdLst] # get rid of whitespace; a list (not a lazy map) so it can be reused below
fillArray(stpWrdLst)
# Then get the data to scan
with open("/data/tempstore/library/emma.txt") as f:
	text = f.read()
	textList = re.split(r"\W+",text) # raw string for the regex
print('read ',len(textList),' words in text')
# and check for hits with the Bloom filter
import time # take the time
start = time.time()
count = 0 # count the stopwords
for w in textList:
	if checkItem(w):
		count = count + 1
print(count, ' potential stopwords found in the text with Bloom filtering')
end = time.time()
print(end - start, "seconds used for Bloom filtering")
# and compare to linear search, as used by Python's 'in' operator on lists,
# which is exact but has higher complexity
start = time.time()
count = 0
for w in textList:
	if w in stpWrdLst:
		count = count+1
print(count, " actual stopwords found in the text with linear search")
end = time.time()
print(end - start, "seconds used for linear search")
# output should look similar to this
# (shown in Python 2 tuple format; Python 3 omits the parentheses and quotes):
# ('Using ', 2, ' hash function(s)')
# ('read ', 165127, ' words in text')
# (100737, ' potential stopwords found in the text with Bloom filtering')
# (0.2430589199066162, 'seconds used for Bloom filtering')
# (100734, ' actual stopwords found in the text with linear search')
# (0.5513789653778076, 'seconds used for linear search')
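
When experimenting with `k`, it can help to compare the observed number of false positives with the standard approximation for a Bloom filter with `m` bits, `n` stored items and `k` hash functions, (1 - e^(-kn/m))^k. The sketch below assumes independent, uniformly distributed hash functions; the value of `n` is a made-up example, not the size of the stopword list.

```python
import math

def false_positive_rate(m, n, k):
    """Approximate false-positive probability: (1 - exp(-k*n/m)) ** k."""
    return (1.0 - math.exp(-k * n / m)) ** k

m = 10000   # array size, matching vSize in the cell above
n = 500     # hypothetical number of stored items
for k in (1, 2, 4, 8):
    print(k, false_positive_rate(m, n, k))

# Under this approximation, the rate is minimised at k = (m / n) * ln 2.
best_k = (m / n) * math.log(2)
print("approximately optimal k:", best_k)
```

More hash functions lower the false-positive rate only up to a point, and each additional function also adds lookup cost.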
