In [1]:
#Run this cell for markdown formatting assistance. This is not a part of the assignment.

from IPython.core.display import HTML
table_css = 'table {align:left;display:block} '
HTML('<style>{}</style>'.format(table_css))

# MGTA 466: Analytics Assignment 2 - Word Count

#### Submission on Gradescope:
  * You need to submit the following three files under "PA2". Instructions to generate the csv files are given in their respective sections
      * The current notebook - **PA2_Starter.ipynb** - with all cells executed
      * csv file containing the 100 most frequently occurring words between 5 and 10 characters long (inclusive) and their counts, with columns named `word` and `count` - **100_words.csv**
      * csv file containing execution times for 1, 2 and 4 cores across three trials, with the average and the standard deviation - **exec_times.csv**
      
#### IMPORTANT submission guidelines enforced by autograder. Please read carefully:
  * Make sure that all the cells in this notebook are executed and that the outputs are present in the expected cells before submission
  * Some cells are marked **DO NOT DELETE**. These cells cannot be deleted and the output of these cells will be used for autograding
  * You can add additional cells, but the **Expected Output** for each of the tasks MUST be the output of the cells marked as such
  * DO NOT print anything other than the *exact* expected output. **Do not include any sentences/words describing the output**. This is strictly enforced by the autograder which checks for an *exact* match of the expected output. For example, if you are expected to print the PySpark version:
      * '10.9.8' - <span style="color:#093">CORRECT</span>
      * 'The PySpark version is 10.9.8' - <span style="color:#FF0000">INCORRECT</span>
  * You can add cells for printing debugging information anywhere, but do not print anything else in **Expected Output** cells other than the expected output for the task
---

Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame. 

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html

Spark DataFrame Guide:  https://spark.apache.org/docs/latest/sql-programming-guide.html

In [2]:
# Suppress native-hadoop warning
!sed -i '$a\# Add the line for suppressing the NativeCodeLoader warning \nlog4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR,console' /$HADOOP_HOME/etc/hadoop/log4j.properties

### 1. Copy data file `BookReviews_1M.txt` to the root of HDFS

This step is similar to Programming Assignment 1

#### **Expected output**: None

### 2. Start Spark Session

##### Change the number of cores for your program by setting `spark.master` to `local[n]` in this code block, where n takes the values 1, 2 and 4.

#### **Expected output**: None

In [3]:
# Change the number of cores in this code block
# by setting `spark.master` to `local[n]` where
# n is the number of cores
import pyspark
from pyspark.sql import SparkSession

conf = pyspark.SparkConf().setAll([('spark.master', 'local[4]'),
                                   ('spark.app.name', 'Basic Setup')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
pyspark.__version__

'3.5.1'

In [4]:
# Record the starting time of execution for timing this notebook
import time
start_time = time.time()

### 3. Load Data

Read data from the `BookReviews_1M.txt` file on HDFS

#### **Expected output**: None

In [5]:
textDF = spark.read.text("file:///home/jovyan/MGTA466:Session1/Demo1-Local/BookReviews_1M.txt").cache()
textDF.head()

Row(value="This was the first time I read Garcia-Aguilera.  I came upon the name of this book on Live with Regis and Kelly. This book was exactly what I was looking for ... it hit the spot.  I really enjoyed this book because it was well written. Once I started this book it kept me coming back for more. It had culture, family, friendship and romance. I was looking for a little more romance when I picked this book but in the end it turned out to be just right.  I love the main chartachter Margarita (aka Daisy). I've never been to Miami but the way Daisy told the story I certainly felt I'd been there.")

### 4. Clean the data - 2 points

Task: Remove all punctuation and convert all characters to lower case.

Expected output: The first 25 rows of a dataframe, with a column containing the **entire** cleaned sentences. Pass `truncate=False` as argument to `DataFrame.show` to see the entire sentence.

#### Your output would look like this, but the entire sentence:

|            sentence|
----------------------
|this was the firs...|
|also after going ...|
|as with all of ms...|
|ive not read any ...|
|this romance nove...|
|carolina garcia a...|
|not only can she ...|
|once again garcia...|
|the timing is jus...|
|engaging dark rea...|
|set amid the back...|
|this novel is a d...|
|if readers are ad...|
| reviewed by phyllis|
|      apooo bookclub|
|a guilty pleasure...|
|in the tradition ...|
|beryl unger top e...|
|what follows is a...|
|the book flap say...|
|id never before r...|
|the novels narrat...|
|it is centered on...|
|if you like moder...|
|beryl unger is a ...|

only showing top 25 rows

**NOTE** - The above table with cleaned sentences is for illustration only. Your output may differ slightly.

In [6]:
# We provide the following function for building a column expression for Task 1. 
# Do not change this cell. 

# NOTE: Counterintuitively, column objects do NOT store any data; instead they store column expressions (transformations). 
#       The below function takes in a column object, and adds more expressions to it to make a more complex transformation. 
#       Once we have a column object representing the expressions we want, use DataFrame.select(column) to apply the expressions

from pyspark.sql.functions import regexp_replace, trim, col, lower
def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces."""
    return trim(lower(regexp_replace(column, "[^A-Za-z0-9 ]", ""))).alias("sentence")

In [7]:
# Recommended: take a look at the contents of the column object returned from removePunctuation. What's in there?
print(removePunctuation(textDF.value))

Column<'trim(lower(regexp_replace(value, [^A-Za-z0-9 ], , 1))) AS sentence'>
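The cleaning rule can be sanity-checked on a single string without starting Spark. The sketch below applies the same character class as `removePunctuation` using Python's `re` module; `clean_sentence` is a hypothetical helper introduced here for illustration, not part of the assignment.

```python
import re

# Same character class as the notebook's removePunctuation helper:
# anything that is not a letter, a digit or a space gets removed.
_NOT_ALNUM_OR_SPACE = re.compile(r"[^A-Za-z0-9 ]")


def clean_sentence(text: str) -> str:
    """Drop punctuation, lower-case, and strip leading/trailing spaces."""
    return _NOT_ALNUM_OR_SPACE.sub("", text).lower().strip(" ")


print(clean_sentence("  I've never been to Miami... it hit the spot.  "))
# prints: ive never been to miami it hit the spot
```

Note that apostrophes are deleted rather than replaced by a space, which is why the illustration table shows words such as `ive` and `id`.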


In [8]:
cleanedDF = textDF.select(removePunctuation(col("value")))

#### **Expected output**: The first 25 rows of the cleaned dataframe, with a column containing the **entire** cleaned sentences, under a column named `sentence`

In [9]:
cleanedDF.show(25, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|sentence                                                                                                         


### 5. Get dataframe containing unique words between 5 and 10 characters long (inclusive) and their counts - 5 points

#### 5.1 Create a dataframe of words - 2 points

#### Tasks:
<ol>
    <li type = "a"> Split each sentence into words based on the delimiter space (' '). </li>
    <li type = "a"> Put each word of each sentence into its own row. Put your results into a new dataframe with a single column named <code>word</code>. </li>
</ol>

Useful functions - [pyspark.sql.functions.split](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.split.html), [pyspark.sql.functions.explode](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.explode.html)


#### **Expected output**: The first 5 rows of the resulting dataframe, with a single column named `word`

In [10]:
from pyspark.sql.functions import split, explode, length
wordsDF = cleanedDF.select(explode(split("sentence", " ")).alias("word"))

In [11]:
wordsDF.show(5)

+-----+
| word|
+-----+
| this|
|  was|
|  the|
|first|
| time|
+-----+
only showing top 5 rows
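To build intuition for what split-then-explode produces, here is a plain-Python sketch of the same idea on a list of strings. `explode_words` is a hypothetical name used only for this illustration, and the code is not a substitute for the PySpark functions.

```python
from typing import Iterable, List


def explode_words(sentences: Iterable[str]) -> List[str]:
    """Return one list element per word, splitting each sentence on a single space."""
    return [word for sentence in sentences for word in sentence.split(" ")]


# The second sentence contains a double space on purpose.
words = explode_words(["this was the first time", "i came  upon the name"])
print(words)
# prints: ['this', 'was', 'the', 'first', 'time', 'i', 'came', '', 'upon', 'the', 'name']
```

In Python, two consecutive spaces produce an empty string in the result. The raw reviews contain double spaces after some full stops, which is presumably why the next task insists that the filtered dataframe contain no empty rows.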



#### 5.2 Filter words that are between 5 and 10 characters long (inclusive) and count them - 3 points

#### Tasks:

<ol>
    <li type = "a"> Filter the dataframe to contain only words that are between 5 and 10 characters long (inclusive). </li>
    <li type = "a"> Group rows in the previous dataframe by unique words, then count the rows in each group. </li>
</ol>

#### Expected output:

<ol>
    <li type = "a"> First 20 rows of the dataframe, where each row contains only one word that is between 5 and 10 characters long (inclusive). The dataframe must not contain empty rows. </li>
    <li type = "a"> First 20 rows of the dataframe containing unique words and their counts. </li>
</ol>

##### The output after filtering words between 5 and 10 characters long (inclusive) would look like this:


|          word|
----------------
|first          |
|regis         |
|kelly           |

... 17 more

##### The output after grouping unique words and their counts would look like this:


|       word|count|
------------|------
|still    |343  |
|those |1    |
|degrade  |2008 |

... 17 more


**NOTE** - The above table with words and counts is for illustration only. Your output should contain all 20 rows for each of the tasks, and your counts may differ.


#### **Expected output**: The first 20 rows of a dataframe, where each row contains only one word between 5 and 10 characters long (inclusive), under a column named `word`.

In [12]:
uni_5_10 = wordsDF.filter((length(col("word")) >= 5) & (length(col("word")) <= 10))
uni_5_10.show(20)

+----------+
|      word|
+----------+
|     first|
|     regis|
|     kelly|
|   exactly|
|   looking|
|    really|
|   enjoyed|
|   because|
|   written|
|   started|
|    coming|
|   culture|
|    family|
|friendship|
|   romance|
|   looking|
|    little|
|   romance|
|    picked|
|    turned|
+----------+
only showing top 20 rows



#### **Expected output**: First 20 rows of the dataframe containing unique words between 5 and 10 characters long (inclusive) and their counts, under columns named `word` and `count` respectively

In [13]:
uni_5_10.groupBy("word").count().show(20)

+----------+-----+
|      word|count|
+----------+-----+
|     still|52574|
|     those|22067|
|   degrade|  343|
| bookshelf|  900|
|amazonings|    1|
| recognize| 2008|
|     inner|  819|
|    harder| 1441|
|   lyrical|   14|
| viewpoint|   37|
|     spoil|   84|
|  historys|    2|
|  everyday| 2493|
| meursault|    1|
| involving|  142|
| connected| 9172|
|    spared|   43|
|    doubts|  320|
|     1970s|  175|
|    brands| 4228|
+----------+-----+
only showing top 20 rows
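The filter-then-group logic can be cross-checked on a small sample in plain Python. This is a minimal sketch using `collections.Counter`; the helper name `count_words` and its parameters are assumptions made for the example.

```python
from collections import Counter
from typing import Iterable


def count_words(words: Iterable[str], min_len: int = 5, max_len: int = 10) -> Counter:
    """Keep words whose length is within [min_len, max_len] and count occurrences."""
    kept = (w for w in words if min_len <= len(w) <= max_len)
    return Counter(kept)


sample = ["first", "time", "", "romance", "looking", "romance", "friendship", "recommendation"]
counts = count_words(sample)
print(counts["romance"], counts["friendship"], counts["time"])
# prints: 2 1 0
```

The length filter also removes empty strings, since their length is 0, so no separate step is needed for them in this sketch.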



### 6. Sort the word count dataframe in a **descending** order by count - 2 points

**CHECK** - The first row would have the maximum count.

#### **Expected output**: First 20 rows of the word count dataframe sorted in **descending** order by counts, with columns named `word` and `count`

In [14]:
sortedDF = uni_5_10.groupBy("word").count().orderBy("count", ascending=False)

In [15]:
sortedDF.show(20)

+----------+------+
|      word| count|
+----------+------+
|     great|195349|
|     these|155186|
|     would|118028|
|     sound|111156|
|   quality|106469|
|     works|102626|
|    camera|101803|
|   product| 96848|
|     about| 96704|
|     price| 93010|
|     other| 83774|
|    bought| 79714|
|     which| 74529|
|     there| 71767|
|     after| 70339|
|    really| 66968|
|   because| 63800|
|headphones| 62986|
|    better| 62669|
|    little| 59847|
+----------+------+
only showing top 20 rows
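Sorting on `count` alone does not say how words with equal counts are ordered. The plain-Python sketch below sorts by descending count and breaks ties alphabetically; the tie-break is an assumption added here to make the result deterministic, and the assignment itself does not ask for it.

```python
from typing import Dict, List, Tuple


def sort_by_count(counts: Dict[str, int]) -> List[Tuple[str, int]]:
    """Sort (word, count) pairs by count descending, then by word ascending."""
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))


print(sort_by_count({"these": 3, "great": 5, "about": 3}))
# prints: [('great', 5), ('about', 3), ('these', 3)]
```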



### 7. Record the execution time

#### **Expected output**: The execution time. No particular value is expected. This will be needed in section 10.

In [16]:
print(time.time() - start_time)

105.26768589019775
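The notebook times the whole run by subtracting two `time.time()` readings taken in different cells. When only a single block of code needs timing, a small context manager is one alternative. The sketch below is an illustration only; `stopwatch` and the `timings` dictionary are hypothetical names.

```python
import time
from contextlib import contextmanager


@contextmanager
def stopwatch(results: dict, label: str):
    """Store the elapsed wall-clock seconds of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with stopwatch(timings, "demo"):
    sum(range(100_000))  # stand-in for the work being measured

print(f"{timings['demo']:.4f} s")
```

`time.perf_counter()` is used here because it is a monotonic, high-resolution clock intended for measuring intervals.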



### 8. Save the sorted word counts to HDFS as a CSV file - 1 point

**NOTE**: Spark uses a distributed memory system, and stores working data in fragments known as "partitions". This is advantageous when a Spark cluster spans multiple machines, as each machine will only require part of the working data to do its own job. By default, Spark will save each of these data partitions into an individual file to avoid I/O collisions. We want only one output file, so we'll need to fuse all the data into a single partition first. 

Your task: 
1. Coalesce the previous dataframe to one partition using `DataFrame.coalesce(1)`. This returns a 1-partition dataframe. This makes sure that all our results will end up in the same csv file. 
2. Save the 1-partition dataframe to HDFS using the `DataFrame.write.csv(<path>)` method to the root directory of the HDFS, i.e. `hdfs:///<<your-result-file>>.csv`.

#### **Expected output**: None

In [17]:
df_transformed = sortedDF.select("word", "count").limit(100)

df_transformed.show(100, truncate=False)

+----------+------+
|word      |count |
+----------+------+
|great     |195349|
|these     |155186|
|would     |118028|
|sound     |111156|
|quality   |106469|
|works     |102626|
|camera    |101803|
|product   |96848 |
|about     |96704 |
|price     |93010 |
|other     |83774 |
|bought    |79714 |
|which     |74529 |
|there     |71767 |
|after     |70339 |
|really    |66968 |
|because   |63800 |
|headphones|62986 |
|better    |62669 |
|little    |59847 |
|using     |58806 |
|cable     |53488 |
|still     |52574 |
|first     |49388 |
|could     |48790 |
|recommend |43101 |
|player    |42996 |
|problem   |42488 |
|computer  |40801 |
|years     |40236 |
|while     |40054 |
|radio     |39952 |
|speakers  |39497 |
|right     |37779 |
|without   |37735 |
|small     |37447 |
|worked    |37176 |
|around    |37074 |
|another   |36239 |
|thing     |35578 |
|their     |34649 |
|purchased |34581 |
|since     |34526 |
|perfect   |34008 |
|light     |33786 |
|didnt     |33415 |
|enough    |33063 |


In [18]:
df_single_partition = df_transformed.coalesce(1)

df_single_partition.write.mode("overwrite").csv("hdfs:///Demo1-HDFS/100_words.csv", header=True)

The resultant file saved in the step above is actually a folder, which contains individually saved files from each partition of the saved dataframe. <br> <br>
Now, use an HDFS command to show the contents of the resulting folder on HDFS from the last step in the cell below. <br>
You will need to include ‘!’ before the HDFS command for Jupyter Notebook to recognize it as an operating system command. For instance '! pwd' displays the path name of your current directory.

In [19]:
! pwd

/home/jovyan/MGTA466:Session1/Demo1-Local


#### **Expected output**: List of files in the result directory

In [20]:
! hdfs dfs -ls hdfs:///Demo1-HDFS/100_words.csv

Found 2 items
-rw-r--r--   1 jovyan supergroup          0 2025-02-25 12:04 hdfs:///Demo1-HDFS/100_words.csv/_SUCCESS
-rw-r--r--   1 jovyan supergroup       1323 2025-02-25 12:04 hdfs:///Demo1-HDFS/100_words.csv/part-00000-400a25f7-a065-4d2a-951a-6485fd3d08ba-c000.csv


Now, stop the Spark session in the cell below.
#### **Expected output**: None

In [21]:
! hdfs dfs -ls /

Found 1 items
drwxr-xr-x   - jovyan supergroup          0 2025-02-25 12:04 /Demo1-HDFS


In [23]:
! hdfs dfs -get hdfs:///Demo1-HDFS/100_words.csv/part-00000-400a25f7-a065-4d2a-951a-6485fd3d08ba-c000.csv 100_words.csv

In [24]:
# Stop Spark session

spark.stop()


### 9. Copy the results from HDFS to the local file system - 1 point

Now that we have our results stored in HDFS, we need to copy them back to the local file system to access them. This process may sound cumbersome, but it is a necessary result of Spark and Hadoop's distributed architecture, and their ability to scale up to arbitrarily large datasets and computing operations. 

Copying the results from HDFS to the local file system:
1. Run an hdfs command in the terminal to list the root directory of the HDFS. You should see the CSV file that you have saved. Counterintuitively, this CSV file is actually a folder, which contains individually saved files from each partition of the saved dataframe (see above for data partitioning).  
2. Run another hdfs command to see what's inside the saved folder. Since we made sure to coalesce our dataframe to just one partition, we should expect to find only one saved partition in this folder, saved also as a CSV. Note the name of this file; it should look something like `part-00000-xx.....xx.csv`. 
3. Now copy the resultant CSV file from HDFS to the current folder on your local file system using an hdfs command in the terminal. You may rename this file to something more interpretable - let's say `results.csv`. 
4. We want you to submit a CSV containing the first 101 rows of the results file. To do this, use the command `head -n 101 results.csv > 100_words.csv`. You can also do so manually, since CSV files are in plain text. Remember that we want the first 101 lines which would include the header as well - so basically it is header + 100 rows.
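
Once the output folder has been copied from HDFS to the local file system, the last step can also be done from Python instead of `head`. This is a minimal sketch under that assumption; `find_part_file` and `first_lines` are hypothetical helpers, and the folder name in the usage comment is a placeholder.

```python
import glob
import os
from itertools import islice


def find_part_file(folder: str) -> str:
    """Return the single part-*.csv file inside a locally copied Spark output folder."""
    matches = sorted(glob.glob(os.path.join(folder, "part-*.csv")))
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    return matches[0]


def first_lines(src_path: str, dst_path: str, n_lines: int = 101) -> None:
    """Copy the first n_lines lines of src_path to dst_path (header + 100 rows by default)."""
    with open(src_path, encoding="utf-8") as src, \
            open(dst_path, "w", encoding="utf-8") as dst:
        dst.writelines(islice(src, n_lines))


# Usage, assuming the output folder was copied to ./results_folder (placeholder name):
# first_lines(find_part_file("results_folder"), "100_words.csv")
```

Raising an error when more than one part file is found is a deliberate choice in this sketch: it signals that the dataframe was not coalesced to a single partition before saving.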

#### **Expected output**: None

### 10. Stop HDFS

#### **Expected output**: "Stopping HDFS ..."

In [25]:
#Stop HDFS
!$HADOOP_HOME/stop-dfs.sh

Stopping HDFS ...


### 11. Submission of `exec_times.csv` containing execution times on different numbers of cores - 2 points

#### **Expected output** (in the `exec_times.csv` file) - Execution times on 1, 2 and 4 cores, 3 trials for each core count, and the mean and standard deviation of execution times for each core count. The submission should follow the exact template shown below.

**NOTE** - No output is expected in the notebook

After writing all of the expected code before this cell, you should set the configuration at the beginning of this notebook, in the cell where this code is present:

```conf = pyspark.SparkConf().setAll([('spark.master', 'local[1]'), ('spark.app.name', 'Word Count')])```


Create a csv file `exec_times.csv` and fill it with the following template:

| Cores | Runtime_1 | Runtime_2 | Runtime_3 | Mean | Std |
|-------|-----------|-----------|-----------|------|-----|
| 1 | ... | ... | ... | ... | ... |
| 2 | ... | ... | ... | ... | ... |
| 4 | ... | ... | ... | ... | ... |




In [None]:
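import pandas as pd

# Runtimes for three trials per core count, with their mean and standard deviation
exec_times = pd.DataFrame({"Cores": [1, 2, 4],
                           "Runtime_1": [18.97, 17.88, 19.50],
                           "Runtime_2": [20.32, 18.39, 18.92],
                           "Runtime_3": [21.37, 18.43, 19.88],
                           "Mean": [20.22, 18.23, 19.43],
                           "Std": [0.98, 0.25, 0.39]})
# Write only the template columns (no index column) to the file to be submitted
exec_times.to_csv("exec_times.csv", index=False)
exec_times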

Unnamed: 0,Cores,Runtime_1,Runtime_2,Runtime_3,Mean,Std
0,1,18.97,20.32,21.37,20.22,0.98
1,2,17.88,18.39,18.43,18.23,0.25
2,4,19.5,18.92,19.88,19.43,0.39
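
The `Mean` and `Std` columns can be derived from the three runtimes rather than typed by hand. The sketch below does this with the standard library, using the runtimes from the cell above. It assumes the population standard deviation (`statistics.pstdev`), which is what the values in that cell appear to use; the assignment text does not say which variant is expected. The names `summarize` and `to_csv_text` are hypothetical.

```python
import csv
import io
from statistics import mean, pstdev

# Runtimes copied from the cell above, keyed by core count.
runtimes = {
    1: [18.97, 20.32, 21.37],
    2: [17.88, 18.39, 18.43],
    4: [19.50, 18.92, 19.88],
}


def summarize(runs_by_cores):
    """Build one row per core count: cores, the runtimes, mean, population std."""
    rows = []
    for cores, runs in sorted(runs_by_cores.items()):
        rows.append([cores, *runs, round(mean(runs), 2), round(pstdev(runs), 2)])
    return rows


def to_csv_text(rows):
    """Render the rows as CSV text using the template's column names."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["Cores", "Runtime_1", "Runtime_2", "Runtime_3", "Mean", "Std"])
    writer.writerows(rows)
    return buffer.getvalue()


rows = summarize(runtimes)
print(to_csv_text(rows))
```

Switching to `statistics.stdev` would give the sample standard deviation instead, which is larger for three trials.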


### 12. Submission of `100_words.csv` - 2 points

#### **Expected output** (in the `100_words.csv` file) - Top 100 unique words between 5 and 10 characters long (inclusive) and their counts, sorted in descending order.

**NOTE** - No output is expected in the notebook

The csv file should have two columns, `word` and `count`, in the first line and 100 more lines with the top 100 unique words between 5 and 10 characters long (inclusive) and their counts, sorted in descending order of the counts.

### Note on Autograder

The autograder will check whether the results that you submit in the `100_words.csv` file match the expected results **exactly**.

The csv file would look something like this:




|       word|count|
------------|------
|       great| 100000|
|       these| 95000|
|      would|90000|

... 97 more



The counts are shown for illustration -- Your counts may differ
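
Before submitting, the file's shape can be checked locally. The sketch below verifies the header, the row count, the word lengths and the descending order on CSV text. It is only a local sanity check written for this illustration (`check_word_counts` is a hypothetical name) and makes no claim about how the autograder works.

```python
import csv
import io


def check_word_counts(csv_text, expected_rows=100, min_len=5, max_len=10):
    """Return a list of problems found; an empty list means all checks passed."""
    problems = []
    rows = list(csv.reader(io.StringIO(csv_text)))
    if not rows or rows[0] != ["word", "count"]:
        problems.append("header must be exactly: word,count")
        return problems
    body = rows[1:]
    if len(body) != expected_rows:
        problems.append(f"expected {expected_rows} data rows, found {len(body)}")
    if any(len(r) != 2 or not r[1].isdigit() for r in body):
        problems.append("every data row needs a word and an integer count")
        return problems
    if any(not (min_len <= len(r[0]) <= max_len) for r in body):
        problems.append("word length outside the allowed range")
    counts = [int(r[1]) for r in body]
    if counts != sorted(counts, reverse=True):
        problems.append("counts are not in descending order")
    return problems


sample = "word,count\ngreat,5\nthese,3\n"
print(check_word_counts(sample, expected_rows=2))
# prints: []
```

To check the real file, read it with `open("100_words.csv", encoding="utf-8").read()` and pass the text in with the default `expected_rows=100`.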