# PySpark

To install and run PySpark in Jupyter Notebook on a Mac, see: https://technofob.com/2018/12/26/how-to-run-pyspark-2-4-0-in-jupyter-notebook-on-mac/

The main references for this tutorial are https://spark.apache.org/docs/latest/rdd-programming-guide.html and https://spark.apache.org/examples.html

In [1]:
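# Install PySpark and PyDrive
!pip install pyspark
!pip install -U -q PyDrive
# Spark runs on the JVM, so a Java runtime is required
!apt install openjdk-8-jdk-headless -qq
import os
# Tell Spark where the JDK was installed
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"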

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=dc4394b1b2869e98344e85a239e4da3f6c2000f04031da1087a45a11670bd6f7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [2]:
# We start by getting the SparkContext (the existing one, or a new one with default settings)
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

## Resilient Distributed Dataset (RDD)

An RDD can be created either by parallelizing an existing collection in the driver program or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [3]:
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
distData = sc.parallelize(data)

distData

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

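Let us look at how the data was split up. `saveAsTextFile` writes the RDD as a directory containing one `part-XXXXX` file per partition.

In [4]:
# The target directory must not already exist, otherwise Spark raises an error
distData.saveAsTextFile("/Users/Sharanya/spark-playground/data1")

Spark chooses the number of partitions automatically: by default it uses `sc.defaultParallelism`, which in local mode is typically the number of available cores.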

### Specifying number of partitions manually

In [5]:
distData2 = sc.parallelize(data,4) # 4 partitions
distData2.saveAsTextFile("/Users/Sharanya/spark-playground/data2")
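The part files written by `saveAsTextFile` show how the 20 numbers were distributed. As a rough mental model, assume the list is cut into contiguous, roughly equal slices, one per partition. The helper `split_into_partitions` below is hypothetical and only models that assumption; it is not PySpark's implementation. On a real RDD, `distData2.glom().collect()` returns the actual contents of each partition as a list of lists.

```python
def split_into_partitions(items, num_slices):
    """Hypothetical helper: cut items into num_slices contiguous,
    roughly equal slices (a simplified model, not PySpark's code)."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


nums = list(range(1, 21))
for i, part in enumerate(split_into_partitions(nums, 4)):
    # In this model, each slice plays the role of one part-XXXXX file
    print(f"part-{i:05d}: {part}")
```

With 20 elements and 4 slices, each modelled partition holds 5 consecutive numbers.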

## Lambda: anonymous function

In [6]:
# This is the usual way to define a function
def add1(a,b):
    return a+b

print(add1(2,3))

5


In [7]:
# This is the way to define an anonymous function
add2 = lambda a, b : a + b

print(add2(2,3))

5


## Map: apply some function to all elements of the list

`map` takes a function and a list as arguments and applies the function to every element of the list. In Python 3 it returns a lazy iterator, so we wrap it in `list(...)` to see the result. For example, the following maps a list of numbers to the list of their squares.

In [8]:
# Recall that we already had a list of integers
print(data)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]


In [9]:
def sq(a):
    return a*a

data_sq = list(map(sq, data))
print(data_sq)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]


### Using lambda with map

It is more convenient to pass a lambda function directly instead of defining a separate function `sq`.

In [10]:
data_sq = list(map(lambda a : a*a, data))
print(data_sq)

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]


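## Reduce: apply a function repeatedly to pairs of elements of a list

`reduce(f, [x1, x2, ..., xn])` computes $f(\dots f(f(x_1, x_2), x_3) \dots, x_n)$: at each step it combines the result so far with the next element. This works best when `f` returns a single value of the same kind as its inputs, so that the whole list is reduced to a single value.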

In [11]:
from functools import reduce
data_sum = reduce(lambda a,b : a + b, data)
print(data_sum)

210
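To make the folding explicit, here is a minimal sketch that re-implements `reduce` as a loop (the name `my_reduce` is hypothetical). It also illustrates a point that matters once the data is distributed: Spark's `RDD.reduce` reduces each partition separately and then combines the partial results, which is why the Spark documentation asks for a commutative and associative function. The 4-way split below is an assumed example, not Spark's actual partitioning.

```python
from functools import reduce


def my_reduce(f, items):
    """Hypothetical re-implementation of functools.reduce without an initial value."""
    it = iter(items)
    acc = next(it)          # start from the first element (fails on an empty list)
    for x in it:
        acc = f(acc, x)     # f(...f(f(x1, x2), x3)..., xn)
    return acc


nums = list(range(1, 21))
print(my_reduce(lambda a, b: a + b, nums))      # 210, same as functools.reduce

# Reduce each (assumed) partition first, then combine the partial results
parts = [nums[0:5], nums[5:10], nums[10:15], nums[15:20]]
partials = [my_reduce(lambda a, b: a + b, p) for p in parts]
print(my_reduce(lambda a, b: a + b, partials))  # still 210: addition is associative

# Subtraction is not associative, so the two strategies disagree
sub = lambda a, b: a - b
print(reduce(sub, nums), my_reduce(sub, [my_reduce(sub, p) for p in parts]))  # -208 116
```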


## Spark RDD basic operations

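Once an RDD is created, Spark offers many basic operations on it. They come in two kinds: *transformations* such as `filter` and `map`, which lazily define a new RDD, and *actions* such as `collect`, `count`, `first` and `takeSample`, which compute a result and return it to the driver.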

In [12]:
# collect() returns all elements of the RDD to the driver as a Python list.
# Use it only when the RDD is small enough to fit in the driver's memory.
distData.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [13]:
# Example of filter: keep only the elements for which smallNumber returns True
def smallNumber(x):
    return x < 10

filtered = distData.filter(smallNumber)

In [14]:
filtered.count()

9

In [15]:
# In practice, avoid collect() on large RDDs: it pulls every element into the driver.
# To inspect the data, use first(), take(n) or takeSample() instead.
distData.first()

1

In [16]:
# takeSample(withReplacement, num): draw num elements, with (True) or without (False) replacement
distData.takeSample(True, 10)
#distData.takeSample(False, 5)

[17, 1, 11, 3, 9, 3, 2, 14, 14, 1]
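The sample above contains repeated values (3, 14 and 1) because the first argument `True` samples with replacement. Python's standard library makes the same distinction with `random.choices` (with replacement) and `random.sample` (without replacement); the sketch below is only an analogy for the semantics, not how Spark samples internally. Like the seeded generator here, `takeSample` also accepts an optional `seed` argument for reproducible samples.

```python
import random

population = list(range(1, 21))
rng = random.Random(42)  # fixed seed so the sketch is reproducible

# With replacement: each draw is independent, so values can repeat
with_repl = rng.choices(population, k=10)

# Without replacement: each element can be drawn at most once
without_repl = rng.sample(population, 5)

print(with_repl)
print(without_repl)
```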

In [17]:
distData.count()

20

## RDD from external files

In [20]:
trump = sc.textFile("/Users/Sharanya/spark-playground/Trump.txt")

In [21]:
trump.takeSample(False,10)

['No, I have no intention of changing my mind.',
 '',
 '',
 'To me, it was instructional.',
 'They put Hillary’s numbers. And I’m winning by a lot and they said, "Donald Trump is winning. You know, let’s go to a new subject, okay?"',
 '',
 '',
 'Here’s the problem. From the standpoint of bookkeeping, from the standpoint of bureaucracy, you’re talking about millions and millions of returns. You’re talking about building the IRS even bigger and it’s a monster. And I could cut it down way down. And the money that you’re talking about is far less than the administrative costs.',
 '',
 '']

# The word count problem in MapReduce (with Spark)

Now we are ready to write the code for the word count problem in a MapReduce fashion.

In [22]:
# Create the RDD
# Already done above

Creating the RDD by itself performs no work and does not even read the file: RDDs are evaluated lazily. Spark runs the computation only when an action (such as `count`, `collect` or `takeSample`) asks for a result to be sent back to the driver.
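Python's built-in `map` and `filter` are lazy too, which gives a rough analogy for this behaviour. This is only an illustration with made-up input strings; Spark itself records the chain of transformations and executes it on the workers when an action runs.

```python
calls = []  # records each time to_upper actually runs


def to_upper(line):
    calls.append(line)
    return line.upper()


sample_lines = ["first line", "", "third line"]   # made-up input

# Building the pipeline runs nothing yet: map() and filter() return lazy
# iterators, loosely like Spark transformations that only record the work.
pipeline = filter(None, map(to_upper, sample_lines))  # filter(None, ...) drops empty strings
print(len(calls))           # 0: no element has been processed yet

# Consuming the iterator plays the role of a Spark action such as collect()
result = list(pipeline)
print(result, len(calls))   # ['FIRST LINE', 'THIRD LINE'] 3
```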

Let us examine what is in the RDD; it should contain strings, one per line of the file.

In [23]:
# trump.takeSample(False, 8)
# trump.collect()
# trump.count()

In [24]:
# Recall what split does
text = "Debapriyo Majumdar"
print(text.split(" "))

['Debapriyo', 'Majumdar']


Now let us generate the key-value pairs by sending each word $w$ to $(w,1)$. First, though, each line has to be split into words; let us try `map` for that.

In [25]:
words = trump.map(lambda line : line.split(" "))

words.takeSample(False,10)

[['As',
  'far',
  'as',
  'Jeb',
  'is',
  'concerned,',
  'I',
  'watched',
  'him',
  'this',
  'morning',
  'on',
  'television',
  'and',
  'it’s',
  'a',
  'little',
  'bit',
  'sad.'],
 ['And',
  'this',
  'really',
  'pertains',
  'to',
  'allowing',
  'people',
  'into',
  'our',
  'country',
  'that',
  'we',
  'know',
  'are',
  'going',
  '–',
  'we’re',
  'going',
  'to',
  'have',
  'problems.',
  'We’re',
  'going',
  'to',
  'have',
  'huge',
  'problems.',
  'Allowing',
  'people',
  'in',
  'from',
  'areas',
  'of',
  'the',
  'world',
  'we',
  'don’t',
  'know',
  'where',
  'they',
  'come',
  'from.',
  'We',
  'don’t',
  'know',
  'where',
  'their',
  'documents',
  'are.',
  'They',
  'say',
  'they',
  'don’t',
  'have',
  'documents.',
  'They',
  'have',
  'no',
  'papers.',
  'No',
  'nothing.',
  'We',
  'let',
  'them',
  'in.',
  'How',
  'stupid',
  'can',
  'we',
  'be?'],
 [''],
 ['When',
  'they',
  'start',
  'dancing',
  'about',
  'a',
  'deal',


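Unfortunately, `map` produces one list of words per line, so we get an RDD of lists. We want an RDD of individual words, so we use `flatMap` instead: it applies the function to each line and flattens the resulting lists into a single RDD of words.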
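In plain Python the difference looks like this: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` for the flattening step of `flatMap` (the input strings are made up). The sketch also shows why the RDD of words will contain empty strings: `str.split(" ")` returns `['']` for a blank line, like the blank lines seen in the sample of `trump`, and produces `''` between repeated spaces.

```python
from itertools import chain

toy_lines = ["the quick fox", "", "jumps  over"]  # a blank line and a double space

# map: one output element per input line, so we get a list of lists
mapped = [line.split(" ") for line in toy_lines]

# flatMap: apply the same function, then flatten the per-line lists into one list
flat = list(chain.from_iterable(line.split(" ") for line in toy_lines))

print(mapped)   # [['the', 'quick', 'fox'], [''], ['jumps', '', 'over']]
print(flat)     # ['the', 'quick', 'fox', '', 'jumps', '', 'over']

# split() with no argument splits on runs of whitespace and drops empty strings
flat_ws = list(chain.from_iterable(line.split() for line in toy_lines))
print(flat_ws)  # ['the', 'quick', 'fox', 'jumps', 'over']
```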

In [26]:
words = trump.flatMap(lambda line : line.split(" "))

# words.takeSample(False, 30)  # uncomment (and comment out count) to inspect a sample
words.count()

171898

Now that we have an RDD of words, we can use the map $w \mapsto (w,1)$.

In [27]:
keyVal = words.map(lambda w : (w,1))
keyVal.takeSample(False,20)

[('going', 1),
 ('what', 1),
 ('ENEMIES', 1),
 ('he', 1),
 ('be', 1),
 ('them', 1),
 ('building', 1),
 ('the', 1),
 ('going', 1),
 ('spin', 1),
 ('it.', 1),
 ('We’re', 1),
 ('us', 1),
 ('a', 1),
 ('the', 1),
 ('you.', 1),
 ('not', 1),
 ('I', 1),
 ('I', 1),
 ('landscape', 1)]

Now we use Spark's `reduceByKey`. It groups the pairs by key and reduces the values of each key with the given function (here addition), producing one `(word, count)` pair per distinct word.
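The semantics of `reduceByKey` can be modelled in a few lines of plain Python. This is a sketch only: the helper `reduce_by_key` is hypothetical and gathers each key's full list of values in one place, whereas Spark also merges values locally within each partition (like a MapReduce combiner) before shuffling them across the cluster.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, f):
    """Hypothetical model of reduceByKey: group values by key,
    then reduce each key's values with f."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(f, values) for key, values in groups.items()}


toy_words = "to be or not to be".split(" ")
pairs = [(w, 1) for w in toy_words]               # w -> (w, 1)
print(reduce_by_key(pairs, lambda m, n: m + n))   # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```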

In [28]:
counts = keyVal.reduceByKey(lambda m, n: m + n)
counts.takeSample(False,100)

[('she’d', 2),
 ('Bibi.', 4),
 ('wages.', 2),
 ('contribute', 1),
 ('Phony.', 1),
 ('excavator.', 1),
 ('Cole.', 1),
 ('PROBLEM', 3),
 ('growth,', 2),
 ('reading,', 1),
 ('Rich', 1),
 ('…Do', 1),
 ('successful', 28),
 ('similar', 2),
 ('Ole', 2),
 ('FIGHTING.', 1),
 ('Pfizer?', 1),
 ('Street.', 1),
 ('range', 1),
 ('independents,', 1),
 ('chances.', 1),
 ('night', 23),
 ('spewing', 2),
 ('musicians', 1),
 ('Seriously.', 1),
 ('Third', 3),
 ('ministers.', 2),
 ('Authority', 1),
 ('candidacy?"', 1),
 ('Trump?', 4),
 ('incredible,', 3),
 ('something', 138),
 ('claiming', 1),
 ('possible,', 1),
 ('competitive', 1),
 ('whom', 8),
 ('taken,', 1),
 ('Stop', 1),
 ('sake', 1),
 ('accomplished', 3),
 ('SON.', 1),
 ('Gov.', 4),
 ('this.', 84),
 ('MESSENGER,', 1),
 ('originally', 2),
 ('bill.', 1),
 ('adds', 1),
 ('insulted,', 1),
 ('CNBC.', 1),
 ('really,', 11),
 ('Independents', 1),
 ('Nah,', 1),
 ('COACHES,', 1),
 ('end,', 11),
 ('points."', 1),
 ('racial', 3),
 ('America"', 2),
 ('wants.', 2),

We can also write the whole pipeline more concisely by chaining the operations.

In [29]:
# def processText(text):
#    return text.split(" ")

In [31]:
lines = sc.textFile("/Users/Sharanya/spark-playground/Trump.txt")
counts = (lines.flatMap(lambda line: line.split(" "))  # one element per word
               .map(lambda word: (word, 1))            # word -> (word, 1)
               .reduceByKey(lambda m, n: m + n))       # add up the 1s for each word

# We can save the result if we want (the output directory must not already exist)
counts.saveAsTextFile("/Users/Sharanya/spark-playground/Trump_wordcount")
counts.takeSample(True, 20)

[('shows.', 3),
 ('bound,', 1),
 ('likewise,', 3),
 ('Gov.', 4),
 ('firm', 1),
 ('ACHIEVEMENT', 1),
 ('thing…I', 1),
 ('premiums', 3),
 ("weren't", 1),
 ('statisticians.', 1),
 ('unbelievably', 9),
 ('essentially', 8),
 ('Icahn."', 1),
 ('Won’t', 1),
 ('tens', 15),
 ('federal', 3),
 ('9%,', 1),
 ('laws.', 4),
 ('REGULATIONS', 2),
 ('"What?"', 2)]

# A Monte Carlo simulation

Estimate the value of π, the area of a unit circle, by sampling random points in the unit square and counting how many fall inside the circle.
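Why $4 \cdot \text{count} / N$ estimates π (with $N$ = `NUM_SAMPLES`): a point $(x, y)$ drawn uniformly from the unit square $[0,1)^2$ lands inside the unit circle exactly when $x^2 + y^2 < 1$, that is, in the quarter disc of area $\pi/4$. Since the square has area 1,

$$
P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}
\qquad\Longrightarrow\qquad
\pi \approx 4 \cdot \frac{\text{count}}{N}.
$$

The estimate converges as $N$ grows; its standard error is $4\sqrt{p(1-p)/N}$ with $p = \pi/4$, so it shrinks like $1/\sqrt{N}$.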

In [32]:
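%%time
# %%time measures only the driver (notebook) process; the sampling runs in
# Spark's worker processes, so CPU time is far below wall time.

import numpy as np

def inside(p):
    # p (the element from the range) is ignored: each call draws a fresh point in [0, 1)^2
    x, y = np.random.random(), np.random.random()
    return x*x + y*y < 1

NUM_SAMPLES = 10000 * 1000
# Spread NUM_SAMPLES trials over 4 partitions and count the points inside the circle
count = sc.parallelize(range(0, NUM_SAMPLES), 4) \
          .filter(inside).count()

print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))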

Pi is roughly 3.142102
CPU times: user 38.4 ms, sys: 7.76 ms, total: 46.2 ms
Wall time: 9.1 s
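For comparison, the same estimator can be run without Spark. The sketch below is a sequential, standard-library version (the function `estimate_pi` is hypothetical, and it uses a seeded `random.Random` instead of NumPy's global generator); it is useful for sanity-checking the Spark result on a smaller sample and for comparing run times on your own machine.

```python
import random


def estimate_pi(num_samples, seed=0):
    """Sequential Monte Carlo estimate of pi using only the standard library."""
    rng = random.Random(seed)              # seeded for reproducibility
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()  # uniform point in [0, 1)^2
        if x * x + y * y < 1:              # inside the unit circle
            hits += 1
    return 4.0 * hits / num_samples


print("Pi is roughly %f" % estimate_pi(100_000))
```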
