# Hadoop Streaming assignment 4: Word Groups

Calculate statistics for groups of words that are equal up to a permutation of their letters (anagrams). For example, ‘emit’, ‘item’ and ‘time’ are the same word up to a permutation of letters. Determine such groups of words and sum the counts of all words in each group. Apply the stop-word filter, and filter out groups that consist of only one word.

Output: count of occurrences for the group of words, number of unique words in the group, comma-separated list of the words in the group in lexicographical order:

    sum <tab> group size <tab> word1,word2,...

Example: assume ‘emit’ occurred 3 times, 'item' -- 2 times, 'time' -- 5 times; 3 + 2 + 5 = 10, group contains 3 words, so for this group result is:

    10 3 emit,item,time

The result of the task is the output line that contains the word ‘english’.

The result on the sample dataset:

    
    7823    eghilns 5   english,helsing,hesling,shengli,shingle
    
NB: Do not forget about the lexicographical order of words in the group: 'emit,item,time' is OK, 'emit,time,item' is not.
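
The grouping rule above can be tried locally before involving Hadoop. The following is only a minimal sketch in plain Python, not the assignment's reference solution: the function name `group_anagrams` and the sample counts are made up for illustration, and the output follows the four-field layout of the sample result line (sum, sorted letters, group size, words), which is an assumption about the intended format.

```python
from collections import defaultdict


def group_anagrams(word_counts):
    """Group words by their sorted letters and format one line per group.

    word_counts: dict mapping a word to its number of occurrences.
    Returns a list of tab-separated lines, one per group of two or more words.
    """
    groups = defaultdict(dict)
    for word, count in word_counts.items():
        # Words that are permutations of each other share the same sorted letters.
        key = "".join(sorted(word))
        groups[key][word] = count

    lines = []
    for key in sorted(groups):
        members = groups[key]
        if len(members) < 2:
            continue  # groups that consist of a single word are filtered out
        lines.append("%d\t%s\t%d\t%s" % (
            sum(members.values()),      # total occurrences in the group
            key,                        # the shared sorted letters
            len(members),               # number of unique words
            ",".join(sorted(members)),  # words in lexicographical order
        ))
    return lines


# Hypothetical counts taken from the example in the task description.
sample_counts = {"emit": 3, "item": 2, "time": 5, "hadoop": 7}
result_lines = group_anagrams(sample_counts)
for line in result_lines:
    print(line)
```

Here 'hadoop' forms a group of its own and is dropped, while the three anagrams are merged into a single line.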

In [2]:
%config IPCompleter.greedy=True

## Step 1. Create the 1st mapper and reducer

In [13]:
%%writefile mapper1.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

path = 'stop_words_en.txt'

# Read the stop words into a set so that membership tests are O(1)
with open(path, "r") as f:
    stop_words = set(f.read().decode('utf-8').split('\n'))

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue

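    # Split on whitespace, dropping punctuation adjacent to the word boundaries
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    # Lowercase before filtering so that capitalised stop words ("The") are removed too
    words = [word.lower() for word in words]
    words = [word for word in words if word.isalpha() and (word not in stop_words)]
    # Your code for mapper here.
    for word in words:
        print >> sys.stderr, "reporter:counter:Wiki stats,Total words,%d" % 1
        print "%s\t%d" % (word, 1)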


Overwriting mapper1.py


In [11]:
%%writefile reducer1.py

# Your code for reducer here.
import sys

current_key = None
total_words = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if key != current_key:
        if current_key:
            print("{0}\t{1}".format(current_key, total_words))
        total_words = 0
        current_key = key
    total_words += count
if current_key:
    print("{0}\t{1}".format(current_key, total_words))

Overwriting reducer1.py
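
The reducer above only works because Hadoop Streaming sorts the mapper output by key before it reaches the reducer, so all lines for one word arrive consecutively. The sketch below illustrates that dependency with `itertools.groupby`. It is written for Python 3, unlike the Python 2 cells in this notebook, and the names `reduce_counts` and `mapper_output` are hypothetical.

```python
from itertools import groupby


def reduce_counts(lines):
    """Sum the counts of consecutive lines that share the same key.

    Like the streaming reducer, this assumes the lines arrive sorted by key;
    equal keys that are not adjacent are reported separately.
    """
    parsed = (line.rstrip("\n").split("\t", 1) for line in lines)
    for key, group in groupby(parsed, key=lambda pair: pair[0]):
        yield key, sum(int(count) for _, count in group)


# Hypothetical mapper output in arrival order (not sorted).
mapper_output = ["time\t1", "emit\t1", "time\t1", "item\t1", "time\t1"]

# Sorting plays the role of the shuffle phase between mapper and reducer.
reduced = list(reduce_counts(sorted(mapper_output)))

# Without sorting, 'time' is split into three separate partial counts.
unsorted_reduced = list(reduce_counts(mapper_output))

print(reduced)
print(unsorted_reduced)
```

When testing the scripts locally, a sort step between the mapper and the reducer stands in for the shuffle phase.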


## Step 2. Create the 2nd mapper and reducer

In [7]:
%%writefile mapper2.py

import sys

current_key = None
sorted_key = None
total_words = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    
    if key != current_key:
        if current_key:
            print("{0}\t{1}\t{2}".format(sorted_key, current_key, total_words))
        total_words = 0
        current_key = key
        # Anagrams share the same sorted letters, e.g. "apple" -> "aelpp"
        sorted_key = "".join(sorted(current_key))
    total_words += count
    
if current_key:
    print("{0}\t{1}\t{2}".format(sorted_key, current_key, total_words))        

Writing mapper2.py


In [8]:
%%writefile reducer2.py

import sys

word_set = set()  # unique words of the current anagram group
current_key = None
total_words = 0

for line in sys.stdin:
    try:
        sorted_key, key, count = line.strip().split('\t', 2)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != sorted_key:
        # Emit the finished group, skipping groups that consist of a single word
        if current_key and len(word_set) > 1:
            # e.g. 7823    eghilns 5   english,helsing,hesling,shengli,shingle
            print("{0}\t{1}\t{2}\t{3}".format(total_words, current_key, len(word_set), ",".join(sorted(word_set))))

        total_words = 0
        current_key = sorted_key
        word_set = set()

    word_set.add(key)
    total_words += count

# Emit the last group, applying the same single-word filter
if current_key and len(word_set) > 1:
    print("{0}\t{1}\t{2}\t{3}".format(total_words, current_key, len(word_set), ",".join(sorted(word_set))))

Writing reducer2.py


## Step 3. Bash command

Hint: For printing the exact row you may use basic UNIX commands. For instance, sed/head/tail/... (if you know other commands, you can use them).

To run both jobs, you must use two consecutive yarn commands. Remember that the input for the second job is the output of the first job.
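
Before submitting to the cluster, the chaining of the two jobs can be checked with a small in-memory simulation. This is a rough sketch under stated assumptions rather than a replacement for the yarn commands: the functions `job1` and `job2`, the tiny `STOP_WORDS` set, and the sample articles are all hypothetical, and sorting in memory stands in for the shuffle phase.

```python
import re
from itertools import groupby

STOP_WORDS = {"the", "a", "of"}  # hypothetical tiny stop list for the demo


def job1(articles):
    """First job: word count over the article texts."""
    pairs = []
    for text in articles:
        for word in re.split(r"\W*\s+\W*", text):
            word = word.lower()
            if word.isalpha() and word not in STOP_WORDS:
                pairs.append((word, 1))  # mapper output
    pairs.sort()  # stands in for the shuffle phase
    return [(word, sum(count for _, count in group))
            for word, group in groupby(pairs, key=lambda pair: pair[0])]


def job2(word_counts):
    """Second job: group the (word, count) pairs of job 1 by sorted letters."""
    keyed = sorted(("".join(sorted(word)), word, count)
                   for word, count in word_counts)
    result = []
    for key, group in groupby(keyed, key=lambda item: item[0]):
        members = list(group)
        if len(members) > 1:  # drop groups that consist of a single word
            result.append((sum(count for _, _, count in members),
                           key,
                           len(members),
                           ",".join(word for _, word, _ in members)))
    return result


# Hypothetical input: two short "articles".
articles = ["Time to emit the item", "time, time and item"]

# The output of the first job is the input of the second job.
final = job2(job1(articles))
for row in final:
    print("\t".join(str(field) for field in row))
```

The words inside a group come out in lexicographical order because the tuples are sorted by key first and by word second.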

In [9]:
%%bash

OUT_DIR_1="assignment4_1_"$(date +"%s%6N")
OUT_DIR_2="assignment4_2_"$(date +"%s%6N")
NUM_REDUCERS=4

# Code for your first job
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper1.py,reducer1.py,/datasets/stop_words_en.txt \
    -mapper 'python mapper1.py' \
    -combiner 'python reducer1.py' \
    -reducer 'python reducer1.py' \
    -numReduceTasks ${NUM_REDUCERS} \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR_1} > /dev/null


# Code for your second job
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper2.py,reducer2.py \
    -mapper 'python mapper2.py' \
    -reducer 'python reducer2.py' \
    -numReduceTasks 1 \
    -input ${OUT_DIR_1} \
    -output ${OUT_DIR_2} > /dev/null

# Code for obtaining the results
hdfs dfs -cat ${OUT_DIR_2}/part-00000 | grep "english,"

hdfs dfs -rm -r -skipTrash ${OUT_DIR_1}* > /dev/null
hdfs dfs -rm -r -skipTrash ${OUT_DIR_2}* > /dev/null

19/05/21 04:57:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/05/21 04:57:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/05/21 04:57:21 INFO mapred.FileInputFormat: Total input files to process : 1
19/05/21 04:57:21 INFO mapreduce.JobSubmitter: number of splits:2
19/05/21 04:57:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1558306547568_0001
19/05/21 04:57:22 INFO impl.YarnClientImpl: Submitted application application_1558306547568_0001
19/05/21 04:57:22 INFO mapreduce.Job: The url to track the job: http://994ded922800:8088/proxy/application_1558306547568_0001/
19/05/21 04:57:22 INFO mapreduce.Job: Running job: job_1558306547568_0001
19/05/21 04:57:29 INFO mapreduce.Job: Job job_1558306547568_0001 running in uber mode : false
19/05/21 04:57:29 INFO mapreduce.Job:  map 0% reduce 0%
19/05/21 04:57:45 INFO mapreduce.Job:  map 3% reduce 0%
19/05/21 04:57:51 INFO mapreduce.Job:  map 4% reduce 0%
19/05/21 04:57:57 INFO ma