# Word Count on Amazon EMR

---

#### Tasks: 

- Find the 100 most frequent words and their counts in the BookReviews_5M dataset.
- Calculate the average and standard deviation of the execution time over 3 runs for each of these three settings:
    1. BookReviews_1M - 1 master + 1 worker node
    2. BookReviews_5M - 1 master + 1 worker node
    3. BookReviews_5M - 1 master + 3 worker nodes

---

#### Documentation:

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html




### 1. Starting Spark Session

In [1]:
# Initialize Spark. On an EMR notebook the kernel provides the `spark` session.

import pyspark
from pyspark.sql import SparkSession

print(spark.version, pyspark.__version__)

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
| :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 3 | application_1615356276276_0005 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.


3.0.1-amzn-0 3.0.1+amzn.0

In [2]:
# Record the starting time of execution for timing this notebook

import time
start_time = time.time()

In [3]:
# HDFS file path of the 5M dataset.
dataFileName = "hdfs:///data/BookReviews_5M.txt"


# Read data from the above file path and convert it to a dataframe. 
textDF = spark.read.text(dataFileName)

### 2. Examining the data

Task: 
1. Examine the contents of the dataframe.

Expected output: 
1. Print the schema of the raw dataframe, as well as its first 25 rows. 

In [4]:
textDF.printSchema()
textDF.show(25)

root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|This was the firs...|
|Also after going ...|
|As with all of Ms...|
|I've not read any...|
|This romance nove...|
|Carolina Garcia A...|
|Not only can she ...|
|Once again Garcia...|
|The timing is jus...|
|Engaging. Dark. R...|
|Set amid the back...|
|This novel is a d...|
|If readers are ad...|
| Reviewed by Phyllis|
|      APOOO BookClub|
|A guilty pleasure...|
|In the tradition ...|
|Beryl Unger, top ...|
|What follows is a...|
|The book flap say...|
|I'd never before ...|
|The novel's narra...|
|It is centered on...|
|If you like moder...|
|Beryl Unger is a ...|
+--------------------+
only showing top 25 rows

### 3. Cleaning the data

Task:
1. Remove all punctuation and convert all characters to lower case.

Expected output:
1. The first 25 rows of a dataframe, with a column containing the cleaned sentences. 

In [5]:
from pyspark.sql.functions import regexp_replace, trim, col, lower


def removePunctuation(column):
    """Drops every character except ASCII letters, digits and spaces, lower-cases the result, and strips leading and trailing spaces."""
    return trim(lower(regexp_replace(column, "[^A-Za-z0-9 ]", ""))).alias("sentence")

In [6]:
print(removePunctuation(textDF.value))

Column<b'trim(lower(regexp_replace(value, [^A-Za-z0-9 ], ))) AS `sentence`'>

In [7]:
# Execute the column expressions generated by removePunctuation() to clean the sentences
removedDF = textDF.select(removePunctuation(textDF.value))
removedDF.show(25)

+--------------------+
|            sentence|
+--------------------+
|this was the firs...|
|also after going ...|
|as with all of ms...|
|ive not read any ...|
|this romance nove...|
|carolina garcia a...|
|not only can she ...|
|once again garcia...|
|the timing is jus...|
|engaging dark rea...|
|set amid the back...|
|this novel is a d...|
|if readers are ad...|
| reviewed by phyllis|
|      apooo bookclub|
|a guilty pleasure...|
|in the tradition ...|
|beryl unger top e...|
|what follows is a...|
|the book flap say...|
|id never before r...|
|the novels narrat...|
|it is centered on...|
|if you like moder...|
|beryl unger is a ...|
+--------------------+
only showing top 25 rows
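
The cleaning rule can be checked on a laptop without a cluster. The snippet below is a minimal plain-Python sketch of the same regular expression, lower-casing and trimming; `remove_punctuation` and the sample strings are illustrative names and data, not part of the notebook, and the sketch is not a substitute for the Spark column expression.

```python
import re


def remove_punctuation(text: str) -> str:
    """Plain-Python analogue of the removePunctuation() column expression."""
    # Same order as the Spark expression: regex replace, then lower, then trim.
    return re.sub(r"[^A-Za-z0-9 ]", "", text).lower().strip()


# Hypothetical sample lines, loosely modelled on the rows shown above.
samples = ["I've not read any...", " Reviewed by Phyllis", "Engaging. Dark. Real!"]
cleaned = [remove_punctuation(s) for s in samples]
print(cleaned)
```

Note that the apostrophe in "I've" is removed rather than replaced by a space, which is why the cleaned data contains tokens such as `ive` and `id`.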

### 4. Get dataframe containing unique words and their counts

Task:
1. Split each sentence into words on the space delimiter (' ').
2. Put each word into its own row and store the result in a new dataframe.
3. Print the first 5 rows of the dataframe.

Expected output: 
1. First 5 rows of the output dataframe.


In [8]:
# We assemble the 'split' and 'explode' column expressions, then apply them to the sentence column

from pyspark.sql.functions import split, explode

wordsDF = removedDF.select(explode(split(removedDF.sentence, ' ')).alias('word'))
wordsDF.show(5)

+-----+
| word|
+-----+
| this|
|  was|
|  the|
|first|
| time|
+-----+
only showing top 5 rows

In [9]:
# Filter out empty-string rows, which split() produces for consecutive spaces and empty sentences.
oneword = wordsDF.filter("word != ''")
oneword.show(25)

+--------------+
|          word|
+--------------+
|          this|
|           was|
|           the|
|         first|
|          time|
|             i|
|          read|
|garciaaguilera|
|             i|
|          came|
|          upon|
|           the|
|          name|
|            of|
|          this|
|          book|
|            on|
|          live|
|          with|
|         regis|
|           and|
|         kelly|
|          this|
|          book|
|           was|
+--------------+
only showing top 25 rows

In [10]:
# Group the dataframe by unique words, count each group
wordcount = oneword.groupby('word').count()
wordcount.show(25)

+--------------------+-----+
|                word|count|
+--------------------+-----+
|              online|40394|
|              brands|19389|
|                hope|33903|
|                150x|  129|
|            everyday|12140|
|               input|33909|
|               spoil|  260|
|              spared|  162|
|               poppy|   39|
|               exept|   89|
|hrefonkyohtrc2707...|    2|
|            priority| 2840|
|           squealing|  138|
|            foamlike|   23|
|              outfit|  543|
|              filing|  538|
|           digitized|  277|
|           viewpoint|  129|
|              peolpe|   15|
|            wristlet|   28|
|             flashed|  806|
|        releasedhave|    1|
|hrefharmony880uni...|    1|
|     advertisedgreat|   14|
|            recibido|   41|
+--------------------+-----+
only showing top 25 rows
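
The split, explode, filter and group steps have a simple single-machine analogue, which can help when reasoning about where the empty strings come from. This is a sketch on made-up sentences using only the standard library; it mirrors the logic of the cells above, not Spark's distributed execution.

```python
from collections import Counter

# Hypothetical cleaned sentences; the second one contains a double space.
sentences = ["this book was great", "this  was the first time", ""]

# split(' ') followed by explode: one entry per token, including empty tokens.
words = [word for sentence in sentences for word in sentence.split(" ")]

# filter("word != ''"): drop the empty tokens.
non_empty = [word for word in words if word != ""]

# groupby('word').count(): tally each distinct word.
counts = Counter(non_empty)
print(len(words), len(non_empty), counts["this"])
```

Two empty tokens appear here: one from the double space and one from the empty sentence, which is why the filter step precedes the grouping.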

### 5. Sorting the word count dataframe in descending order

Task: 
1. Sort the previous dataframe by the count column in descending order. Put the results into a new dataframe. 

Expected output:
1. First 25 rows of the sorted word count dataframe. The first row has the maximum count.

In [11]:
# Sort the dataframe by the 'count' column
wordCountsSortedDF = wordcount.sort(col("count").desc())
wordCountsSortedDF.show(25)

+-----+--------+
| word|   count|
+-----+--------+
|  the|10642903|
|    i| 6326216|
|   to| 5607568|
|  and| 5537690|
|    a| 5166838|
|   it| 4654902|
|   is| 3242588|
|  for| 2860227|
| this| 2845219|
|   of| 2782166|
|   my| 2319813|
|   in| 2147373|
| with| 2046990|
| that| 1983044|
|   on| 1758801|
|  you| 1754054|
| have| 1632887|
|  but| 1508591|
|  not| 1460730|
|  was| 1434985|
|   as| 1185866|
|  are| 1007811|
|   so|  994529|
|great|  988223|
| very|  893737|
+-----+--------+
only showing top 25 rows

### 6. Record the execution time

Task: 
1. Print the execution time.

Expected output: The execution time. No particular value is expected.

In [12]:
# Print the time since execution start.
print(time.time() - start_time)

47.44440722465515
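
The figure above is wall-clock time for the whole notebook. Because Spark transformations are lazy, most of it is spent in the actions (the `show()` calls). To time individual steps, one option is a small context manager; the sketch below is an assumption about how that could look, with `timed` and `timings` as hypothetical names, and it uses `time.perf_counter()`, which is monotonic, rather than `time.time()`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the elapsed seconds of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed("demo", timings):
    # Stand-in workload; in the notebook this would be a Spark action.
    total = sum(range(100_000))

print(f"demo took {timings['demo']:.4f} s")
```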

### 7. Saving the sorted word counts directly to S3 as a CSV file

NOTE: Spark distributes its working data across the cluster in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, as each machine needs only its own partitions to do its share of the work. By default, Spark saves each partition to an individual file to avoid I/O collisions. We want only one output file, so we first need to merge all the data into a single partition. 

Task: 
1. Coalesce the previous dataframe to one partition. This makes sure that all our results will end up in the same CSV file. 
2. Save the 1-partition dataframe to S3 using the DataFrame.write.csv() method.
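
The effect of coalescing on the number of output files can be illustrated with a toy, Spark-free sketch. Everything here is an assumption made for illustration: `write_partitions` and `coalesce` are hypothetical helpers operating on plain lists, and they only loosely mimic Spark's one-file-per-partition behaviour.

```python
import csv
import tempfile
from pathlib import Path


def write_partitions(partitions, out_dir):
    """Write each partition to its own part file, each with its own header row."""
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for index, rows in enumerate(partitions):
        path = out_dir / f"part-{index:05d}.csv"
        with path.open("w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(["word", "count"])
            writer.writerows(rows)
        paths.append(path)
    return paths


def coalesce(partitions):
    """Merge all partitions into one, keeping the row order."""
    return [[row for part in partitions for row in part]]


# Hypothetical sorted word counts spread over three partitions.
partitions = [[("the", 3), ("i", 2)], [("to", 2)], [("and", 1)]]

with tempfile.TemporaryDirectory() as tmp:
    many = write_partitions(partitions, Path(tmp) / "many")
    one = write_partitions(coalesce(partitions), Path(tmp) / "one")
    n_many, n_one = len(many), len(one)

print(n_many, n_one)
```

In Spark itself, the path passed to `write.csv()` names a directory that holds the part files, so even after `coalesce(1)` the result is a directory containing a single part file.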

In [13]:
# Save results to S3

#wordCountsSortedDF.coalesce(1).write.csv("s3://willquinnbucket/W9/wordcountAWS.csv", header=True, mode="overwrite")

In [14]:
# Stop Spark session

spark.stop()

### 8. Execution times on different datasets and settings

| Dataset | #Master Nodes | #Core Nodes | Runtime_1 (s) | Runtime_2 (s) | Runtime_3 (s) | Mean (s) | Std (s) |
| :-: | :-: | :-: | :-: | :-: | :-: | :-: | :-: |
| 1M | 1 | 1 | 42.86703324317932 | 43.33017683029175 | 40.96249055862427 | 42.387 | 1.025 | 
| 5M | 1 | 1 | 105.99708557128906 | 106.47394895553589 | 105.98241972923279 | 106.151 | 0.228 | 
| 5M | 1 | 3 | 49.603049516677856 | 50.460874795913696 | 49.93751811981201 | 50.000 | 0.353 |

In [15]:
import numpy as np
print('Mean')
print(np.mean([42.86703324317932, 43.33017683029175, 40.96249055862427]))
print(np.mean([105.99708557128906, 106.47394895553589, 105.98241972923279]))
print(np.mean([49.603049516677856, 50.460874795913696, 49.93751811981201]))
print('Standard Deviation')
print(np.std([42.86703324317932, 43.33017683029175, 40.96249055862427]))
print(np.std([105.99708557128906, 106.47394895553589, 105.98241972923279]))
print(np.std([49.603049516677856, 50.460874795913696, 49.93751811981201]))

Mean
42.38656687736511
106.15115141868591
50.000480810801186
Standard Deviation
1.0245716232552018
0.2283308404897207
0.3530243407721914
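
`np.std` defaults to the population standard deviation (`ddof=0`), which is what the table reports. With only three runs per setting, the sample standard deviation (`ddof=1`) is often reported instead. The sketch below is an addition, not part of the original runs: it recomputes both variants with the standard library and derives the speedup from 1 to 3 worker nodes on the 5M dataset. The dictionary keys are labels chosen here for readability.

```python
import statistics

# Runtimes in seconds, copied from the table above.
runs = {
    "1M, 1 worker": [42.86703324317932, 43.33017683029175, 40.96249055862427],
    "5M, 1 worker": [105.99708557128906, 106.47394895553589, 105.98241972923279],
    "5M, 3 workers": [49.603049516677856, 50.460874795913696, 49.93751811981201],
}

summary = {}
for setting, times in runs.items():
    summary[setting] = {
        "mean": statistics.mean(times),
        "population_std": statistics.pstdev(times),  # same definition as np.std(times)
        "sample_std": statistics.stdev(times),       # same definition as np.std(times, ddof=1)
    }
    print(setting, summary[setting])

# Ratio of mean runtimes on the 5M dataset: 1 worker versus 3 workers.
speedup = summary["5M, 1 worker"]["mean"] / summary["5M, 3 workers"]["mean"]
print(f"Speedup from 1 to 3 workers on 5M: {speedup:.2f}x")
```

Tripling the worker count gives a speedup of roughly 2.1x on these measurements, so the scaling is sublinear.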