# BDPP Course lab 3 (Resilient Distributed Datasets)

Welcome to the third lab of the BDPP course!

Here is a summary of what we will cover in this lab:

- Work with the SparkSession objects
- Speed benchmarking
- Work with Resilient Distributed Datasets

**Note: We assume you are using PySpark in the conda environment that was created in LAB 1. If you missed it, please refer to LAB 1.**

# 1. SparkSession

PySpark applications start by initializing a `SparkSession`, which is the entry point of PySpark, as shown below. If you run the PySpark shell via the <code>pyspark</code> executable, the shell automatically creates the session for you in the variable <code>spark</code>.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

#### Check the Spark version
Check the version of the Spark driver application:

In [None]:
spark

Earlier Spark versions used `SparkContext` as the entry point; a `SparkSession` now wraps it.
If you need to access the `SparkContext` through the `SparkSession`, use the `sparkContext` attribute:

In [None]:
spark.sparkContext

All the functionality available through the `SparkContext` is also reachable from the `SparkSession`.

### 2.3. Speed benchmark (Spark VS Pandas)

Now, let's use the `SparkSession` object to run a simple benchmark that compares reading a relatively big CSV file with pandas VS Spark. Although we are still running Spark on our local computer, the Spark read call typically finishes faster than the pandas one (note that the result highly depends on the parallelization capabilities of your CPU). Keep in mind when interpreting the numbers that Spark evaluates lazily: without an action, `spark.read.csv` only inspects the file enough to set up the DataFrame, whereas `pd.read_csv` parses the entire file into memory.

For this experiment, we use a somewhat large Vermont vendor dataset. This data is accessible through [this link](https://data.vermont.gov/Finance/Vermont-Vendor-Payments/786x-sbp3). On this link, please select export and then choose the CSV format. Download the file, rename it to `Vermont_Vendor_Payments.csv`, and place it in the `files` folder next to this notebook. Now, run the following two code snippets.

In [None]:
%%timeit

# loading the csv file with Spark
df_spark = spark.read.csv("files/Vermont_Vendor_Payments.csv", header=True)

In [None]:
# install the pandas library if you don't have it.

import pandas as pd

In [None]:
%%timeit

# loading csv file with Pandas
df_pandas = pd.read_csv("files/Vermont_Vendor_Payments.csv")

### Question1: Use the cell below to report your observations from this experiment and compare the run times

In [None]:

""" TODO_1: change the format of this cell to Markdown and answer the question here """


### 2.4. Speed benchmark ($\pi$ calculation)

Spark can also be used for compute-intensive tasks. This code estimates $\pi$ by "throwing darts" at a circle. 
We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. 
The fraction should be $\pi / 4$, so we use this to get our estimate.
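
As a short derivation of the estimate (a sketch of the reasoning; the symbols $N$ for the number of sampled points and $N_{\text{in}}$ for the number that land inside the circle are introduced here for illustration): the part of the unit circle that lies in the unit square is a quarter disc of radius 1, so

$$
\frac{N_{\text{in}}}{N} \approx \frac{\text{area of quarter disc}}{\text{area of unit square}} = \frac{\pi \cdot 1^2 / 4}{1} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx \frac{4\,N_{\text{in}}}{N}.
$$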

In [None]:
# π calculation code

import random

num_samples = 10000000 # you can change this number, e.g. try 100000000

def inside(p):
    # p is the sample index passed in by the caller; it is not used in the computation
    x, y = random.random(), random.random()
    return x*x + y*y < 1 # if the point is inside the circle return True

def spark_pi_calc():
    # here we do the pi calculation using Spark
    count = spark.sparkContext.parallelize(range(0, num_samples)).filter(inside).count() # count the number of points inside the circle
    return (4.0 * count / num_samples) # return the estimated pi value

def python_pi_calc():
    # here we do the same calculation with a Python list comprehension
    count = sum([inside(throw) for throw in range(num_samples)])  # count the number of points inside the circle
    return (4.0 * count / num_samples) # return the estimated pi value

In [None]:
# Depending on your hardware, this can take some time to finish.
# You can reduce num_samples if it is taking too much time.

print("[Spark] Pi is roughly:", spark_pi_calc())

In [None]:
# Depending on your hardware, this can take some time to finish.
# You can reduce num_samples if it is taking too much time.

print("[Python] Pi is roughly:", python_pi_calc())

Now, we use the $\pi$ calculation code to benchmark Spark VS Python. For small problems, Python might run faster than Spark because of Spark's initial setup cost. However, as the problem gets bigger, the Spark code starts to show its benefit and runs faster than Python.

In the code below, we start from a small `num_samples` and keep increasing it until the Python loop exceeds `max_time` (set to 10 seconds by default): `num_samples` is multiplied by 10 while a run takes less than 0.1 seconds and doubled afterwards. We collect the running times of the Spark and Python codes and produce a plot of time VS `num_samples`.

We want you to play with the `max_time` parameter until the problem gets big enough for you to observe the Spark code running faster than the Python code. This, of course, highly depends on the parallelization capacity of your CPU, and you may end up getting different results.

In [None]:
import timeit, time

max_time = 10 # you can also try 1, 2, 5, and 10 depending on your hardware performance.

print('Running experiment. This may take a few minutes to run.')
print('You can change max_time value to increase or decrease run time.')
print('(please wait)')

num_samples = 10000
steps = []
python_times = []
sparks_times = []

def my_timeit(func):
    runs = 3  # If the experiment is still taking too much time to run, you may decrease this value as well.
    dtime = timeit.timeit(func, number=runs)
    elapsed = dtime/runs
    return elapsed

start = time.time()
while True:
    pt = my_timeit(python_pi_calc)
    st = my_timeit(spark_pi_calc)
    python_times.append(pt)
    sparks_times.append(st)
    steps.append(num_samples)
    print(min(int(pt * 100), max_time*100), '/', max_time*100)
    if pt > max_time:
        break
    elif pt < 0.1:
        num_samples = num_samples * 10
    else:
        num_samples = num_samples * 2
print(f"Done! Total time = {time.time()-start:.2f}s")

In [None]:
!pip install matplotlib

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

plt.figure()
plt.plot(steps, python_times, color='red', label='python')
plt.plot(steps, sparks_times, color='blue', label='spark')
plt.legend()
plt.xlabel('number of samples', fontsize=12)
plt.ylabel('running time (seconds)', fontsize=12)
plt.title(r'Speed benchmark ($\pi$ calculation)')
plt.show()

### Question2: Use the cell below to report your observations from this experiment and compare the run times. Can Spark implementation run faster than Python on a single computer? How?

In [None]:
""" TODO_2: change the format of this cell to Markdown and answer the question here """


__Hint__: If you pick a big enough value for the `num_samples` parameter (code below), you should see multiple Python processes running at the same time in your system monitor or task manager while benchmarking the Spark code (A).

In [None]:
# code (A) Spark 
# Depending on your hardware, this can take some time to finish.
# You can reduce num_samples if it is taking too much time.

num_samples = 300000000  # reduce this number if it is taking too much time to run
spark_pi_calc()

In [None]:
# code (B) Python - List Comprehension
# Depending on your hardware, this can take some time to finish.
# You can reduce num_samples if it is taking too much time.

num_samples = 100000000  # reduce this number if it is taking too much time to run
python_pi_calc()

<div class="alert alert-block alert-info">

__Question3__: Run the above experiment three times for each configuration and report the running times, together with their mean and standard deviation, for each setup. Discuss how adding extra resources affects the running time.

</div>
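
One possible way to compute the mean and standard deviation for a setup is Python's standard `statistics` module. The sketch below uses made-up placeholder timings (the values in `run_times` are hypothetical, not measurements from this lab); substitute your own numbers.

```python
import statistics

# Hypothetical running times in seconds for three repeated runs of one setup.
# Replace these placeholder values with your own measurements.
run_times = [12.0, 15.0, 18.0]

mean_time = statistics.mean(run_times)
# statistics.stdev computes the sample standard deviation (divides by n - 1).
std_time = statistics.stdev(run_times)

print(f"mean = {mean_time:.2f}s, std = {std_time:.2f}s")
```

With only three runs, the sample standard deviation is a rough indicator of variability rather than a precise estimate.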

In [None]:
""" TODO_3: change the format of this cell to Markdown and answer the question here """

<a id="rdd"></a>
## 3. Work with Resilient Distributed Datasets
Spark uses an abstraction for working with data called a Resilient Distributed Dataset (RDD). An RDD is a collection of elements that can be operated on in parallel. \
RDDs are **immutable**, so you can't update the data in them. To update data in an RDD, you must create a new RDD. In Spark, all work is done by creating new RDDs, transforming existing RDDs, or using RDDs to compute results. 
When working with RDDs, the Spark driver application automatically distributes the work across the cluster.

You can construct RDDs by parallelizing existing Python collections (lists), by transforming existing RDDs, or by loading files from HDFS or any other storage system.

You can run these types of methods on RDDs: 
 - Actions: query the data and return values
 - Transformations: manipulate data values and return pointers to new RDDs. 
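
The difference between recording work and actually running it can be illustrated with a plain-Python analogy. This sketch does not use Spark: Python's built-in `map` is lazy in a loosely similar way to a transformation, and `list()` plays the role of an action. The helper `double` and the `calls` list are hypothetical names used only for this illustration.

```python
calls = []

def double(n):
    calls.append(n)  # record that the function actually ran
    return 2 * n

# Building the pipeline is analogous to a transformation: nothing runs yet.
lazy_result = map(double, [1, 2, 3])
calls_before = len(calls)

# Consuming the pipeline is analogous to an action: now the work happens.
values = list(lazy_result)
calls_after = len(calls)

print(calls_before, calls_after, values)
```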

Find more information on Python methods in the <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html" target="_blank" rel="noopener noreferrer">PySpark documentation</a>.

<a id="rdd1"></a>
### 3.1 Create a collection
Create a Python collection of the numbers 1 - 10:

In [None]:
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

<a id="rdd2"></a>
### 3.2 Create an RDD 
Put the collection into an RDD named `x_nbr_rdd` using the `parallelize` method:

In [None]:
x_nbr_rdd = spark.sparkContext.parallelize(x)

Notice that the cell produces no output. The `parallelize` method didn't compute a result: like a transformation, it is lazy, so Spark just recorded how to create the RDD.

<a id="rdd3"></a>
### 3.3 View the data 
View the first element in the RDD:

In [None]:
x_nbr_rdd.first()

Each number in the collection is in a different element in the RDD. Because the `first()` method returned a value, it is an action. 

Now view the first five elements in the RDD:

In [None]:
x_nbr_rdd.take(5)

Now view all the data in the RDD:

In [None]:
x_nbr_rdd.collect()

Be careful with the `collect` method! It returns __all__ elements of the RDD to the driver. Returning a large data set might not be very useful, and it can exhaust the driver's memory. No one wants to scroll through a million rows!

<a id="rdd4"></a>
### 3.4 Create another RDD 
Create a Python collection that contains strings:

In [None]:
y = ["Hello Human", "My Name is Spark"]

Put the collection into an RDD:

In [None]:
y_str_rdd = spark.sparkContext.parallelize(y)

View the first element in the RDD:

In [None]:
y_str_rdd.take(1)

You created the string "Hello Human" and you returned it as the first element of the RDD. To analyze a set of words, you can map each word into an RDD element.

<a id="trans"></a>
## 4. Manipulate data in RDDs

Remember that to manipulate data, you use transformation functions.

Here are some common Python transformation functions that you'll be using in this notebook:

 - `map(func)`: returns a new RDD with the results of running the specified function on each element  
 - `filter(func)`: returns a new RDD with the elements for which the specified function returns true   
 - `distinct([numPartitions])`: returns a new RDD that contains the distinct elements of the source RDD
 - `flatMap(func)`: returns a new RDD by first running the specified function on all elements, returning 0 or more results for each original element, and then flattening the results into individual elements

With the Python `lambda` keyword, you can also create anonymous functions that consist of a single expression. For example, this function returns the sum of its arguments: `lambda a, b: a + b`.
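
As a brief illustration of `lambda` outside Spark, the sketch below uses only Python built-ins and `functools.reduce`. The names `add_two` and `numbers` are hypothetical and chosen just for this example.

```python
from functools import reduce

add_two = lambda a, b: a + b   # anonymous function bound to a name for reuse
numbers = [1, 2, 3, 4]

total = reduce(add_two, numbers)                      # ((1 + 2) + 3) + 4
squares = list(map(lambda n: n * n, numbers))         # apply to every element
evens = list(filter(lambda n: n % 2 == 0, numbers))   # keep matching elements

print(total, squares, evens)
```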

<a id="trans1"></a>
### 4.1 Update numeric values
Run the `map()` function with the `lambda` keyword to replace each element, X, in your first RDD (the one that has numeric values) with X+1. Because RDDs are immutable, you need to specify a new RDD name.

In [None]:
# TODO_4: Replace <FILL IN> with appropriate code

x_nbr_rdd_2 = x_nbr_rdd.map(lambda x: <FILL IN>)

Now look at the elements of the new RDD: 

In [None]:
x_nbr_rdd_2.collect()

<a id="trans2"></a>
### 4.2 Add numbers in an array
An array of values is a common data format where multiple values are contained in one element. You can manipulate the individual values if you split them up into separate elements.

Create the array as a single comma-separated string by putting quotation marks around the whole set of numbers. If you omit the quotation marks, you get a collection of separate numbers instead of one array element.

In [None]:
X = ["1,2,3,4,5,6,7,8,9,10"]

Create an RDD for the array:

In [None]:
y_rd = spark.sparkContext.parallelize(X)

In [None]:
y_rd.take(1)

Split the values at commas and add the values at positions 3 and 7 of the array. Keep in mind that an array starts at position 0 and that the split values are strings, so convert them to numbers before adding. Use a backslash character, \, to break the line of code for clarity.

In [None]:
# TODO_5: Replace <FILL IN> with appropriate code

Sum_rd = y_rd.map(lambda y: <FILL IN>).map(lambda y: <FILL IN>)

Now return the value of the sum:

In [None]:
Sum_rd.first()

You should get `12`.

<a id="trans3"></a>
### 4.3 Split and count text strings

Create an RDD with a text string and show the first element:

In [None]:
Words = ["Hello Human. I'm Spark and I love running analysis on data."]
words_rd = spark.sparkContext.parallelize(Words)
words_rd.first()

Split the string into separate lines at the space characters and look at the first element:

In [None]:
# TODO_6: Replace <FILL IN> with appropriate code

Words_rd2 = words_rd.map(lambda line: <FILL IN>)
Words_rd2.first()

Count the number of elements in this RDD with the `count()` method:

In [None]:
Words_rd2.count()

You should get `1`.

Of course, you already knew that there was only one element because you ran the `first()` method and it returned the whole string. Splitting the string into multiple lines did not create multiple elements.

Now split the string again, but this time with the `flatMap()` method, and look at the first three elements:

In [None]:
# TODO_7: Replace <FILL IN> with appropriate code

words_rd2 = words_rd.flatMap(lambda line: <FILL IN>)
words_rd2.take(3)

In [None]:
words_rd2.count()

You should get `11`.
This time each word is separated into its own element.
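
To see why the two counts differ, here is a plain-Python sketch (not Spark code) of the map-like and flatMap-like behaviours. The sample data in `lines` and the `;` separator are hypothetical.

```python
from itertools import chain

lines = ["a;b", "c;d;e"]  # hypothetical sample data

# map-like: one output element per input element (each output is a list)
mapped = [line.split(";") for line in lines]

# flatMap-like: same function, then the per-element lists are flattened
flat_mapped = list(chain.from_iterable(line.split(";") for line in lines))

print(len(mapped), mapped)
print(len(flat_mapped), flat_mapped)
```

The map-like version keeps two elements (two lists), while the flattened version has five elements (one per token).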

<a id="trans4"></a>
### 4.4 Count words with a pair RDD
A common way to count the number of instances of words in an RDD is to create a pair RDD. A pair RDD converts each word into a key-value pair: the word is the key and the number 1 is the value. Because the values are all 1, when you add the  values for a particular word, you get the number of instances of that word.

Create an RDD:

In [None]:
z = ["First,Line", "Second,Line", "and,Third,Line"]
z_str_rdd = spark.sparkContext.parallelize(z)
z_str_rdd.first()

Split the elements into individual words with the `flatMap()` method:

In [None]:
z_str_rdd_split_flatmap = z_str_rdd.flatMap(lambda line: line.split(","))
z_str_rdd_split_flatmap.collect()

Convert the elements into key-value pairs:

In [None]:
countWords = z_str_rdd_split_flatmap.map(lambda word:(word,1))
countWords.collect()

Now sum all the values by key to find the number of instances for each word: 

In [None]:
from operator import add
countWords2 = countWords.reduceByKey(add)
countWords2.collect()

Notice that the word `Line` has a count of 3.
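
The idea behind summing values by key can be sketched in plain Python on a single machine. This is only an illustration of the logic, not how Spark implements it (Spark combines values within each partition and then merges the partial results). The helper `reduce_by_key` and the sample `pairs` are hypothetical.

```python
from operator import add

pairs = [("x", 1), ("y", 1), ("x", 1)]  # hypothetical (key, value) pairs

def reduce_by_key(func, key_value_pairs):
    """Combine the values of each key with func (single-machine sketch)."""
    combined = {}
    for key, value in key_value_pairs:
        # First occurrence stores the value; later ones are merged with func.
        combined[key] = func(combined[key], value) if key in combined else value
    # Sorted only to make the printed result deterministic.
    return sorted(combined.items())

counts = reduce_by_key(add, pairs)
print(counts)
```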

<a id="filter"></a>
## 5. Filter data

The filter command creates a new RDD from another RDD based on a filter criterion.
The filter syntax is: 

`.filter(lambda line: "Filter Criteria Value" in line)`

Hint: Use a simple python `print` command to add a string to your Spark results and to run multiple actions in single cell.

Find the number of instances of the word `Line` in the `z_str_rdd_split_flatmap` RDD:

In [None]:
# TODO_8: Replace <FILL IN> with appropriate code

words_rd3 = z_str_rdd_split_flatmap.filter(<FILL IN>) 


In [None]:
z_str_rdd_split_flatmap.map(lambda word:(word,1)).reduceByKey(add).filter(lambda word: word[0] == "Line").collect()

You should get a count of `3` for the word `Line`.

<a id="wordfile"></a>
## 6. Analyze text data from a file
In this section, you'll create an RDD from the text file `README.txt` and analyze the text in it. The file should already exist in the `files` folder next to this notebook.

<a id="wordfile2"></a>
### 6.2 Create an RDD from the file
Use the `textFile` method to create an RDD named `textfile_rdd` based on the `README.txt` file. The RDD will contain one element for each line in the `README.txt` file.
Also, count the number of lines in the RDD, which is the same as the number of lines in the text file. 

In [None]:
textfile_rdd = spark.sparkContext.textFile("files/README.txt")
textfile_rdd.count()

<a id="wordfile3"></a>
### 6.3 Filter for a word 
Filter the RDD to keep only the elements that contain the word "Spark" with the `filter` transformation:

In [None]:
# TODO_9: Replace <FILL IN> with appropriate code

Spark_lines = textfile_rdd.filter(<FILL IN>)
Spark_lines.first()

You should see `'# Apache Spark'`

Count the number of elements in this filtered RDD and present the result as a concatenated string:

In [None]:
# TODO_10: Replace <FILL IN> with appropriate code

print ("The file README.txt has " + str(Spark_lines.<FILL IN>) + \
" of " + str(textfile_rdd.<FILL IN>) + \
" Lines with word Spark in it.")

You should see `The file README.txt has 19 of 98 Lines with word Spark in it.`

<a id="wordfile4"></a>
### 6.4 Count the instances of a string at the beginning of words
Count the number of times the substring "Spark" appears at the beginning of a word in the original text.

Here's what you need to do: 

1. Run a `flatMap` transformation on the Spark_lines RDD and split on white spaces.
2. Create an RDD with key-value pairs where the first element of the tuple is the word and the second element is the number 1.
3. Run a `reduceByKey` method with the `add` function to count the number of instances of each word.
4. Filter the resulting RDD to keep only the elements that start with the word "Spark". In Python, the syntax to determine whether a string starts with a token is: `string.startswith("token")` 
5. Display the resulting list of elements that start with "Spark".

In [None]:
# TODO_11: write your code here.


You should see:<br>
<pre>
[('Spark', 14),
 ('Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1),
 ('SparkPi', 2),
 ('Spark](#building-spark).', 1),
 ('Spark.', 1)]
</pre>

<a id="wordfile5"></a>
### 6.5 Count instances of a string within words
Now filter and display the elements that contain the substring "Spark" anywhere in the word, instead of just at the beginning of words like the last section. Your result should be a superset of the previous result.

The Python syntax to determine whether a string contains a particular token is: `"token" in string`
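
The difference between the two string tests can be seen on a small list in plain Python. The `tokens` list below is hypothetical sample data, not taken from `README.txt`.

```python
tokens = ["Spark", "SparkPi", "PySpark", "spark", "Hadoop"]  # hypothetical

# Keeps only tokens whose first characters are exactly "Spark".
starts = [t for t in tokens if t.startswith("Spark")]

# Keeps tokens with "Spark" anywhere inside them.
contains = [t for t in tokens if "Spark" in t]

print(starts)
print(contains)
```

Both tests are case-sensitive, so the lowercase `spark` matches neither of them.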

In [None]:
# TODO_12: write your code here.


You should see:
<pre>
[('Spark', 14),
 ('Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1),
 ('SparkPi', 2),
 ('Spark](#building-spark).', 1),
 ('Spark.', 1),
 ('tests](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-AutomatedTesting).',
  1)]
</pre>

<a id="numfile"></a>
## 7. Analyze numeric data from a file
You'll analyze a sample file `Scores.txt`, given in the `files` folder, that contains instructor names and scores. The file has the following format: Instructor Name,Score1,Score2,Score3,Score4,... The number of scores for each instructor can be different.
Here are two example lines from the text file: "Carlo,5.5,3,3,4" and "Pablo,9,10,8.6,7,9,5,6".
Your task is to look at all the scores from each instructor and find the maximum score given by each instructor:

1. Load the text file into an RDD.
1. Run a transformation to create an RDD with the instructor names and the scores per instructor.
1. Run a second transformation to compute the maximum score for each instructor. (You may have to convert each score to a float, since the original value is a string.)
1. Display the results.
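
The reason for the float conversion can be seen in a small plain-Python sketch: strings are compared character by character, so the maximum of score strings is not necessarily the numeric maximum. The values in `raw_scores` are hypothetical.

```python
raw_scores = ["9", "10", "8.6"]  # hypothetical scores as read from a text line

# Strings are compared character by character, not by numeric value.
max_as_text = max(raw_scores)

# Converting to float first gives the numeric maximum.
max_as_number = max(float(s) for s in raw_scores)

print(max_as_text, max_as_number)
```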

In [None]:
# TODO_13: write your code here.


You should see:
<pre>
[('Tobias', 8.0),
 ('Malin', 10.0),
 ('Ali', 8.7),
 ('Magnus', 5.0),
 ('Alice', 9.1),
 ('Jack', 7.4)]
</pre>