# Hadoop Streaming assignment 1: Words Rating

The purpose of this task is to create your own WordCount program for Wikipedia dump processing and learn basic concepts of the MapReduce.

In this task you have to find the 7th word by popularity and its quantity in the reverse order (most popular first) in Wikipedia data (`/data/wiki/en_articles_part`).

There are several points for this task:

1) As an output, you have to get the 7th word and its quantity separated by a tab character.

2) You must use the second job to obtain a totally ordered result.

3) Do not forget to redirect all trash and output to /dev/null.

Here you can find the draft of the task main steps. You can use other methods for solution obtaining.

## Step 1. Create mapper and reducer.

<b>Hint:</b>  Demo task contains almost all the necessary pieces to complete this assignment. You may use the demo to implement the first MapReduce Job.

In [1]:
%%writefile mapper1.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8') # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
#         print >> sys.stderr, "reporter:counter:Wiki stats,Total words,%d" % 1
        print "%s\t%d" % (word.lower(), 1)

Overwriting mapper1.py


In [2]:
%%writefile reducer1.py

import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != key:
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print "%s\t%d" % (current_key, word_sum)

Overwriting reducer1.py


## Step 2. Create sort job.

<b>Hint:</b> You may use MapReduce comparator to solve this step. Make sure that the keys are sorted in ascending order.

In [26]:
%%writefile mapper2.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8') # required to convert to unicode

for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
        
    print "%d\t\t%s" % (count, word)

Overwriting mapper2.py


In [27]:
%%writefile reducer2.py

import sys

for line in sys.stdin:
    try:
        count, word = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
        
    print "%s\t\t%d" % (word, count)

Overwriting reducer2.py


## Step 3. Bash commands

<b> Hint: </b> For printing the exact row you may use basic UNIX commands. For instance, sed/head/tail/... (if you know other commands, you can use them).

To run both jobs, you must use two consecutive yarn-commands. Remember that the input for the second job is the ouput for the first job.

In [17]:
%%bash

# OUT_DIR="wordcount_result_"$(date +"%s%6N")
# NUM_REDUCERS=8

# hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

# yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
#     -D mapred.jab.name="Streaming wordCount" \
#     -D mapreduce.job.reduces=${NUM_REDUCERS} \
#     -files mapper1.py,reducer1.py \
#     -mapper "python mapper1.py" \
#     -combiner "python reducer1.py" \
#     -reducer "python reducer1.py" \
#     -input /data/wiki/en_articles_part \
#     -output ${OUT_DIR} > /dev/null


# OUT_DIR_2="wordcount_result_"$(date +"%s%6N")
# NUM_REDUCERS=1

# hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null

# yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
#     -D mapred.jab.name="Streaming wordCount Rating" \
#     -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
#     -D map.output.key.field.separator=\t \
#     -D mapreduce.partition.keycomparator.options=-k1,1nr \
#     -D mapreduce.job.reduces=${NUM_REDUCERS} \
#     -files mapper2.py,reducer2.py \
#     -mapper "python mapper2.py" \
#     -reducer "python reducer2.py" \
#     -input ${OUT_DIR} \
#     -output ${OUT_DIR_2} > /dev/null

# hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head -7 | tail -1

In [18]:
%%bash

# OUT_DIR="wordcount_result_"$(date +"%s%6N")
# NUM_REDUCERS=16

# hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

# yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
#     -D mapred.jab.name="Streaming wordCount" \
#     -D mapreduce.job.reduces=${NUM_REDUCERS} \
#     -files mapper1.py,reducer1.py \
#     -mapper "python mapper1.py" \
#     -combiner -"python reducer1.py" \
#     -reducer "python reducer1.py" \
#     -input /data/wiki/en_articles_part \
#     -output ${OUT_DIR} > /dev/null

# hdfs dfs -get ${OUT_DIR} /home/jovyan/output

In [19]:
%%bash

# cd all_files/
# pwd
# ls -ltr
# wc -l /home/jovyan/output/wordcount_result_1548923867416270/part-000*
# # cat /home/jovyan/output/wordcount_result_1548923867416270/part-000* > /home/jovyan/all_files/bulk.txt
# wc -l /home/jovyan/all_files/bulk.txt

In [20]:
# cat /home/jovyan/all_files/bulk.txt | /usr/bin/python ./mapper2.py | sort | /usr/bin/python ./reducer2.py | head

In [21]:
# cat /home/jovyan/all_files/bulk.txt | head

In [22]:
%%bash

#hdfs dfs -ls /test
#hdfs dfs -rm -r /test/articles-part
#hdfs dfs -cp /data/wiki/en_articles_part/articles-part /test
#hdfs dfs -get /test/articles-part #| head -n 1 > /articles-part-short
#pwd
#mkdir /test
#cat articles-part | head -n 1 > articles-part-short
#cat articles-part-short
#ls -ltr
#hdfs dfs -put /home/jovyan/articles-part-short /test/articles-part-short


In [28]:
%%bash

OUT_DIR="wordcount_result_1"
NUM_REDUCERS=16

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.jab.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper1.py,reducer1.py \
    -mapper "python mapper1.py" \
    -combiner "python reducer1.py" \
    -reducer "python reducer1.py" \
    -input /test/articles-part-short \
    -output ${OUT_DIR} > /dev/null


OUT_DIR_2="wordcount_result_2"
NUM_REDUCERS=1

hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.jab.name="Streaming wordCount Rating" \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D map.output.key.field.separator=\t\t \
    -D mapreduce.partition.keycomparator.options=-k1,1nr \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper2.py,reducer2.py \
    -mapper "python mapper2.py" \
    -reducer "python reducer2.py" \
    -input ${OUT_DIR} \
    -output ${OUT_DIR_2} > /dev/null

hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head -7 | tail -1

	anarchist		151


19/01/31 14:19:55 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/31 14:19:55 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/01/31 14:19:56 INFO mapred.FileInputFormat: Total input files to process : 1
19/01/31 14:19:56 INFO mapreduce.JobSubmitter: number of splits:2
19/01/31 14:19:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1548876590404_0050
19/01/31 14:19:56 INFO impl.YarnClientImpl: Submitted application application_1548876590404_0050
19/01/31 14:19:56 INFO mapreduce.Job: The url to track the job: http://c5770aa3f9ef:8088/proxy/application_1548876590404_0050/
19/01/31 14:19:56 INFO mapreduce.Job: Running job: job_1548876590404_0050
19/01/31 14:20:02 INFO mapreduce.Job: Job job_1548876590404_0050 running in uber mode : false
19/01/31 14:20:02 INFO mapreduce.Job:  map 0% reduce 0%
19/01/31 14:20:08 INFO mapreduce.Job:  map 100% reduce 0%
19/01/31 14:20:15 INFO mapreduce.Job:  map 100% reduce 6%
19/01/31 14:20:18 INF

In [29]:
!hdfs dfs -cat wordcount_result_2/part-00000

	the		748
	of		447
	and		368
	in		291
	a		198
	to		176
	anarchist		151
	anarchism		149
	as		128
	was		93
	by		91
	that		73
	is		71
	anarchists		61
	with		58
	an		57
	for		56
	from		48
	on		44
	movement		41
	which		39
	international		38
	free		34
	it		34
	or		34
	first		34
	were		33
	workers		31
	revolution		30
	social		29
	state		28
	his		28
	individualist		26
	thought		26
	be		25
	this		24
	such		24
	libertarian		24
	not		24
	individual		24
	anarchy		23
	during		23
	at		22
	revolutionary		22
	its		21
	society		21
	are		21
	century		20
	their		20
	federation		20
	education		20
	also		20
	have		19
	he		19
	but		19
	spanish		19
	press		19
	some		19
	political		19
	they		18
	war		18
	ideas		18
	french		17
	movements		17
	united		17
	most		17
	within		16
	american		16
	schools		16
	p		16
	form		16
	de		16
	proudhon		15
	many		15
	may		15
	love		15
	other		15
	between		15
	had		14
	communism		14
	who		14
	new		