# PySpark 

To install and run PySpark on Jupyter Notebook, see: https://technofob.com/2018/12/26/how-to-run-pyspark-2-4-0-in-jupyter-notebook-on-mac/

Main reference for this tutorial: https://spark.apache.org/docs/latest/rdd-programming-guide.html and https://spark.apache.org/examples.html

In [1]:
# We start by getting the spark context
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()

## Resilient Distributed Dataset (RDD)

An RDD can be created by parallelizing an existing collection in the driver program, or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [5]:
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()

data = sc.parallelize([0,1,1,2,2,2,3,3,3,3])

#counts = data.map(lambda n : (n+1,2))\
            # .reduceByKey(lambda m, n: m+n)
#counts.collect()

In [6]:
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21]
distData = sc.parallelize(data)

distData

ParallelCollectionRDD[7] at parallelize at PythonRDD.scala:480

Let us look at how the parallelization worked by saving the RDD to disk. Spark writes one part file per partition into the output directory.

In [7]:
distData.saveAsTextFile("/Users/debapriyo/spark-playground/data1")

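Since we gave no partition count, Spark chose a default (sc.defaultParallelism, which in local mode typically follows the number of cores available).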

### Specifying number of partitions manually

In [8]:
distData2 = sc.parallelize(data,4) # 4 partitions
distData2.saveAsTextFile("/Users/debapriyo/spark-playground/data2")
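
To build intuition for what a partition count means, here is a plain-Python sketch (no Spark needed) that cuts a list into n contiguous slices of nearly equal size. The helper split_into_slices is hypothetical and only illustrative; the exact boundaries Spark picks may differ.

```python
def split_into_slices(items, n):
    """Cut items into n contiguous slices of nearly equal size (illustrative only)."""
    size = len(items)
    # Slice i covers the index range [i*size//n, (i+1)*size//n)
    bounds = [i * size // n for i in range(n + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(n)]

numbers = list(range(1, 22))   # the same 21 values as the list used above
slices = split_into_slices(numbers, 4)
for i, part in enumerate(slices):
    print(f"partition {i}: {part}")
```

In the notebook itself, distData2.glom().collect() returns the actual contents of each partition, which is the reliable way to check what Spark did.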

## Lambda: anonymous function

In [9]:
# This is the usual way to define a function
def add1(a,b):
    return a+b

print(add1(2,3))

5


In [10]:
# This is the way to define an anonymous function
add2 = lambda a, b : a + b

print(add2(2,3))

5


In [None]:
# Or, without even naming it: define the lambda and call it immediately

print((lambda a, b: a + b)(2, 3))
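
Anonymous functions are most useful when passed directly as an argument to another function. The following small sketch, with made-up sample pairs, sorts a list of (word, count) pairs by count using a lambda as the sort key.

```python
# Hypothetical sample data, only for illustration
pairs = [("spark", 3), ("rdd", 7), ("map", 1)]

# The lambda is passed directly as the sort key; it never gets a name
by_count = sorted(pairs, key=lambda pair: pair[1], reverse=True)
print(by_count)
```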

## Map: apply some function to all elements of the list 

Map takes a function and a list as arguments and applies the function to every element of the list. For example, the following maps a list of numbers to the list of their squares. In Python 3, map returns an iterator, so we wrap it in list() to see the result.

In [11]:
# Recall that we already had a list of integers
print(data)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]


In [12]:
def sq(a):
    return a*a

data_sq = list(map(sq, data))
print(data_sq)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441]


### Using lambda with map

But it is more convenient to use a lambda function for this instead of defining a new function sq. 

In [14]:
data_sq = list(map(lambda a : a*a, data))
print(data_sq)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441]
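
One detail worth seeing in isolation: Python's built-in map does not compute anything until its result is consumed. The sketch below uses a shorter list (an assumption, to keep the example small) and also shows the equivalent list comprehension.

```python
numbers = [1, 2, 3, 4, 5]

squares_iter = map(lambda a: a * a, numbers)   # nothing is computed yet
first = next(squares_iter)                      # computes only the first square
rest = list(squares_iter)                       # consumes the remaining elements

# A list comprehension gives the same squares in one step
squares_list = [a * a for a in numbers]
print(first, rest, squares_list)
```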


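## Reduce: apply a function repeatedly to pairs of elements of a list

Reduce applies a two-argument function to the first two elements, then to that result and the next element, and so on. This works best when each step reduces a pair of elements to a single value of the same kind, so that overall the list is reduced to a single number. 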

In [17]:
from functools import reduce
data_sum = reduce(lambda a,b : a + b, data)
print(data_sum)

231
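
To see the intermediate steps that reduce goes through, itertools.accumulate can be used with the same function: it yields every partial result. This sketch uses a shorter list for readability and also shows the optional initial value, which makes reduce safe on an empty list.

```python
from functools import reduce
from itertools import accumulate

numbers = [1, 2, 3, 4, 5]

# accumulate yields every intermediate result that reduce would compute
running = list(accumulate(numbers, lambda a, b: a + b))
total = reduce(lambda a, b: a + b, numbers)

# With an initial value, reducing an empty list returns that value
empty_total = reduce(lambda a, b: a + b, [], 0)
print(running, total, empty_total)
```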


## Spark RDD basic operations

Once an RDD is created, Spark offers many basic operations on it. 

In [18]:
# Collect returns all elements of the dataset as an array. 
# Only to be used for an RDD that is small enough
distData.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]

In [19]:
# In practice, avoid collect() on a large RDD: it pulls every element to the driver. 
# Use first() or takeSample() to inspect a few elements instead
distData.first()

1

In [21]:
# Take a sample of n elements, with or without replacement 
#distData.takeSample(True, 5)
distData.takeSample(False, 5)

[12, 4, 10, 13, 14]

In [22]:
distData.count()

21
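
The first argument of takeSample decides whether sampling is done with replacement. As a plain-Python analogy (this is not how Spark samples internally), random.sample draws distinct elements while random.choices may repeat them. The fixed seed is an assumption made only to keep runs repeatable.

```python
import random

rng = random.Random(0)          # fixed seed so the run is repeatable
numbers = list(range(1, 22))

without_replacement = rng.sample(numbers, 5)    # 5 distinct elements
with_replacement = rng.choices(numbers, k=5)    # repeats are possible
print(without_replacement, with_replacement)
```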

## RDD from external files

In [None]:
lines = sc.textFile("/Users/debapriyo/spark-playground/anarchism.txt")

In [None]:
lines.takeSample(False,5)

# The word count problem in MapReduce (with Spark)

Now we are ready to write the code for the wordcount problem in a MapReduce fashion. 

In [None]:
# Create the RDD 
lines = sc.textFile("/Users/debapriyo/spark-playground/anarchism.txt")

At this point Spark has done no work and has not even read the file: creating the RDD is lazy. Spark performs the work only when we call an action that sends some output back to the driver, such as count or collect. 
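
Lazy evaluation can be illustrated without Spark using Python generators. In this sketch, read_lines is a hypothetical stand-in for reading a file, and the log list records when its body actually runs; building the pipeline does nothing until the result is consumed.

```python
log = []

def read_lines():
    """Stand-in for reading a file; records when it actually runs."""
    log.append("reading")
    yield from ["a b", "c d e"]

# Building the pipeline only describes the work
pipeline = (line.split(" ") for line in read_lines())
steps_before = list(log)        # still empty: nothing has run

result = list(pipeline)         # consuming the pipeline triggers the work
steps_after = list(log)
print(steps_before, result, steps_after)
```

This is only an analogy: Spark additionally records the chain of operations so it can distribute and recompute them across workers.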

Let us examine (for understanding) what's in there. It should be a collection of strings, one per line of the file. 

In [None]:
#lines.takeSample(False,8)
#lines.collect()
lines.count()

Now let us generate the key value pairs. For each word $w$, send $w$ to $(w,1)$. But for that we need to split the strings into words as well. Let us try map for that. 

In [None]:
words = lines.map(lambda line : line.split(" "))

words.takeSample(False,10)

Unfortunately, map has created a list of lists of words, one inner list per line. We would like a single list of words, so we use flatMap instead of map: it applies the function to each element and then flattens the results into one list. 

In [None]:
words = lines.flatMap(lambda line : line.split(" "))

words.takeSample(False,10)
#words.count()
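
The difference between map and flatMap can be reproduced on two made-up lines in plain Python. Here a list comprehension plays the role of map, and itertools.chain.from_iterable does the flattening that flatMap adds.

```python
from itertools import chain

# Hypothetical sample lines, only for illustration
sample_lines = ["to be or", "not to be"]

# Like map: one list of words per line
nested = [line.split(" ") for line in sample_lines]

# Like flatMap: the per-line lists are concatenated into one list
flat = list(chain.from_iterable(line.split(" ") for line in sample_lines))
print(nested)
print(flat)
```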

Now that we have a list of words, we can use the map $w \mapsto (w,1)$.

In [None]:
keyVal = words.map(lambda w : (w,1))
keyVal.takeSample(False,10)

Now we use the function reduceByKey in Spark. It groups the pairs by key and applies the given reduce function to the values of each key. 

In [None]:
counts = keyVal.reduceByKey(lambda m, n: m + n)
counts.takeSample(False,10)
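
What reduceByKey computes can be sketched in a few lines of plain Python. The helper reduce_by_key below is hypothetical and runs on a single machine; Spark instead combines values within each partition before exchanging data, which is why the reduce function should be associative and commutative.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func (illustrative only)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}

# Hypothetical (word, 1) pairs, only for illustration
key_val = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
word_counts = reduce_by_key(key_val, lambda m, n: m + n)
print(word_counts)
```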

We could as well write this whole thing more concisely. 

In [None]:
lines = sc.textFile("/Users/debapriyo/spark-playground/anarchism.txt")
counts = lines.flatMap(lambda line: line.split(" ")) \
             .map(lambda w: (w , 1)) \
             .reduceByKey(lambda m,n: m + n)

# We can save the result if we want
counts.saveAsTextFile("/Users/debapriyo/spark-playground/anarchism_counts")
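
One caveat about splitting on a single space: consecutive or trailing spaces produce empty strings, which then get counted as a word. The plain-Python sketch below, on a made-up text, contrasts split(" ") with split() (no argument), which splits on any run of whitespace.

```python
from collections import Counter

# Hypothetical text with a double space and a trailing space
text = "to  be or\nnot to be "

tokens_space = [w for line in text.split("\n") for w in line.split(" ")]
tokens_any = [w for line in text.split("\n") for w in line.split()]

print(Counter(tokens_space))   # includes the empty string as a "word"
print(Counter(tokens_any))     # only real words
```

If empty tokens matter for the result, the same idea applies in the Spark pipeline by using line.split() inside flatMap.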

# A Monte-Carlo simulation

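Estimate the value of Pi, the area of a unit circle. We draw points uniformly at random from the unit square; the fraction that falls inside the quarter circle $x^2 + y^2 < 1$ approximates $\pi/4$, so multiplying it by 4 gives an estimate of $\pi$. 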

In [None]:
import numpy as np

def inside(_):
    # The RDD element is ignored; each call just draws one random point
    x, y = np.random.random(), np.random.random()
    return x*x + y*y < 1

NUM_SAMPLES = 1000000
count = sc.parallelize(range(0, NUM_SAMPLES),2)\
          .filter(inside).count()
#print(count)

print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
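
For comparison, the same estimate can be computed on a single machine in plain Python. This is a minimal sketch: the function name estimate_pi, the fixed seed, and the smaller sample size are assumptions made here to keep the run quick and repeatable.

```python
import math
import random

def estimate_pi(num_samples, seed=0):
    """Estimate pi from the fraction of random points inside the quarter circle."""
    rng = random.Random(seed)
    hits = sum(1 for _ in range(num_samples)
               if rng.random() ** 2 + rng.random() ** 2 < 1)
    return 4.0 * hits / num_samples

estimate = estimate_pi(100_000)
print("Pi is roughly %f (error %f)" % (estimate, abs(estimate - math.pi)))
```

The error of such an estimate shrinks roughly in proportion to one over the square root of the number of samples, so each extra digit of accuracy costs about 100 times more samples.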