In [48]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark
!wget https://www.gutenberg.org/files/2600/2600-0.txt -O war-and-peace.txt

--2020-07-29 12:17:16--  https://www.gutenberg.org/files/2600/2600-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3359549 (3.2M) [text/plain]
Saving to: ‘war-and-peace.txt’


2020-07-29 12:17:23 (521 KB/s) - ‘war-and-peace.txt’ saved [3359549/3359549]



In [49]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

In [50]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [51]:
sc

In [52]:
# A helper function to compute the list of words in a line of text
import re

def get_words(line):
    # r"..." avoids the invalid-escape warning that "\w" triggers in a normal string
    return re.findall(r"\w+", line)

print(get_words("This, is a test!"))

['This', 'is', 'a', 'test']
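`\w` matches letters, digits and the underscore (Unicode-aware for `str` patterns in Python 3), so `get_words()` splits words at apostrophes, both the ASCII `'` and the typographic `’`, and keeps numbers as words. A quick local check on made-up strings, using a copy of the helper:

```python
import re

def get_words(line):
    # Same pattern as the notebook helper: runs of word characters
    return re.findall(r"\w+", line)

# Apostrophes (ASCII ' or typographic ’) are not word characters, so they split words
print(get_words("don't"))            # ['don', 't']
print(get_words("Natasha’s café"))   # ['Natasha', 's', 'café']
# Digits and underscores count as word characters
print(get_words("1812 snake_case"))  # ['1812', 'snake_case']
```

This is why fragments such as `s` and `t` end up counted as words later on.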


### Learning activity: Create RDD with `parallelize`
Transform the list `words` into an RDD. The count should return `3`.

In [53]:
words = ["Hello", "Spark", "1"]

In [54]:
sc.parallelize(words).count()

3

### Learning activity: Create RDDs

To analyse large datasets using Spark you will load them into Resilient Distributed Datasets (RDDs). There are a number of ways to create RDDs: use the `parallelize()` function to create one from a Python collection, and use the `textFile()` function to create one from the file `war-and-peace.txt`, which was downloaded into the working directory above.

In [55]:
data = sc.textFile("war-and-peace.txt")
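An RDD is split into partitions that Spark can process in parallel. `sc.parallelize(c, numSlices)` lets you choose the number of partitions, and `rdd.getNumPartitions()` or `rdd.glom().collect()` let you inspect them. The plain-Python sketch below only illustrates the idea of cutting a collection into contiguous, near-equal slices; `split_into_slices` is a hypothetical helper, not Spark's actual slicing code.

```python
def split_into_slices(items, num_slices):
    """Hypothetical helper: cut a list into num_slices contiguous, near-equal slices."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

slices = split_into_slices(list(range(10)), 3)
print(slices)                        # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
print(sum(len(s) for s in slices))   # 10: every element lands in exactly one slice
```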

### Learning activity: Basic RDD manipulation

Print the number of lines in War and Peace using the method `count()`.

In [56]:
data.count()

66055

Print the first 5 lines using the method `take()`.

In [57]:
data.take(5)

['',
 'The Project Gutenberg EBook of War and Peace, by Leo Tolstoy',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever. You may copy it, give it away or re-use']

### Learning activity: `filter()`, `map()`, `flatMap()` and `distinct()`

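Use `filter()` to count the number of lines which mention `war` and the number of lines which mention `peace`. Because `get_words()` keeps the original case, these counts are case-sensitive: `War` and `Peace` are not matched.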

In [59]:
# How often is war mentioned?
data.filter(lambda line: "war" in get_words(line)).count()

266

In [60]:
# How often is peace mentioned?
data.filter(lambda line: "peace" in get_words(line)).count()

104
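Since the counts above miss capitalised occurrences such as `War` at the start of a sentence, here is a minimal sketch of a case-insensitive predicate. `mentions()` is a hypothetical helper; because Spark ships ordinary Python functions to its workers, the same function could be passed to `data.filter()`.

```python
import re

def get_words(line):
    return re.findall(r"\w+", line)

def mentions(line, word):
    """Hypothetical helper: True if `word` occurs as a whole word in `line`, ignoring case."""
    target = word.lower()
    return any(w.lower() == target for w in get_words(line))

sample = ["War was declared.", "They spoke of war.", "Peace at last.", "A warm day."]
print([line for line in sample if mentions(line, "war")])
# ['War was declared.', 'They spoke of war.']
```

In the notebook this becomes `data.filter(lambda line: mentions(line, "war")).count()`, which can only be greater than or equal to the case-sensitive count.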

Use `map()` to convert each line in the RDD to uppercase, and print the first 20 uppercased lines.

In [61]:
# Convert each line in the RDD to uppercase
data.map(lambda line: line.upper()).take(20)

['',
 'THE PROJECT GUTENBERG EBOOK OF WAR AND PEACE, BY LEO TOLSTOY',
 '',
 'THIS EBOOK IS FOR THE USE OF ANYONE ANYWHERE AT NO COST AND WITH ALMOST',
 'NO RESTRICTIONS WHATSOEVER. YOU MAY COPY IT, GIVE IT AWAY OR RE-USE',
 'IT UNDER THE TERMS OF THE PROJECT GUTENBERG LICENSE INCLUDED WITH THIS',
 'EBOOK OR ONLINE AT WWW.GUTENBERG.ORG',
 '',
 '',
 'TITLE: WAR AND PEACE',
 '',
 'AUTHOR: LEO TOLSTOY',
 '',
 'TRANSLATORS: LOUISE AND AYLMER MAUDE',
 '',
 'POSTING DATE: JANUARY 10, 2009 [EBOOK #2600]',
 '',
 'LAST UPDATED: JANUARY 21, 2019',
 '',
 'LANGUAGE: ENGLISH']

Use `flatMap()` to create an RDD of the words in War and Peace and count the number of words.

In [62]:
# Split each line into words using get_words()
words = data.flatMap(lambda line: get_words(line))
words.count()

576629
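The difference between the two transformations: `map()` produces exactly one output element per line (here, a list of words), while `flatMap()` concatenates those lists into a single sequence of words, so empty lines contribute nothing. A plain-Python analogue on made-up lines, with `itertools.chain.from_iterable` playing the role of the flattening step:

```python
import re
from itertools import chain

def get_words(line):
    return re.findall(r"\w+", line)

lines = ["War and Peace", "", "by Leo Tolstoy"]

# Like data.map(get_words): one list per line, including an empty list for ""
mapped = [get_words(line) for line in lines]
print(mapped)  # [['War', 'and', 'Peace'], [], ['by', 'Leo', 'Tolstoy']]

# Like data.flatMap(get_words): the per-line lists are flattened into words
flat = list(chain.from_iterable(mapped))
print(flat, len(flat))  # ['War', 'and', 'Peace', 'by', 'Leo', 'Tolstoy'] 6
```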

Finally, use `distinct()` to count the number of different words in the RDD.

In [63]:
# Count the number of distinct words
words.distinct().count()

19423
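`distinct()` compares words exactly, so `The` and `the` count as two different words. Python sets deduplicate the same way, as this sketch on a made-up list shows. In the notebook, the case-folded variant would be `words.map(lambda w: w.lower()).distinct().count()`, which cannot exceed the 19423 above.

```python
sample_words = ["The", "the", "war", "War", "WAR", "peace"]

# Exact comparison, like words.distinct()
print(len(set(sample_words)))                  # 6
# Case-folded comparison, like words.map(lambda w: w.lower()).distinct()
print(len({w.lower() for w in sample_words}))  # 3
```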

### Learning activity: Set-like transformations

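Use the function `union()` to create an RDD of lines that mention either war or peace. Count how many lines. Note that `union()` keeps duplicates, so a line that mentions both words is counted twice; chain `.distinct()` if you need each line only once.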

In [64]:
war_lines = data.filter(lambda line: 'war' in get_words(line))
peace_lines = data.filter(lambda line: 'peace' in get_words(line))
war_or_peace_lines = war_lines.union(peace_lines)

In [65]:
war_lines.count(), peace_lines.count(), war_or_peace_lines.count()

(266, 104, 370)

Use the function `intersection()` to create an RDD of lines that mention both war and peace. Count how many lines.

In [66]:
war_n_peace_lines = war_lines.intersection(peace_lines)
war_n_peace_lines.count()

7
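The two set-like operations treat duplicates differently: `union()` simply concatenates the two RDDs, whereas `intersection()` returns each common element once, even if the inputs contained repeats. A list-based sketch of those semantics with made-up lines:

```python
war = ["war began", "war and peace", "war and peace"]
peace = ["war and peace", "peace returned"]

# union(): plain concatenation, duplicates kept
union = war + peace
print(len(union))       # 5

# union().distinct(): each line once
print(len(set(union)))  # 3

# intersection(): common elements, deduplicated
print(sorted(set(war) & set(peace)))  # ['war and peace']
```

So the 370 above counts the 7 lines that mention both words twice; applying `.distinct()` to the union would remove those repeats (and any other repeated lines).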

Find all the lines that mention both war and peace without using `intersection()`.

In [67]:
war_n_peace_lines = data.filter(lambda line: 'war' in get_words(line) and 'peace' in get_words(line))
war_n_peace_lines.count()

7

### Learning activity: `reduce()`

You have already seen three actions: `collect()`, which returns all elements in the RDD; `take(n)`, which returns the first `n` elements of the RDD; and `count()`, which returns the number of elements in the RDD.

The action `reduce()` takes as input a function which collapses two elements into one. Use it to find the longest word in War and Peace.

In [68]:
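# Pair each word with its length
words_len = words.map(lambda w: (w, len(w)))

# Collapse two (word, length) pairs into one, keeping the longer word
words_len.reduce(lambda a, b: a if a[1] >= b[1] else b)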

('characteristically', 18)
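Spark applies the function given to `reduce()` within each partition and then to the partial results, so the function should be commutative and associative for the answer not to depend on how the data is partitioned. Keeping the longer of two words satisfies this, up to ties between words of equal length. A sketch that simulates partitions with plain lists and `functools.reduce`; the word list and the partitioning are made up:

```python
from functools import reduce

def longer(a, b):
    # Keep the longer word; on a tie, keep the first argument
    return a if len(a) >= len(b) else b

words = ["war", "peace", "characteristically", "Tolstoy", "Moscow"]

# Reduce the whole list at once
print(reduce(longer, words))     # characteristically

# Simulated partitions: reduce each one, then reduce the partial results
partitions = [words[:2], words[2:4], words[4:]]
partials = [reduce(longer, p) for p in partitions]
print(partials)                  # ['peace', 'characteristically', 'Moscow']
print(reduce(longer, partials))  # characteristically
```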

The Python method `str.istitle()` returns `True` if the string is titlecased; for a single word, this means its first letter is uppercase and the remaining letters are lowercase. Use it to:
* Find the set of distinct words in War and Peace which are titlecased
* Find the set of distinct words in War and Peace which are not titlecased

The Python method `str.lower()` returns a copy of the string with all cased characters converted to lowercase. Use it, along with your previously generated RDDs, to find the set of words in War and Peace which only appear titlecased.

In [69]:
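# Number of distinct titlecased words
title_words_count = words.filter(lambda w: w.istitle()).distinct().count()

# Caution: the second value subtracts from the *total* word count (576629),
# so it is not the number of distinct non-titlecased words
title_words_count, words.count() - title_words_count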

(3128, 573501)
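One way to complete the remaining parts of the exercise: a titlecased word appears only titlecased if its lowercase form never occurs among the non-titlecased words. A minimal sketch with Python sets and a made-up word list:

```python
sample_words = ["Natasha", "The", "the", "Moscow", "War", "war", "Pierre", "Pierre", "THE"]

# Distinct titlecased words, like words.filter(lambda w: w.istitle()).distinct()
title_words = {w for w in sample_words if w.istitle()}

# Distinct non-titlecased words ("THE" is not titlecased), lowercased for comparison
other_lower = {w.lower() for w in sample_words if not w.istitle()}

# Titlecased words whose lowercase form never appears otherwise
only_title = {w for w in title_words if w.lower() not in other_lower}

print(sorted(title_words))  # ['Moscow', 'Natasha', 'Pierre', 'The', 'War']
print(len({w for w in sample_words if not w.istitle()}))  # 3
print(sorted(only_title))   # ['Moscow', 'Natasha', 'Pierre']
```

In the notebook, the distinct non-titlecased count is `words.filter(lambda w: not w.istitle()).distinct().count()`. Because the non-titlecased words live in a second RDD rather than a Python set, the membership test would typically use a keyed operation instead, for example keying the titlecased words by `w.lower()` and calling `subtractByKey()` against the lowercased non-titlecased words.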

### Learning activity: WordCount in Spark

Use the functions `flatMap()` and `reduceByKey()` to count the number of occurrences of each word in War and Peace, and print the five most frequent words with their counts.

In [91]:
word_key = words.map(lambda w: (w, 1))

word_key.reduceByKey(lambda x, y: x+y).sortBy(lambda x: x[1], ascending=False).take(5)

[('the', 31954), ('and', 21214), ('to', 16514), ('of', 14939), ('a', 10129)]
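`reduceByKey()` merges all values that share a key using the given function. Below is a single-machine sketch of the same fold over `(word, 1)` pairs on a made-up sentence, checked against `collections.Counter`; `reduce_by_key` is a hypothetical helper, not part of Spark.

```python
from collections import Counter

def reduce_by_key(pairs, func):
    """Hypothetical helper: fold the values of each key with func (single machine)."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return list(acc.items())

words = "the war and the peace and the end".split()
pairs = [(w, 1) for w in words]                   # like words.map(lambda w: (w, 1))
counts = reduce_by_key(pairs, lambda x, y: x + y)

# Sort by count, descending, and keep the top two
top = sorted(counts, key=lambda kv: kv[1], reverse=True)[:2]
print(top)                            # [('the', 3), ('and', 2)]
print(Counter(words).most_common(2))  # [('the', 3), ('and', 2)]
```

For a top-N query in PySpark, an alternative to `sortBy()` followed by `take(5)` is `takeOrdered(5, key=lambda x: -x[1])`, which avoids sorting the whole RDD.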

### Learning activity: using `groupByKey()`
Reimplement the above word count using `groupByKey()` instead of `reduceByKey()`.

In [94]:
word_key.groupByKey().mapValues(sum).sortBy(lambda x: x[1], ascending=False).take(5)

[('the', 31954), ('and', 21214), ('to', 16514), ('of', 14939), ('a', 10129)]
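Both versions give the same counts, but `groupByKey()` sends every `(word, 1)` pair through the shuffle before summing, whereas `reduceByKey()` first combines pairs within each partition (a map-side combine), so fewer records are shuffled. A rough model of that difference with two made-up partitions; it counts records rather than bytes and ignores Spark's actual shuffle machinery:

```python
from collections import Counter

partitions = [["the", "war", "the"], ["the", "peace"]]

# groupByKey(): every (word, 1) pair is shuffled
grouped_records = sum(len(p) for p in partitions)

# reduceByKey(): each partition sums its own pairs, then ships one record per distinct word
local_counts = [Counter(p) for p in partitions]
reduced_records = sum(len(c) for c in local_counts)

# Either way the final counts agree
final = sum(local_counts, Counter())
print(grouped_records, reduced_records)                    # 5 4
print(final == Counter(w for p in partitions for w in p))  # True
```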