# Introduction to Spark

In this notebook, you will be introduced to the Apache Spark library for big data processing. There is now a Python package called `pyspark` which will load Spark for you. The variable `sc` is a Spark context that lets you interact with the Spark runtime. Check that it is correctly initialised:

In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
print(sc)
print("Ready to go!")

<SparkContext master=local[*] appName=pyspark-shell>
Ready to go!


### Learning activity: Create RDDs

To analyse large datasets using Spark, you will load them into Resilient Distributed Datasets (RDDs). There are a number of ways to create RDDs. Use the `parallelize()` function to create one from the Python collection `["Hello", "World", "!"]`.

In [5]:
sc.parallelize(['Hello', 'World', '!'])

ParallelCollectionRDD[4] at parallelize at PythonRDD.scala:175
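`parallelize()` also accepts an optional `numSlices` argument that sets the number of partitions. The plain-Python sketch below (no Spark needed) mimics an even, contiguous split of a local list into partitions. The helper `split_into_partitions` is hypothetical, and the exact boundaries Spark picks may differ; on a real RDD, `rdd.glom().collect()` shows the actual partition contents.

```python
def split_into_partitions(items, num_slices):
    """Hypothetical helper: split a list into num_slices contiguous, near-equal chunks."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

parts = split_into_partitions(["Hello", "World", "!"], 2)
print(parts)  # [['Hello'], ['World', '!']]
```

Each partition is processed independently by Spark, which is what later makes operations such as `reduce()` a two-stage process.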

Use the `textFile()` function to create an RDD from the file `war-and-peace.txt` and store it in a variable called `lines`. 

In [3]:
lines = sc.textFile('war-and-peace.txt')

### Learning activity: Basic RDD manipulation

Print the number of lines in War and Peace using the function `count()`.

In [33]:
lines.count()
# returns an integer

54223

Print the first 15 lines using the function `take()`.

In [19]:
lines.take(15)
# returns a list of strings

['                                      1869',
 '                                 WAR AND PEACE',
 '                                 by Leo Tolstoy',
 'BK1',
 '                                 BOOK ONE: 1805',
 'BK1|CH1',
 '  CHAPTER I',
 '',
 '  "Well, Prince, so Genoa and Lucca are now just family estates of the',
 "Buonapartes. But I warn you, if you don't tell me that this means war,",
 'if you still try to defend the infamies and horrors perpetrated by',
 'that Antichrist- I really believe he is Antichrist- I will have',
 'nothing more to do with you and you are no longer my friend, no longer',
 "my 'faithful slave,' as you call yourself! But how do you do? I see",
 'I have frightened you- sit down and tell me all the news."']

### Learning activity: `filter()` and `map()` and `distinct()`

Let's apply some transformations to RDDs. The following helper function will be useful to extract the words from a line.

In [8]:
# A helper function to compute the list of words in a line of text
import re
def get_words(line):
    # Raw string avoids the invalid-escape warning for '\w' in newer Python versions
    return re.compile(r'\w+').findall(line)

print(get_words("This, is a test!"))

['This', 'is', 'a', 'test']
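Because `\w+` matches runs of letters, digits and underscores, apostrophes and hyphens split words: a contraction such as "don't" becomes two tokens, which slightly affects the word counts later on. The sketch below is a small variation on the helper above that compiles the pattern once at module level (`WORD_RE` is a name introduced here) and shows a few such cases.

```python
import re

# Compile once; \w+ matches maximal runs of letters, digits and underscores.
WORD_RE = re.compile(r"\w+")

def get_words(line):
    return WORD_RE.findall(line)

print(get_words("don't"))          # ['don', 't']
print(get_words("Antichrist- I"))  # ['Antichrist', 'I']
print(get_words("BK1|CH1"))        # ['BK1', 'CH1']
```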


Use `filter()` to count the number of lines which mention `war` and the number of lines which mention `peace`.

In [46]:
# How often are war and peace mentioned?

# filter() keeps only the lines for which the lambda returns True.
# Matching whole words with get_words() means e.g. 'warning' is not counted as 'war'.
# The test is case-sensitive, so 'War' and 'WAR' are not matched.
war_lines = lines.filter(lambda l: 'war' in get_words(l))
print(war_lines.count())

peace_lines = lines.filter(lambda l: 'peace' in get_words(l))
print(peace_lines.count())

265
104
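Since the membership test above is case-sensitive, lines where the word appears only as "War" or "WAR" (such as the title line) are not counted. The plain-Python sketch below applies the same predicate to a few made-up lines and compares it with a lower-cased variant. On the RDD, the equivalent would be `lines.filter(lambda l: 'war' in get_words(l.lower()))`; its count is not shown here.

```python
import re

WORD_RE = re.compile(r"\w+")

def get_words(line):
    return WORD_RE.findall(line)

# Hypothetical sample lines, not read from the book file.
sample = ["WAR AND PEACE", "this means war,", "I warn you", "War is declared"]

case_sensitive = [l for l in sample if "war" in get_words(l)]
case_insensitive = [l for l in sample if "war" in get_words(l.lower())]

print(case_sensitive)    # ['this means war,']
print(case_insensitive)  # ['WAR AND PEACE', 'this means war,', 'War is declared']
```

"I warn you" is excluded in both cases because the match is on whole words.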


Use `map()` to convert each line in the RDD to uppercase, and print the first 15 uppercased lines.

In [23]:
# Convert each line in the RDD to uppercase

# Instead of a Python for loop, map() applies the lambda to every line, partition by
# partition; nothing is computed until an action such as take() is called
cap_lines = lines.map(lambda l: l.upper())
cap_lines.take(15)

['                                      1869',
 '                                 WAR AND PEACE',
 '                                 BY LEO TOLSTOY',
 'BK1',
 '                                 BOOK ONE: 1805',
 'BK1|CH1',
 '  CHAPTER I',
 '',
 '  "WELL, PRINCE, SO GENOA AND LUCCA ARE NOW JUST FAMILY ESTATES OF THE',
 "BUONAPARTES. BUT I WARN YOU, IF YOU DON'T TELL ME THAT THIS MEANS WAR,",
 'IF YOU STILL TRY TO DEFEND THE INFAMIES AND HORRORS PERPETRATED BY',
 'THAT ANTICHRIST- I REALLY BELIEVE HE IS ANTICHRIST- I WILL HAVE',
 'NOTHING MORE TO DO WITH YOU AND YOU ARE NO LONGER MY FRIEND, NO LONGER',
 "MY 'FAITHFUL SLAVE,' AS YOU CALL YOURSELF! BUT HOW DO YOU DO? I SEE",
 'I HAVE FRIGHTENED YOU- SIT DOWN AND TELL ME ALL THE NEWS."']

Use `flatMap()` to create an RDD of the words in War and Peace and count the number of words.

In [31]:
# Split each line into words using get_words()

# flatMap() flattens the list returned for each line, so every word becomes its own RDD element
words = lines.flatMap(lambda l: get_words(l))
print(words.count())

#the first 10 words
words.take(10)

573322


['1869', 'WAR', 'AND', 'PEACE', 'by', 'Leo', 'Tolstoy', 'BK1', 'BOOK', 'ONE']
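The difference between `map()` and `flatMap()` is whether each per-line result is kept as one element or flattened into many. A plain-Python sketch over two lines (no Spark; `itertools.chain.from_iterable` stands in for the flattening step):

```python
import re
from itertools import chain

WORD_RE = re.compile(r"\w+")

def get_words(line):
    return WORD_RE.findall(line)

sample = ["WAR AND PEACE", "by Leo Tolstoy"]

# map(): one output element (a list of words) per input line
mapped = [get_words(l) for l in sample]
# flatMap(): the per-line lists are concatenated into one flat sequence of words
flat_mapped = list(chain.from_iterable(get_words(l) for l in sample))

print(mapped)       # [['WAR', 'AND', 'PEACE'], ['by', 'Leo', 'Tolstoy']]
print(flat_mapped)  # ['WAR', 'AND', 'PEACE', 'by', 'Leo', 'Tolstoy']
```

Counting `mapped` would give the number of lines; counting `flat_mapped` gives the number of words, which is what `words.count()` reports.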

Finally, use `distinct()` to count the number of different words in the RDD.

In [32]:
# Count the number of distinct words
distinct_words = words.distinct()
distinct_words.count()

19206

### Learning activity: Set-like transformations

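Use the functions `union()` and `intersection()` to create RDDs of lines that mention either war or peace, and of lines that mention both. Count how many lines of each type there are and print some examples. Note that `union()` keeps duplicates, so a line that mentions both words is counted twice; append `.distinct()` for a true set union.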

In [43]:
union_lines = war_lines.union(peace_lines)
print('Lines mentioning war or peace (union, duplicates kept):', union_lines.count())

intersect_lines = war_lines.intersection(peace_lines)
print('Lines mentioning both war and peace:', intersect_lines.count())

Lines mentioning war or peace (union, duplicates kept): 369
Lines mentioning both war and peace: 7
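Spark's `union()` concatenates the two RDDs and keeps duplicates, whereas `intersection()` removes them. That is why 369 equals 265 + 104: the 7 lines mentioning both words appear in both inputs and are counted twice. The plain-Python model below uses lists as stand-ins for RDDs, with made-up line strings.

```python
# Lists stand in for RDDs; the line strings are hypothetical.
war = ["a war line", "war and peace line"]
peace = ["peace line", "war and peace line"]
peace_set = set(peace)

union = war + peace                           # like rdd.union(): duplicates kept
union_distinct = list(dict.fromkeys(union))   # like .union(...).distinct(); order kept only for readability
intersection = [l for l in dict.fromkeys(war) if l in peace_set]  # like rdd.intersection()

print(len(union))           # 4
print(len(union_distinct))  # 3
print(intersection)         # ['war and peace line']
```

In the model, the distinct union size equals the union size minus the number of shared lines, which is the relationship behind the counts above.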


### Learning activity: `reduce()`

You have already seen three actions: `collect()`, which returns all elements in the RDD; `take(n)`, which returns the first `n` elements of the RDD; and `count()`, which returns the number of elements in the RDD.

The action `reduce()` takes as input a function which collapses two elements into one. Use it to find the longest word in War and Peace.

In [59]:
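# reduce() repeatedly combines two elements into one: first within each partition,
# then across partitions. The function should be commutative and associative.
def longer_words(x, y):
    if len(x) > len(y):
        return x
    else:
        return y

words.reduce(longer_words)

# Equivalent one-liner (a lambda cannot contain `return`):
# words.reduce(lambda x, y: x if len(x) > len(y) else y)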

'characteristically'
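`reduce()` first combines the elements inside each partition and then combines the per-partition results. The sketch below models that two-stage process in plain Python with `functools.reduce`; the partition contents are made up.

```python
from functools import reduce

def longer_word(x, y):
    # Keep the longer word; on a tie, y wins.
    return x if len(x) > len(y) else y

# Hypothetical partitions of a words RDD
partitions = [["well", "prince", "genoa"], ["characteristically", "war"], ["peace"]]

# Stage 1: reduce within each partition
partial = [reduce(longer_word, p) for p in partitions]
# Stage 2: reduce the per-partition results
longest = reduce(longer_word, partial)

print(partial)  # ['prince', 'characteristically', 'peace']
print(longest)  # characteristically
```

Because ties go to `y`, the function is not strictly commutative for words of equal length, so when several words share the maximum length the one returned may depend on how the data is partitioned.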

### Bonus activity: merging filters

Find all the lines that mention both war and peace **without** using `intersection()`.

In [60]:
merging_lines = lines.filter(lambda l: 'war' in get_words(l)).filter(lambda l: 'peace' in get_words(l))
# Equivalent single filter with both conditions:
#merging_lines = lines.filter(lambda l: 'war' in get_words(l) and 'peace' in get_words(l))
merging_lines.count()

7
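Chaining two `filter()` calls keeps exactly the lines that pass both predicates, the same as one filter joined with `and`. A variant that tokenises each line only once is sketched below in plain Python; the helper `mentions_both` is hypothetical, and on the RDD it would be used as `lines.filter(mentions_both)`.

```python
import re

WORD_RE = re.compile(r"\w+")

def get_words(line):
    return WORD_RE.findall(line)

def mentions_both(line):
    # Tokenise once, then test both words against a set.
    words = set(get_words(line))
    return "war" in words and "peace" in words

sample = ["war and peace", "only war here", "peace at last", "no match"]

# Two chained filters, as in the cell above
chained = [l for l in sample if "war" in get_words(l)]
chained = [l for l in chained if "peace" in get_words(l)]
# One combined filter
single = [l for l in sample if mentions_both(l)]

print(single)  # ['war and peace']
```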

### Bonus learning activity: Finding proper nouns

The Python function `str.istitle()` returns `True` if the string `str` is titlecased: the first character is uppercase and others are lowercase. Use it to:
* Find the set of distinct words in War and Peace which are titlecased
* Find the set of distinct words in War and Peace which are not titlecased

In [64]:
titlecased_words = words.filter(lambda l: l.istitle()).distinct()
titlecased_words.count()

3068
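`str.istitle()` is stricter than "starts with a capital": all-uppercase words and strings with no cased characters return `False`, so headings such as "WAR", markers such as "BK1" and numbers such as "1805" are excluded from the titlecased set. A few quick checks in plain Python:

```python
examples = ["Prince", "PRINCE", "prince", "1805", "BK1"]
flags = {w: w.istitle() for w in examples}
print(flags)
# {'Prince': True, 'PRINCE': False, 'prince': False, '1805': False, 'BK1': False}
```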

The Python function `str.lower()` returns a copy of the string with all characters lowercase. Use it, along with your previously generated RDD, to find the set of words in War and Peace which only appear titlecased.

In [73]:
# Alternative: not_titlecased_words = words.filter(lambda l: not l.istitle()).distinct()

not_titlecased_words = words.subtract(titlecased_words).distinct()
print(not_titlecased_words.count())

# Keep only proper nouns. A word such as 'The' may be titlecased only because it
# starts a sentence; since 'the' also occurs in lowercase, subtracting the
# titlecased form of every non-titlecased word removes it.
proper_nouns = titlecased_words.subtract(not_titlecased_words.map(lambda l: l.title()))
proper_nouns.take(15)
# The result is mostly names of people and places, plus words such as 'God' and 'Russian'

16138


['Leo',
 'Lucca',
 'Kuragin',
 'Buonaparte',
 'God',
 'Hardenburg',
 'Wintzingerode',
 'Funke',
 'Lavater',
 'Bolkonski',
 'Kutuzov',
 'Helene',
 'Russian',
 'Andrew',
 'Boris']
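The same subtraction can be modelled with Python sets, which makes the logic easy to check on a small, made-up list of tokens: a titlecased word survives only if its `title()` form never comes from a non-titlecased occurrence.

```python
# Hypothetical sample of tokens
words = ["The", "the", "Pierre", "war", "WAR", "Moscow", "Moscow"]

titlecased = {w for w in words if w.istitle()}
not_titlecased = set(words) - titlecased
proper_nouns = titlecased - {w.title() for w in not_titlecased}

print(sorted(proper_nouns))  # ['Moscow', 'Pierre']
```

Note that `'WAR'.title()` is `'War'`, so a word that ever appears in all capitals is also treated as not appearing only titlecased.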

# Key/Value pairs in Spark

### Learning activity: WordCount in Spark

Use the functions `flatMap()` and `reduceByKey()` to count the number of occurrences of each word in War and Peace, and print the counts of five words.

In [95]:
word_count = words.map(lambda l: (l, 1)).reduceByKey(lambda a, b: a+b)
word_count.take(5)

[('PEACE', 1), ('Leo', 1), ('BOOK', 15), ('ONE', 2), ('1805', 24)]
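`reduceByKey()` merges the values for each key with the given function, first inside each partition and then across partitions after the shuffle. The plain-Python model below shows those two stages; the partition contents are made up, and `reduce_by_key` is a hypothetical helper, not a Spark API.

```python
def reduce_by_key(pairs, func):
    """Hypothetical helper: combine the values of each key with func."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

partitions = [["war", "peace", "war"], ["war", "prince"]]

# Stage 1: map each word to (word, 1) and combine within each partition
local = [reduce_by_key(((w, 1) for w in p), lambda a, b: a + b) for p in partitions]
# Stage 2: combine the per-partition results across partitions
merged = reduce_by_key((kv for d in local for kv in d.items()), lambda a, b: a + b)

print(local)   # [{'war': 2, 'peace': 1}, {'war': 1, 'prince': 1}]
print(merged)  # {'war': 3, 'peace': 1, 'prince': 1}
```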

### Learning activity: using `groupByKey()`

Reimplement the above word count using `groupByKey()` instead of `reduceByKey()`.

In [96]:
word_count = words.map(lambda l: (l, 1)).groupByKey().map(lambda l: (l[0], sum(l[1])))
word_count.take(5)

[('PEACE', 1), ('Leo', 1), ('BOOK', 15), ('ONE', 2), ('1805', 24)]
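Both versions give the same counts, but `groupByKey()` sends every `(word, 1)` pair across the network before summing, while `reduceByKey()` sums inside each partition first. The sketch below counts how many records each approach would shuffle for made-up partitions; it models the idea only, not Spark's actual shuffle. In PySpark the grouping step can also be written more compactly as `groupByKey().mapValues(sum)`.

```python
# Hypothetical partitions of a words RDD
partitions = [["war", "peace", "war"], ["war", "prince", "war"]]

# groupByKey(): every (word, 1) pair is shuffled
group_records = sum(len(p) for p in partitions)
# reduceByKey(): at most one partial count per distinct word per partition is shuffled
reduce_records = sum(len(set(p)) for p in partitions)

print(group_records, reduce_records)  # 6 4
```

The gap grows with the number of repeated words per partition, which is why `reduceByKey()` is usually preferred for aggregations like word count.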

### Bonus learning activity: computing the average of each key

The pair RDD `word_line_pairs`, defined below, has one element for each line of War and Peace that contains at least one word, with the line's first word as the key and the line itself as the value. Use it to compute the average line length (in characters) for each starting word.

In [111]:
word_line_pairs = lines.filter(lambda line: len(get_words(line)) > 0).map(lambda line: (get_words(line)[0], line))     

In [112]:
# Map each pair to (first word, (line length in characters, 1))
length_pairs = word_line_pairs.map(lambda l: (l[0], (len(l[1]), 1)))
# Sum the line lengths and the line counts for each first word
totals = length_pairs.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
# Average line length = total length / number of lines
avg_line_length = totals.map(lambda l: (l[0], l[1][0] / float(l[1][1])))
avg_line_length.take(5)

[('BOOK', 48.46666666666667),
 ('It', 62.230496453900706),
 ('Fedorovna', 68.0),
 ('of', 59.47846889952153),
 ('reception', 66.0)]
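Carrying `(sum, count)` pairs is what makes this computation safe to distribute: adding pairs is associative, whereas averaging two averages is not when the groups have different sizes. A plain-Python check with made-up line lengths for a single key:

```python
from functools import reduce

lengths = [40, 60, 80]  # hypothetical lengths of three lines starting with the same word

# Correct: reduce (sum, count) pairs, divide once at the end
total, count = reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), [(n, 1) for n in lengths])
correct_avg = total / count

# Incorrect: pairwise averaging of averages
naive_avg = reduce(lambda a, b: (a + b) / 2, lengths)

print(correct_avg, naive_avg)  # 60.0 65.0
```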