# PySpark Demo and Word Counting with Spark

To get you started, we'll walk you through a bit of Colab-specific Python and some PySpark code. Then we'll do the classic word count example, followed by some tasks for you to try.

**Please run through the notebook cell by cell (using 'run' above or 'shift-return' on the keyboard).**

## Preliminaries: Preparing Colab and Spark
1.   When you open this notebook from the shared "Data-Engineering" folder, you don't have write access. When you save it, a copy will be created in the folder "Colab Notebooks".
2.   The code below will mount Google Drive as a directory in the file system (the machine is a virtual Linux box). You will be asked to authorise this and provide an authentication code available through a link.



In [1]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

ModuleNotFoundError: No module named 'google.colab'

Next we move to the "Colab Notebooks" folder on your drive and create a subfolder "data". Then we copy the "hamlet.txt" file there (you can check on Google Drive whether it has worked).

In [None]:
# %cd /content/drive/My\ Drive/
# !mkdir "Colab Notebooks"
# %cd "Colab Notebooks"
# !mkdir data
# %cd data
# !cp "/content/drive/My Drive/Data-Engineering/data/hamlet.txt" .

# instead of copying in the script above, upload the hamlet.txt file into the path above using drive
# you will need to create the above folder structure

# if you use a different naming scheme you will need to adjust the fileName as necessary
# use the 'Files' tab of the menu that is hidden on the left of the screen to see the file system structure

# find out the working directory
!ls -l

Next, we install Spark (this may take a while) and altair for visualisation. This needs to be done every time a new machine is created.




In [None]:
# Install spark-related dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://www-eu.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"  # must match the openjdk-8 package installed above
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
#!pip install pyspark
!pip install altair

If the installation above doesn't work, try the one below.

## Part 1 - Demo: Apache Spark API with PySpark

There are two main APIs available in Apache Spark: RDD (Resilient Distributed Datasets) and DataFrame (extended by Dataset in Scala and Java). In this lab we will look at RDDs and DataFrames in Python.

For more information on the Spark framework - visit (https://spark.apache.org)
For more information on the Pyspark API - visit (https://spark.apache.org/docs/latest/api/python/index.html)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### 1) Access to Spark

We start by creating a SparkContext, normally called `sc`, which we use to create RDDs.
We also create a SparkSession object (for DataFrames), often just called `spark`.

In [None]:
import findspark
findspark.init()

import pyspark

# get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# get a Spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(spark) 

### 2) RDD Creation

There are two ways to create RDDs. The first is to parallelise a Python object that exists in your driver process (i.e. this one). 

The second way to create an RDD is by referencing an external dataset, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat. This is what we will be using in this lab (further down).

In [None]:
# Create an RDD from a Python object in this process (the "driver").
# The parallelize function distributes the list and returns the RDD (here called firstRDD)
data = [1,2,3,4,5]
firstRDD = sc.parallelize(data)
print(firstRDD)

This RDD now lives on as many worker machines as are available and as Spark deems useful.

### 3) RDD operations
RDDs have two kinds of operations: *Transformations* and *Actions*.

*Transformations* create a new RDD by applying a function to the items in the RDD. The function will be remembered, but only be applied when needed ("*lazy* evaluation").

*Actions* produce some output from the data. An *Action* will trigger the execution of all *Transformations*.
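
The idea of lazy evaluation can be imitated in plain Python, without Spark. The following is only an analogy under that assumption: Python's built-in `map` stands in for a transformation, and `list(...)` stands in for an action. The names `add_three` and `calls` are made up for this illustration.

```python
# Plain-Python analogy for lazy transformations (this is not Spark code).
calls = []  # records every time the function is actually applied

def add_three(x):
    calls.append(x)
    return x + 3

data = [1, 2, 3, 4, 5]

# Like a transformation: map() only remembers the function, nothing runs yet.
lazy_result = map(add_three, data)
calls_before_action = list(calls)  # snapshot: still empty at this point

# Like an action: consuming the iterator forces the function to run.
result = list(lazy_result)

print(calls_before_action)
print(result)
```

The function is only called once the result is actually requested, which is the same principle Spark follows with transformations and actions.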

Here are some examples:

In [None]:
def myfun(x):
  return x+3
# lambda function: x -> x+3
#RDD2 = firstRDD.map(lambda x:x+3)  
RDD2 = firstRDD.map(myfun)  
print(RDD2)
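# nothing has happened so far, as map is a transformation and no action has been called yet
#RDD2.count()
# countApprox is an action: it returns an approximate count within a timeout of 10000 ms at confidence 0.95
RDD2.countApprox(10000, 0.95)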

If the function is short (a single expression, to be exact), it is more convenient to write it as a lambda expression, which creates an anonymous function.

In [None]:
RDD3 = firstRDD.map(lambda x:x+3) # this is the same as using myfun 
print(RDD3)
# nothing has happened so far, as there is no action

In [None]:
# "count" is an action and triggers the transformation   
a = RDD2.count() 
print(a)

`collect` is an action that returns the values of the RDD as a Python list to this local driver process.

In [None]:
a = RDD2.collect() 
print(a)

b = RDD3.collect() 
print(b)

As we can see above, *myfun* (RDD2) and the *lambda x: x+3* (RDD3) have the same effect.

Look here for more information about the functions provided by the RDD class: (https://spark.apache.org/docs/2.4.0/api/python/pyspark.html#pyspark.RDD). 

### 4) Dataframes 

Dataframes are a more structured form of storage than RDDs and similar to Pandas dataframes.  

Let us see how to create and use dataframes. There are three ways of creating a dataframe:
    a) from an existing RDD.
    b) from an external data source, e.g., loading the data from JSON or CSV files.
    c) programmatically specifying schema and data.
    
Here is an example for option a). We use the *Row* class to create structured data rows.

In [None]:
from pyspark.sql import Row

dataList = [('Anne',21),('Bob',22),('Carl',29),('Daisy',36)] # our data as a list
rdd = sc.parallelize(dataList) # RDD from the list
peopleRDD = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) # RDD
peopleDF = spark.createDataFrame(peopleRDD) 
print(peopleDF)

## Part 2: Classic Word Count

We will now do the classic word count example for the MapReduce pattern.

We will apply it to the text of Shakespeare's play *Hamlet*. For that you should have uploaded the file "hamlet.txt" into the data assets.


### 6) Load the data
First we need to load the text into an RDD (the second method of creating an RDD as mentioned above). 

We need to specify the path, and we can read directly from the shared Data-Engineering directory. 

In [None]:
# filepath = "/content/drive/My Drive/Data-Engineering/data/hamlet.txt"
# use a relative path to the file
filepath = "drive/My Drive/Data-Engineering/data/hamlet.txt" 

In [None]:
# verify that the file is there
!ls -l "drive/My Drive/Data-Engineering/data/"

You can read the file into an RDD with `textFile`. The RDD then contains as items the lines of the text. `take(3)` then gives us the first 3 lines.  

In [None]:
lineRDD = sc.textFile(filepath)
lineRDD.take(3)

### 7) Split lines into words

In order to count the words, we need to split the lines into words. We can do that using the `split` function of the Python String class to separate at each space. 

The `map` function replaces each item with a new one. In this case our `lambda` returns an array of words (provided by `split(' ')`), so `map` would give us one array per line. However, we want one item per word, so we use a function called `flatMap`, which creates a new RDD item for every item in the array returned by the lambda.

In [None]:
wordRDD = lineRDD.flatMap(lambda x: x.split(' '))
wordRDD.take(3)
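
The difference between `map` and `flatMap` can be seen on a small local example. This is a plain-Python sketch rather than Spark code: list comprehensions and `itertools.chain` stand in for the RDD operations, and the two sample lines are made up.

```python
from itertools import chain

lines = ["To be, or not", "to be"]  # made-up sample lines

# map-like: one output item per input line, each item is a list of words
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)
print(flat_mapped)
```

With the map-like version we get two items (one list per line), whereas the flatMap-like version yields six items (one per word).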

Map the words to tuples of the form *(word, 1)*.

In [None]:
word1RDD = wordRDD.map(lambda x: (x, 1))
word1RDD.take(3)

### 8) Count by reducing
For Spark, the first part in each tuple is the 'key'. Now we can use `reduceByKey()` to add the 1s and get the number of occurrences per word.

In [None]:
wordCountRDD = word1RDD.reduceByKey(lambda x,y: x+y )
wordCountRDD.take(3)
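
To make the reduce-by-key step more concrete, here is a rough local sketch of what it computes. It assumes nothing about how Spark implements the operation internally; the helper `reduce_by_key` and the sample pairs are hypothetical and only mimic the result on a small Python list.

```python
def reduce_by_key(pairs, func):
    """Combine the values of equal keys with func (local stand-in, not Spark)."""
    result = {}
    for key, value in pairs:
        if key in result:
            # key seen before: merge the new value into the running result
            result[key] = func(result[key], value)
        else:
            # first occurrence of this key
            result[key] = value
    return result

# made-up (word, 1) pairs, as produced by the previous map step
pairs = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```

The same `lambda x, y: x + y` that is passed to `reduceByKey` is used here to add up the 1s for each word.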

### 9) Filtering 

There are many empty strings returned by the splitting. We can remove them by filtering.
We could then also take a shortcut and use the ready-made function `countByValue`, which does the same as we did before.

In [None]:
wordFilteredRDD = wordRDD.filter(lambda x: len(x)>0)
word1RDD = wordFilteredRDD.map(lambda x: (x, 1))
wordCountRDD = word1RDD.reduceByKey(lambda x,y: x+y )
wcList = wordCountRDD.take(5)
print(wcList)
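
The 'count by value' shortcut is not used in the cell above. As a local illustration of the idea (an analogy, not Spark code), Python's `collections.Counter` counts how often each value occurs in a list. On an RDD, the corresponding call would be `wordFilteredRDD.countByValue()`, which returns the counts to the driver as a dictionary, so it is best suited to cases where the number of distinct values is small. The sample words are made up.

```python
from collections import Counter

words = ["to", "", "be", "", "to"]  # made-up split result with empty strings

# same filtering idea as above: drop the empty strings
non_empty = [w for w in words if len(w) > 0]

# count each distinct value in one step
counts = Counter(non_empty)
print(dict(counts))
```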

## Part 3: Tasks for you to work on

Based on the examples above, you can now try and write some code yourself.  Look for the lines starting with **>>>**. You need to fix them by writing your own code.

## 1) Better splitting

Currently our 'words' can contain punctuation, because only spaces are removed. A better way to split is using regular expressions ([Python's 're' package](https://docs.python.org/3.5/library/re.html?highlight=regular%20expressions)). `re.split(r'\W+', 'my. test. string!')` does a good job. Try it out below by fixing the line that starts with '>>>'.

In [None]:
import re
#>>> wordRDD = lineRDD.flatMap(lambda x: ...)
wordRDD = lineRDD.flatMap(lambda x: re.split(r'\W+', x)) # apply re.split(r'\W+', string) here; the raw string avoids an invalid-escape warning
wordFilteredRDD = wordRDD.filter(lambda x: len(x)>0) # filtering
wordFilteredRDD.take(3)
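
The effect of the regular expression can be checked locally on the sample string from the task description, without Spark. This small sketch compares splitting at spaces with splitting at `\W+`, and shows why the filtering step is still needed afterwards.

```python
import re

sample = 'my. test. string!'

# splitting at spaces keeps the punctuation attached to the words
space_split = sample.split(' ')

# splitting at runs of non-word characters removes the punctuation,
# but leaves an empty string where the text ends with punctuation
regex_split = re.split(r'\W+', sample)

# the same filter as in the notebook removes the empty strings
words = [w for w in regex_split if len(w) > 0]

print(space_split)
print(regex_split)
print(words)
```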

## 2) Use lower case

Convert all strings to lower case (using `.lower()` provided by the Python string class), so that 'Test' and 'test' count as the same. The cell after that packages each word into a tuple of the form (word, 1) and counts by reducing.

In [None]:
#>>> wordLowerRDD = wordFilteredRDD.map(lambda x: ... )
wordLowerRDD = wordFilteredRDD.map(lambda x: x.lower() )
wordLowerRDD.take(3)

In [None]:
word1RDD = wordLowerRDD.map(lambda x: (x,1)) # package each lower-case word into a (word, 1) tuple
wordCountRDD = word1RDD.reduceByKey(lambda x,y: x+y) # we can now get better word count results
wordCountRDD.take(5)

## 3) Filter rare words

Add a filtering step to remove all words with fewer than 5 occurrences. This can be useful to identify common topics in documents, where very rare words can be misleading.

In [None]:
# the trick here is to apply the lambda only to the second part of each item, i.e. x[1] 
#freqWordsRDD = wordCountRDD.filter(lambda x:  ... ) # tip: filter keeps the items where the lambda returns true.
freqWordsRDD = wordCountRDD.filter(lambda x: x[1] >= 5) # keeps words with at least 5 occurrences, i.e. removes those with fewer than 5
freqWordsRDD.take(5)


## 4) List only stopwords

Stopwords are frequent words that are not topic-specific. Stopwords can be useful for recognising the style of an author. Removing stopwords can be useful in recognising the topic of a document.

Below is a small list of stopwords. Keep only the tuples where the first part is a stopword.

In [None]:
stopWordList = ['the','a','in','of','on','at','for','by','i','you','me'] # all lower case, since the words were lower-cased above
#stopWordsRDD = freqWordsRDD.filter(lambda x:  ...) # the 1st part of the tuple should be in the list
stopWordsRDD = freqWordsRDD.filter(lambda x: x[0] in stopWordList ) # the 1st part of the tuple should be in the list 

stopWordsRDD.take(10)

In [None]:
stopWordsRDD.count()

There are only a few words, so we can view the results.

In [None]:
output = stopWordsRDD.collect() 
for (word, count) in output:
    print("%s: %i" % (word, count))

We can now visualise the stopword counts.

In [None]:
import pandas as pd
word_list=[]
item_list=[]
for item in output:
  (word,count)=item
  word_list.append(word)
  item_list.append(count)
df3=pd.DataFrame({'words':word_list,'items':item_list})
df3