# Hadoop Streaming assignment 4: Word Groups

Calculate statistics for groups of words which are equal up to permutations of letters. For example, ‘emit’, ‘item’ and ‘time’ are the same words up to a permutation of letters. Determine such groups of words and sum all their counts. Apply stop words filter. Filter out groups that consist of only one word.

Output: count of occurrences for the group of words, number of unique words in the group, comma-separated list of the words in the group in lexicographical order:

*sum <tab> group size <tab> word1,word2,...*

Example: assume ‘emit’ occurred 3 times, 'item' -- 2 times, 'time' -- 5 times; 3 + 2 + 5 = 10, group contains 3 words, so for this group result is:

*10 3 emit,item,time*

The result of the task is the output line with word ‘english’.

In [4]:
%%writefile word_groups_mapper.py

import sys
import re
from collections import Counter

reload(sys)
sys.setdefaultencoding('utf-8')

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    counts = Counter(words)
    for word, count in counts.items():
        word = word.lower()
        print "%s\t%s\t%d" % (''.join(sorted(word)), word, count)

Overwriting word_groups_mapper.py


In [5]:
%%writefile word_groups_reducer.py

import sys

current_key = None
word_sum = 0
words_of_gruop = list()

for line in sys.stdin:
    try:
        group_key, key, count = line.strip().split('\t', 2)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != group_key:
        if current_key and len(words_of_group) > 1:
            print "%d\t%d\t%s" % (word_sum, len(words_of_group), ','.join(words_of_group))
        word_sum = 0
        current_key = group_key
        words_of_group = list()
    word_sum += count
    if key not in words_of_group:
        words_of_group.append(key)

if current_key and len(words_of_group) > 1:
    print "%d\t%d\t%s" % (word_sum, len(words_of_group), ','.join(words_of_group))

Overwriting word_groups_reducer.py


In [6]:
%%bash

OUT_DIR="wordgroups_result_"$(date +"%s%6N")
NUM_REDUCERS=8

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.jab.name="Streaming wordGroups" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files word_groups_mapper.py,word_groups_reducer.py \
    -mapper "python word_groups_mapper.py" \
    -reducer "python word_groups_reducer.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null
    
hdfs dfs -cat ${OUT_DIR}/* |  grep -P '(\t|,)english(,|$)'

7820	5	english,helsing,hesling,shingle,shengli


rm: `wordgroups_result_1533209071395753': No such file or directory
18/08/02 11:24:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/08/02 11:24:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/08/02 11:24:35 INFO mapred.FileInputFormat: Total input files to process : 1
18/08/02 11:24:35 INFO mapreduce.JobSubmitter: number of splits:2
18/08/02 11:24:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1533143790063_0010
18/08/02 11:24:35 INFO impl.YarnClientImpl: Submitted application application_1533143790063_0010
18/08/02 11:24:35 INFO mapreduce.Job: The url to track the job: http://6b428e5a5e59:8088/proxy/application_1533143790063_0010/
18/08/02 11:24:35 INFO mapreduce.Job: Running job: job_1533143790063_0010
18/08/02 11:24:40 INFO mapreduce.Job: Job job_1533143790063_0010 running in uber mode : false
18/08/02 11:24:40 INFO mapreduce.Job:  map 0% reduce 0%
18/08/02 11:24:56 INFO mapreduce.Job:  map 63% reduce 0%
18/08/02 11: