#### Common warnings:

1. __Back up your solution to the 'work' directory inside the home directory ('/home/jovyan'). It is the only directory whose state is saved between sessions.__

1. Please ensure that you call the right interpreter (python2 or python3). Do not write just "python" without the major version: there is no guarantee that any particular version of Python is the default in the Grading system.

1. One cell must contain only one programming language.
E.g. if a cell contains Python code and you also want to call a bash-command (using “!”) in it, you should move the bash to another cell.

1. Our IPython converter is an improved version of the standard converter Nbconvert, and it can process most of Jupyter's magic commands correctly (e.g. it understands "%%bash" and executes the cell as a "bash" script). However, we highly recommend avoiding magics wherever possible.

#### Hints for the YARN tasks:

1. Please use relative HDFS paths, i.e. dir1/file1 instead of /user/jovyan/dir1/file1. When you submit the code, it will be executed on a real Hadoop cluster where, for instance, the user ‘jovyan’ may not exist.

1. Hadoop counter names should contain only lowercase Latin letters. One exception: the first letter of the name may be uppercase.

1. In the Hadoop logs, the stop words counter should appear before the total words counter. To achieve this, take into account that counters are printed in lexicographical order.

# Hadoop Streaming assignment 2: Stop Words

The purpose of this task is to improve the previous "Word rating" program. You have to calculate how many stop words there are in the input dataset. The list of stop words is in the `/datasets/stop_words_en.txt` file.

Use Hadoop counters to compute the number of stop words and total words in the dataset. The result is the percentage of stop words in the entire dataset (without percent symbol).

There are several points for this task:

1) As output, you have to get the percentage of stop words in the entire dataset without the percent symbol (the correct answer on the sample dataset is `41.603`).

2) As you can see in the Hadoop Streaming userguide "you will need to use `-files` option to tell the framework to pack your executable files as a part of a job submission."

3) Do not forget to redirect junk output to `/dev/null`.

4) You may modify the mappers/reducers from the "Word rating" task and parse their output to get the answer to the "Stop Words" task.

5) You may use a mapper/reducer to get the `"Stop Words"` and `"Total Words"` amounts and redirect them to sys.stderr. After that, you may pass the MapReduce output to a parsing function. In this function you may find the rows that correspond to these amounts and compute the percentage.

Here you can find the draft for the main steps of the task. You can use other methods to get the solution.

## Step 1. Create the mapper.

<b>Hint:</b> Create the mapper, which calculates Total word and Stop word amounts. You may redirect this information to sys.stderr. This will make it possible to parse these data on the next steps.

Example of the redirections (see the "eprint" definition in the cell below):

`eprint("reporter:counter:Wiki stats,Total words,%d" % count)`

Remember the Distributed cache. If we add the option `-files mapper.py,reducer.py,/datasets/stop_words_en.txt`, then `mapper.py`, `reducer.py`, and `stop_words_en.txt` will all be in the same directory on the datanodes. Hence, the mapper must use the relative path `stop_words_en.txt` to access this txt file.
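A counter update in Hadoop Streaming is a line of the form `reporter:counter:<group>,<counter>,<amount>` written to stderr. The following is a minimal sketch of a helper that builds such a line; the name `counter_line` is hypothetical and not part of the assignment.

```python
import sys


def counter_line(group, counter, amount):
    """Build a Hadoop Streaming counter update line (hypothetical helper)."""
    # Commas separate the fields, so the names themselves must not contain them.
    if "," in group or "," in counter:
        raise ValueError("group and counter names must not contain commas")
    return "reporter:counter:%s,%s,%d" % (group, counter, amount)


if __name__ == "__main__":
    # Writing the line to stderr asks the framework to add `amount` to the counter.
    print(counter_line("Wiki stats", "Total words", 5), file=sys.stderr)
```

Keeping the formatting in one place makes it harder to misspell a counter name in one of several `eprint` calls.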

In [1]:
%%writefile mapper_wiki_parser.py

import sys
import re


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

path = 'stop_words_en.txt'

# A set gives constant-time membership tests for the per-word lookup below.
stop_words = set()
s_w_count = 0
t_w_count = 0

with open(path) as f1:
    for l in f1:
        stop_words.add(l.strip())

for line in sys.stdin:
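    try:
        article_id, text = line.strip().split('\t', 1)
    except ValueError:
        # Skip lines that lack the "id<TAB>text" structure.
        continue

    # A raw string avoids invalid-escape warnings for \W and \s.
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)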
    
    for word in words: 
        t_w_count+=1
        if word in stop_words:
            s_w_count+=1
               
eprint("reporter:counter:Wiki stats,Total words,%d" % t_w_count)
eprint("reporter:counter:Wiki stats,Stop words,%d" % s_w_count)

Overwriting mapper_wiki_parser.py


## Step 2. Create the reducer.

Create the reducer, which will accumulate the information after the mapper step. You may implement a combiner if you want. It can help optimize and speed up your computations (see the Week 2 lectures for more details).
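The draft solution in this notebook runs with zero reducers, so the following is only a sketch of what an accumulating reducer could look like. It assumes that the mapper writes `key<TAB>count` lines to stdout and that Hadoop delivers them sorted by key; the function name `reduce_counts` is hypothetical.

```python
from itertools import groupby


def reduce_counts(lines):
    """Sum the counts per key from sorted 'key<TAB>count' lines (assumed format)."""
    # Ignore lines without a tab so a malformed record does not stop the job.
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if "\t" in line)
    # groupby relies on the input being sorted by key, as it is after the shuffle.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, sum(int(count) for _, count in group)


if __name__ == "__main__":
    # In a real job the input would be sys.stdin; a small sample is used here.
    sample = ["stop\t1\n", "stop\t2\n", "total\t5\n"]
    for key, total in reduce_counts(sample):
        print("%s\t%d" % (key, total))
```

Because summation is associative, the same function could also serve as a combiner under these assumptions.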

## Step 3. Create the parsing function.

<b>Hint:</b> Create the function, which will parse MapReduce sys.stderr for Total word and Stop word amounts.

The `./counter_process.py` script should do the following:

- parse the Hadoop logs captured from stderr,

- retrieve values of 2 user-defined counters,

- compute percentage and output it into the stdout.

In [2]:
%%writefile counter_process.py

#!/usr/bin/env python3
import sys
import re

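def search_in_log(sw, tw):
    """Read the Hadoop log from stdin and print the stop word percentage."""
    # Counter lines look like "<name>=<value>"; escape the names for regex use.
    sw_obj = re.compile(rf"{re.escape(sw)}=(\d+)")
    tw_obj = re.compile(rf"{re.escape(tw)}=(\d+)")

    sw_count = tw_count = None
    for l in sys.stdin:
        sw_match = sw_obj.search(l)
        tw_match = tw_obj.search(l)
        if sw_match:
            sw_count = int(sw_match.group(1))
        elif tw_match:
            tw_count = int(tw_match.group(1))

    # Fail with a clear message instead of a NameError or ZeroDivisionError.
    if sw_count is None or not tw_count:
        sys.exit("Counters not found in the log (or the total is zero)")
    print((sw_count / tw_count) * 100)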



if __name__ == '__main__':
    search_in_log(sys.argv[1],sys.argv[2])

Overwriting counter_process.py


## Step 4. Bash commands

<b> Hints: </b> 

1) If you want to discard standard output and save stderr (the job logs) to a txt file, you may end the yarn jar command with the following redirections:

```
yarn ... \
  ... \
  -output ${OUT_DIR} > /dev/null 2> $LOGS
```

2) To print the percentage of stop words in the entire dataset, you may parse the MapReduce output. The parsing script may be written in Python.

To get the result you may use the UNIX pipe operator `|`. The output of the first command acts as the input to the second command (see lecture file-content-exploration-2 for more details).

With this operator you may use the `cat` command to send the MapReduce logs to ./counter_process.py, with arguments that correspond to the `"Stop words"` and `"Total words"` counters. For example:

`cat $LOGS | python3 ./counter_process.py "Stop words" "Total words"`

Now a few words about Hadoop counter naming.
 - Built-in Hadoop counters usually have UPPER_CASE names. So that the grading system can distinguish your custom counters from the system ones, please name them using the pattern `[Aa]aaa...` (every letter except the first must be lowercase);
 - Hadoop sorts counters lexicographically. The grading system reads your first counter as the Stop words counter and the second as the Total words counter, so please name your counters so that Hadoop places the Stop words counter before the Total words counter.
 
E.g. the names "Stop words" and "Total words" are OK because they satisfy both requirements.
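Both naming requirements can be checked locally before submitting. The sketch below is an assumption about how to read the `[Aa]aaa...` pattern: spaces are allowed because "Stop words" is an accepted name, and the helper `check_counter_names` is hypothetical rather than part of the grading system.

```python
import re

# Assumed reading of "[Aa]aaa...": a first letter in either case, then
# lowercase letters; spaces are allowed because "Stop words" is accepted.
NAME_PATTERN = re.compile(r"[A-Za-z][a-z ]*")


def check_counter_names(stop_name, total_name):
    """Return True if both names fit the pattern and sort in the required order."""
    names_ok = all(NAME_PATTERN.fullmatch(n) for n in (stop_name, total_name))
    # Counters are printed lexicographically, so the stop words counter
    # must compare as smaller than the total words counter.
    return names_ok and stop_name < total_name


print(check_counter_names("Stop words", "Total words"))  # True
print(check_counter_names("Words total", "Stop words"))  # False: wrong order
```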

3) In Python code sys.argv is a list, which contains the command-line arguments passed to the script. The name of the script is in `sys.argv[0]`. Other arguments begin from `sys.argv[1]`.

Hence, if you have two arguments, which you send from the Bash to your python script, you may use arguments in your script with the following command:

`function(sys.argv[1], sys.argv[2])`
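As a minimal sketch of this argument handling, the hypothetical helper `counter_names` below checks that exactly two arguments were passed before using them. The command line is simulated with a list so the example runs on its own.

```python
def counter_names(argv):
    """Return the two counter names from the command line (hypothetical helper)."""
    # argv[0] is the script name, so exactly two further arguments are expected.
    if len(argv) != 3:
        raise SystemExit("usage: %s STOP_COUNTER TOTAL_COUNTER" % argv[0])
    return argv[1], argv[2]


# Simulates: python3 ./counter_process.py "Stop words" "Total words"
sw, tw = counter_names(["./counter_process.py", "Stop words", "Total words"])
print(sw, tw, sep=" | ")  # Stop words | Total words
```

In the real script you would pass `sys.argv` instead of the literal list.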

4) Do not forget to print your MapReduce output in the last cell. You may use the following command:

`cat $LOGS >&2`

__NB__: Please use an explicit Python major version (e.g. `python3 mapper.py` instead of `python mapper.py`)!

Only the answer to your task should be printed in the output stream (__stdout__) in the last cell. There should be no more output in this stream. In order to get rid of garbage [junk lines] (e.g. created by `hdfs dfs -rm` or `yarn` commands) redirect the output to /dev/null.

#### Final notice:

1. Please take into account that you must __not__ redirect __stderr__ anywhere. Hadoop, Hive, and Spark print their logs to stderr, and the Grading system also reads and analyses it.

1. When checking the code from the notebook, the system runs all of the notebook's cells and reads the output of only the last filled cell. Therefore, no cell should throw an exception when run. If you decide to write some text in a cell, you should change the style of the cell to Markdown (Cell -> Cell type -> Markdown).

1. The Grader takes into account the output from the sample dataset you have in the notebook. Therefore, you have to "Run All" cells in the notebook before you send the ipynb solution.

1. The name of the notebook must contain only Latin letters, numbers, and the characters “-” or “_”. For example, Windows adds something like " (2)" (with the leading space) at the end of a filename if you try to download a file with the same name. This is a problem, because the name will then contain a space character and the parentheses "(" and ")".

In [3]:
%%bash

OUT_DIR="coursera_mr_task2"
NUM_REDUCERS=0
LOGS="stderr_logs.txt"

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

# Stub code for your job

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar  \
    -D mapreduce.job.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper_wiki_parser.py,/datasets/stop_words_en.txt \
    -mapper "python3 mapper_wiki_parser.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null 2> $LOGS
    
cat $LOGS | python3 ./counter_process.py "Stop words" "Total words"
cat $LOGS >&2
    

38.44036900909957


21/01/06 18:59:43 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/01/06 18:59:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/01/06 18:59:45 INFO mapred.FileInputFormat: Total input files to process : 1
21/01/06 18:59:46 INFO mapreduce.JobSubmitter: number of splits:2
21/01/06 18:59:46 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
21/01/06 18:59:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1609958315846_0008
21/01/06 18:59:46 INFO conf.Configuration: resource-types.xml not found
21/01/06 18:59:46 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/01/06 18:59:46 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/01/06 18:59:46 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
21/01/06 18:59:46 INFO impl.Ya