# Homework #10: MapReduce

__DUE:__ 11:59am April 18th

__HOW TO SUBMIT:__ __We are moving to a new submission system powered by git.__

We are no longer going to use [WebSubmit](https://www.cs.duke.edu/csed/websubmit/). We have created a SUBMIT repository for almost everyone on [coursework GitLab](https://coursework.cs.duke.edu). Make sure you have access to CS216-s2018-stu/your_firstname-your_lastname-SUBMIT; otherwise, please contact zjmiao@cs.duke.edu as soon as possible.

_If this is the first time you are submitting a hw or lab assignment:_

Fire up your VM, open a terminal, and type the following commands to initialize your SUBMIT repository:
```shell
cd ~
git clone git@coursework.cs.duke.edu:CS216-s2018-stu/<Firstname>-<Lastname>-SUBMIT.git SUBMIT
cd SUBMIT
touch README.md
git add README.md
git commit -m "add README"
git push -u origin master
```
Replace `<Firstname>` and `<Lastname>` with your name as it appears on the coursework gitlab page. If this is successful, you should see a new folder SUBMIT in your home directory. 

_How to submit an assignment:_ 

Create a new folder `hw10` directly inside your SUBMIT directory. 
```shell
cd ~/SUBMIT
mkdir hw10
```

Then copy all required files (from your local `working` directory; see __WHAT TO SUBMIT__ under each problem below) into it. The folder structure should look like this:
```
~/SUBMIT
~/SUBMIT/hw10
~/SUBMIT/hw10/hw10.ipynb
```

Finally, when you are ready to submit your homework, run the commands below:
```shell
cd ~/SUBMIT
git add hw10
git commit -m "updated hw10"
git push
```
You can replace "updated hw10" with any meaningful message you want.

If your submission is successful, you will see your files in your repository on the GitLab website.

You can submit multiple times, but __we will only grade the files under /hw10 directory in your LATEST submission__.

# 0. Getting Started

__WHAT TO SUBMIT:__ Nothing is required for this part.

To get ready for this assignment, open a VM shell and type the following commands:

```shell
cd ~/CS216-s2018-READONLY/
git pull
```

Next, type the following commands to create a working directory for this homework. Here we use `hw10` under your `working/` directory, but feel free to change it to another location.

```shell
cd ~
cp -r ~/CS216-s2018-READONLY/assignments/hw10 ~/working/
cd ~/working/hw10
```
    
Next, run the following command to **install and set up the Spark environment**.

```shell
./installSpark.sh
```

# 1. Finding the top 5 Twitter hashtags locally

In this exercise, you will write a MapReduce program in Python to find the 5 most popular hashtags in (1) a small file containing approximately 1,000 tweets and (2) a medium file containing approximately 100,000 tweets. You can run the program on both input files locally. We strongly recommend debugging on the small file first, before running on the medium file.

**Hint**: Your approach should scale to much bigger datasets, since the same code must also work on the large dataset in Part 2. Keep in mind the tips from class about potential problems when computing on data in parallel.

### `PySpark` Library and an Example

The main Python package you'll use is `PySpark`, which lets you program Spark in Python. We highly recommend reading its official programming guide (https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds) to learn how to use its APIs. We have also provided an example program, `example_word_count_local.py`, which takes a text file, counts how many times each word occurs in it, and outputs the five most frequent words. You can open the file to read it in detail.
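To see the map and reduce stages without Spark, here is a plain-Python sketch of the same word-count idea: split lines into words, emit `(word, 1)` pairs, add up the pairs for each word, and keep the most frequent ones. It is not the code in the provided example file; `word_count_top_k` is a hypothetical name. In PySpark, the corresponding steps are the `flatMap`, `map`, and `reduceByKey` calls that also appear in `hashtag_count.py` below.

```python
import heapq
from collections import Counter


def word_count_top_k(lines, k=5):
    """Return the k most frequent words in `lines` as (word, count) pairs."""
    # Map stage: emit a (word, 1) pair for every non-empty word on every line
    pairs = [(word, 1) for line in lines for word in line.split(' ') if word]
    # Reduce stage: add up the values for each distinct key (word)
    counts = Counter()
    for word, one in pairs:
        counts[word] += one
    # Keep the k pairs with the largest counts
    return heapq.nlargest(k, counts.items(), key=lambda kv: kv[1])


if __name__ == "__main__":
    sample = ["to be or not to be", "be quick"]
    for word, count in word_count_top_k(sample, k=2):
        print(word, count)  # prints "be 3", then "to 2"
```

The `if word` guard matters because `split(' ')` yields empty strings wherever two spaces are adjacent.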

The following cell installs the Python `pyspark` package first.

```
!pip --no-cache-dir install pyspark --user
```
The following cell runs the example word count program on the file `tweets_sm.txt`. You don't need to understand exactly what the command does yet; just run it to get a sense of how it works. From the output, you can see that the 5 most frequent words in the file and their counts are:

```
a 82
I'm 73
I 71
to 69
the 68
```

Spark's output includes its runtime logs, any error messages, and the actual output of our program. Adding `2>log` to the following command redirects `stderr`, where Spark writes its logs, to a file called `log`, so only `stdout` is printed in the notebook. If you encounter an error, check the `log` file.

```
!spark-submit --master local[4] example_word_count_local.py tweets_sm.txt 2>log
```

### Run a Python Spark Program

* The usual way to run a Python Spark program locally is with shell commands:

Assume your Python Spark program is stored in a file called `hashtag_count.py`.

In the shell, type the following commands:

```shell
cd ~/working/hw10
spark-submit --master local[4] hashtag_count.py tweets_sm.txt
spark-submit --master local[4] hashtag_count.py tweets_med.txt
```

The first command changes to the working directory that contains your program files (.py) and input files (.txt).

The second command runs the Spark program on the small input.

The third command runs the Spark program on the medium input.

The `4` in `local[4]` is the number of worker threads Spark uses locally (ideally, the number of cores on your machine). You can modify it to suit your needs.

* In this homework, we have put both the Python code and the `spark-submit` shell commands into the notebook. You can follow the notebook instructions to modify the Python code in the notebook and then run the Spark application there too!

**WHAT TO SUBMIT**: The `hw10.ipynb` notebook, with the code cell below modified and the two `spark-submit` cells run with their outputs kept.

### The Actual Coding Part

Because the first line of the following cell is `%%writefile hashtag_count.py`, every time you run the cell, its content is written to a file called `hashtag_count.py`. If the file already exists, its contents are overwritten silently. So DON'T modify the first line of the cell; otherwise, your code won't be saved to the file.

We need to run `hashtag_count.py` as the Spark application, so please make sure the code in the following cell is valid Python.
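One optional way to catch syntax errors before launching Spark is to parse the file with Python's standard `ast` module, which raises `SyntaxError` on invalid code. This helper is not part of the assignment and `has_valid_syntax` is a hypothetical name. Note that the check uses the grammar of whichever interpreter runs it, so it should be the same major Python version that Spark uses.

```python
import ast
import os


def has_valid_syntax(source):
    """Return True if `source` parses as Python, False on a SyntaxError."""
    try:
        ast.parse(source)
    except SyntaxError as err:
        print("Syntax error on line {}: {}".format(err.lineno, err.msg))
        return False
    return True


if __name__ == "__main__":
    path = "hashtag_count.py"  # the file produced by %%writefile
    if os.path.exists(path):
        with open(path) as f:
            print(path, "has valid syntax:", has_valid_syntax(f.read()))
```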

```python
%%writefile hashtag_count.py
from __future__ import print_function

import re
import sys
import heapq
import operator
from operator import add
from pyspark import SparkContext

# Python 2 only: make UTF-8 the default encoding so non-ASCII hashtags can be printed
if sys.version_info[0] < 3:
    reload(sys)
    sys.setdefaultencoding('utf8')

if __name__ == "__main__":

    if len(sys.argv) != 2:
        print("Usage: hashtag_count.py <file>", file=sys.stderr)
        sys.exit(-1)

    # Initialize the Spark context. The master is not hard-coded here, so the
    # --master option passed to spark-submit (e.g. local[4]) takes effect.
    sc = SparkContext(appName="PythonHashTag")

    # You can modify the number of partitions according to your needs
    numPartitions = 3

    # Make a new RDD from the input text, one element per line
    lines = sc.textFile(sys.argv[1], numPartitions)

    # [Map stage] split lines into words
    # lambda line: ... defines an anonymous function; flatMap applies it to every line
    words = lines.flatMap(lambda line: line.split(' '))

    # [Map stage] filter words to keep only hashtags
    # FIX THIS LINE
    hashtags = words.filter(lambda word: len(word) > 0 and word[0] == '#')

    # [Map stage] turn each hashtag into a (hashtag, 1) key-value pair
    hashtagsNum = hashtags.map(lambda word: (word, 1))

    # [Reduce stage] add up the values for each distinct key
    hashtagsCount = hashtagsNum.reduceByKey(add)

    # YOUR CODE HERE
    # You can have multiple MapReduce stages.
    # You will find mapPartitions() helpful

    # Swap to (count, hashtag), sort by count in descending order, then swap back
    reversedhashtagsCount = hashtagsCount.map(lambda kv: (kv[1], kv[0]))
    sortedReversedhashtagsCount = reversedhashtagsCount.sortByKey(ascending=False)
    sortedhashtagsCount = sortedReversedhashtagsCount.map(lambda kv: (kv[1], kv[0]))

    for (hashtag, count) in sortedhashtagsCount.take(5):
        print(hashtag, count)

    sc.stop()
```

```
Overwriting hashtag_count.py
```
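The `mapPartitions()` hint in the cell points at a top-k pattern that avoids sorting every hashtag: find the top k within each partition, then merge the small per-partition results. Below is a plain-Python simulation of that idea, with partitions modeled as lists; the function names and sample counts are hypothetical. The pattern is only correct after `reduceByKey`, when each hashtag's total count lives in exactly one partition; applied to partial counts, it could miss a hashtag whose occurrences are spread across partitions.

```python
import heapq
from itertools import chain


def top_k_in_partition(pairs, k):
    """Local step: keep the k (hashtag, count) pairs with the highest counts.

    `pairs` can be any iterator, like the one mapPartitions passes to its function.
    """
    return heapq.nlargest(k, pairs, key=lambda kv: kv[1])


def global_top_k(partitions, k):
    """Run the local step on every partition, then merge the small results."""
    local_tops = [top_k_in_partition(iter(p), k) for p in partitions]
    # Each partition contributes at most k candidates, so this merge stays small
    return heapq.nlargest(k, chain.from_iterable(local_tops), key=lambda kv: kv[1])


if __name__ == "__main__":
    # Hypothetical (hashtag, count) pairs, already reduced by key, in 3 partitions
    partitions = [
        [("#a", 5), ("#b", 1), ("#c", 7)],
        [("#d", 9), ("#e", 2)],
        [("#f", 3), ("#g", 8), ("#h", 4)],
    ]
    print(global_top_k(partitions, 3))  # [('#d', 9), ('#g', 8), ('#c', 7)]
```

In PySpark, a function like this could be applied as `hashtagsCount.mapPartitions(lambda it: top_k_in_partition(it, 5))`, followed by `collect()` and the same merge on the driver. The RDD API also provides `top(num, key=...)` and `takeOrdered(num, key=...)`, which return the largest or smallest elements directly.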


The following cell runs this shell command inside the Jupyter notebook:
```shell
spark-submit --master local[4] hashtag_count.py tweets_sm.txt
```

It runs the Python program locally in Spark, with `tweets_sm.txt` as the program's input. The `4` in `local[4]` is the number of worker threads (ideally, the number of cores), and you can modify it to suit your needs. The program's output is printed in the notebook.

```
!spark-submit --master local[4] hashtag_count.py tweets_sm.txt 2>log
```

```
#OmSpikTanya 6
#NP 2
#20cosesulmiocompagnodibanco 2
#ReMix 2
#CheerUp 1
```

Similarly, the following cell runs the same Python program in Spark, but with `tweets_med.txt` as the input.

```
!spark-submit --master local[4] hashtag_count.py tweets_med.txt 2>log
```

```
#OmSpikTanya 140
# 137
#Team 97
#GGMU 76
#TeamFollowBack 51
```
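In the medium-file output, the line `# 137` appears to be a bare `#` token: the current filter accepts any non-empty word that starts with `#`, including `#` on its own. A minimal sketch of a stricter predicate follows, assuming that a hashtag must have at least one character after `#`; this definition is an assumption, not one given in the assignment, and `is_hashtag` is a hypothetical name.

```python
def is_hashtag(word):
    """Assumed rule: a hashtag is '#' followed by at least one more character."""
    return len(word) > 1 and word[0] == '#'


if __name__ == "__main__":
    # str.split() with no argument splits on any run of whitespace, so unlike
    # split(' ') it never yields empty strings for repeated spaces
    tokens = "#NP  # great #ReMix".split()
    print([w for w in tokens if is_hashtag(w)])  # ['#NP', '#ReMix']
```

The same predicate could be passed to `words.filter(...)` in `hashtag_count.py`.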


# 2. Finding the top 50 Twitter hashtags in the cloud

This part is optional and worth extra credit.

In this part, you will run your code from Part 1 in the cloud to find the **50** most popular hashtags in a large file containing approximately 3.5 million tweets (a couple of hours in the twitterverse).

The small tweets file (for testing) is stored at: `s3://cs216-spring2015/twitter/tweets_sm.txt`

The large tweets file is stored at: `s3://cs216-spring2015/twitter/tweets-full/*`

We strongly recommend debugging locally first, before running on the large dataset in the cloud.

You can make a copy of your local code:

```shell
cp hashtag_count.py hashtag_count_aws.py
```

and edit `hashtag_count_aws.py` to make it work in the cloud.

To run the Spark program on cloud, please study the instructions in `~/working/hw10/spark-tutorial-aws.pdf`. 

To learn how to adapt the local program and read files from S3, please study the example file `example_word_count_aws.py`. You can copy some configuration code from `example_word_count_aws.py` into your `hashtag_count_aws.py`, but remember to change `MASTER_ADDRESS`, `AWS ACCESS KEY` and `AWS SECRET ACCESS KEY`.
    
**WHAT TO SUBMIT**: `hw10.ipynb` with your answers (the top 50 hashtags) pasted in the following cell.