## Hadoop Streaming assignment 3: Name Count

Write a WordCount program that counts all the names in the dataset. A name is a word with the following properties:

1. The first character is not a digit (other characters can be digits).
2. The first character is uppercase, and all the other characters that are letters are lowercase.
3. Fewer than 0.5% of all occurrences of this word in the dataset, compared case-insensitively, fail condition (2).

Sort by count, most frequent first. Output format:

<pre><code>
name < tab > count
</code></pre>
The answer is the 5th line of the output.

The result on the sample dataset:
<pre><code>
french 5742
</code></pre>
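The definition can be checked on a small in-memory list of tokens before touching Hadoop. The sketch below is a single-process reference for conditions (1) to (3) and the ordering, not the streaming solution; `is_capitalized`, `count_names` and the toy token list are hypothetical names introduced for illustration, and it assumes the text has already been split into tokens. It follows the wording of the definition literally, so it differs from `mapper1.py` in two small ways: it only rejects a leading digit, and it accepts one-letter capitalized words such as "A".

```python
from collections import Counter


def is_capitalized(word):
    """Condition (2): uppercase first character, no uppercase letters after it."""
    rest = word[1:]
    # rest == rest.lower() holds when no remaining letter is uppercase (also for "")
    return word[:1].isupper() and rest == rest.lower()


def count_names(tokens, max_bad_ratio=0.005):
    """Return (name, count) pairs, most frequent first (hypothetical helper)."""
    total = Counter()        # all occurrences, case-insensitive
    capitalized = Counter()  # occurrences that satisfy condition (2)
    for token in tokens:
        if not token or token[0].isdigit():  # condition (1)
            continue
        key = token.lower()
        total[key] += 1
        if is_capitalized(token):
            capitalized[key] += 1
    names = []
    for key, n in total.items():
        bad = n - capitalized[key]  # occurrences that fail condition (2)
        if bad < max_bad_ratio * n:  # condition (3): strictly fewer than 0.5%
            names.append((key, capitalized[key]))
    # Most popular first; ties broken alphabetically for a deterministic result
    names.sort(key=lambda pair: (-pair[1], pair[0]))
    return names


tokens = ["Paris"] * 300 + ["paris"] + ["The"] * 50 + ["the"] * 200 + ["London"] * 120
print(count_names(tokens))  # [('paris', 300), ('london', 120)]
```

Here "paris" has 1 lowercase occurrence out of 301 (about 0.33%), so it qualifies, while "the" is lowercase in 200 of 250 occurrences and is rejected.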

In [1]:
%%writefile mapper1.py

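import sys
import re

# Split on whitespace, swallowing any non-word characters (punctuation) next to it
SPLIT_RE = re.compile(r'\W*\s+\W*')

for line in sys.stdin:
    try:
        article_id, text = line.strip().split('\t', 1)
    except ValueError:
        # Malformed line without a tab-separated text field: skip it
        continue
    for word in SPLIT_RE.split(text.strip()):
        if not word:
            # The split yields empty strings when the text starts or ends with punctuation
            continue
        # Condition (1): the first character is not a digit.
        # isalpha() is stricter and also rejects leading punctuation left by the split.
        cond1 = word[0].isalpha()
        # Condition (2): uppercase first letter, remaining cased letters lowercase.
        # Note: str.islower() is False for one-letter words such as "A".
        cond2 = word[0].isupper() and word[1:].islower()
        if cond1:
            # Emit: lowercased word, one occurrence, 1 if this occurrence looks like a name
            print("{}\t{:d}\t{:d}".format(word.lower(), 1, int(cond2)))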

Writing mapper1.py
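The regular expression in `mapper1.py` splits on whitespace and swallows punctuation adjacent to it. A quick local check of the tokens it produces is useful: it returns empty strings when the text starts or ends with punctuation followed or preceded by whitespace, and an empty token makes `word[0]` raise `IndexError`, so empty tokens need to be skipped before indexing. Punctuation that is not next to whitespace stays attached, so "world." and "world" become different keys. The check below runs under Python 3 (the job itself runs the mapper with `python2`); `tokenize` is a hypothetical helper.

```python
import re

# Same pattern as mapper1.py: a run of whitespace plus any adjacent non-word characters
SPLIT_RE = re.compile(r'\W*\s+\W*')


def tokenize(text):
    """Split text the way mapper1.py does."""
    return SPLIT_RE.split(text.strip())


print(tokenize("- Paris is the capital of France ."))
# ['', 'Paris', 'is', 'the', 'capital', 'of', 'France', '']
print(tokenize("Hello, world."))
# ['Hello', 'world.']
```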


In [2]:
%%writefile reducer1.py

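import sys

current_key = None
word_total = 0
name_total = 0

for line in sys.stdin:
    try:
        key, word_count, name_count = line.strip().split('\t', 2)
        word_count = int(word_count)
        name_count = int(name_count)
    except ValueError as e:
        # Report malformed lines on stderr so they do not end up in the job output
        sys.stderr.write("reducer1: skipping line {!r}: {}\n".format(line, e))
        continue

    # Input is sorted by key, so a new key means the previous one is complete
    if key != current_key:
        if current_key is not None:
            # Output order is key, name_total, word_total (mapper2.py expects this)
            print("{}\t{:d}\t{:d}".format(current_key, name_total, word_total))
        current_key = key
        word_total = 0
        name_total = 0

    word_total += word_count
    name_total += name_count

# Flush the last key
if current_key is not None:
    print("{}\t{:d}\t{:d}".format(current_key, name_total, word_total))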

Writing reducer1.py
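`reducer1.py` relies on Hadoop delivering all lines for a key contiguously after the shuffle. The same grouping can be written with `itertools.groupby`, which makes that assumption explicit. This is an alternative formulation for illustration, not the code the job runs; `parse` and `reduce_counts` are hypothetical names. It takes mapper lines `word\tword_count\tname_count` and yields `word\tname_total\tword_total`, matching the field order `reducer1.py` writes.

```python
from itertools import groupby


def parse(line):
    key, word_count, name_count = line.rstrip('\n').split('\t', 2)
    return key, int(word_count), int(name_count)


def reduce_counts(lines):
    """Sum counts per key; input must already be sorted by key, as after the shuffle."""
    records = (parse(line) for line in lines)
    for key, group in groupby(records, key=lambda rec: rec[0]):
        word_total = 0
        name_total = 0
        for _, word_count, name_count in group:
            word_total += word_count
            name_total += name_count
        # Same output field order as reducer1.py: key, name_total, word_total
        yield "{}\t{:d}\t{:d}".format(key, name_total, word_total)


sample = ["paris\t1\t1", "paris\t1\t0", "the\t1\t0"]
print(list(reduce_counts(sample)))  # ['paris\t1\t2', 'the\t0\t1']
```

In a streaming script the generator would be driven by `for out in reduce_counts(sys.stdin): print(out)`.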


In [3]:
%%writefile mapper2.py

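import sys

# Input: word \t name_total \t word_total (output of reducer1.py).
# Output: name_total \t word_total \t word, so the name count becomes
# the streaming key that the second job sorts on.
current_key = None
word_total = 0
name_total = 0

for line in sys.stdin:
    try:
        key, name_count, word_count = line.strip().split('\t', 2)
        word_count = int(word_count)
        name_count = int(name_count)
    except ValueError as e:
        # Report malformed lines on stderr so they do not end up in the job output
        sys.stderr.write("mapper2: skipping line {!r}: {}\n".format(line, e))
        continue

    if key != current_key:
        if current_key is not None:
            print("{}\t{}\t{}".format(name_total, word_total, current_key))
        current_key = key
        word_total = 0
        name_total = 0

    word_total += word_count
    name_total += name_count

if current_key is not None:
    print("{}\t{}\t{}".format(name_total, word_total, current_key))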

Writing mapper2.py
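Because the first job partitions by word, each word should already appear on exactly one line of its output, so the re-aggregation in `mapper2.py` is not expected to merge anything; its real job is to move the name count to the front so that it becomes the streaming key for sorting. A minimal sketch of that re-keying step on its own, assuming the `word\tname_total\tword_total` lines written by `reducer1.py` (`swap_fields` and `rekey` are hypothetical names):

```python
import sys


def swap_fields(line):
    """Turn 'word\tname_total\tword_total' into 'name_total\tword_total\tword'."""
    word, name_total, word_total = line.rstrip('\n').split('\t', 2)
    return "\t".join([name_total, word_total, word])


def rekey(lines):
    for line in lines:
        try:
            yield swap_fields(line)
        except ValueError:
            # Malformed line: report on stderr so it does not end up in the job output
            sys.stderr.write("skipping malformed line: {!r}\n".format(line))


print(list(rekey(["paris\t300\t301\n", "broken line\n", "the\t50\t250\n"])))
# ['300\t301\tparis', '50\t250\tthe']
```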


In [4]:
%%writefile reducer2.py

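import sys

# Input lines arrive sorted by the first field (name count), descending.
# Each line: caps_count \t count \t word
total_count = 0
total_caps = 0
current_word = None


def is_name(caps, count):
    # Condition (3): strictly fewer than 0.5% of all occurrences fail condition (2)
    return float(count - caps) / count < 0.005


for line in sys.stdin:
    try:
        caps_count, count, key = line.strip().split('\t', 2)
        count = int(count)
        caps_count = int(caps_count)
    except ValueError as e:
        # Report malformed lines on stderr so they do not end up in the job output
        sys.stderr.write("reducer2: skipping line {!r}: {}\n".format(line, e))
        continue

    if key != current_word:
        if current_word is not None and is_name(total_caps, total_count):
            print("{}\t{:d}".format(current_word, total_caps))
        total_count = 0
        total_caps = 0
        current_word = key

    total_caps += caps_count
    total_count += count

# Flush the last word
if current_word is not None and is_name(total_caps, total_count):
    print("{}\t{:d}".format(current_word, total_caps))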

Writing reducer2.py
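Condition (3) allows strictly fewer than 0.5% of the case-insensitive occurrences to fail condition (2), i.e. `(total - capitalized) / total < 0.005`, which is the same as `capitalized / total > 0.995`. Comparing the ratio with `>= 0.995` instead would also accept words that sit exactly on the boundary. The boundary can be checked with integer arithmetic only; `passes_threshold` is a hypothetical helper, not part of the scripts above.

```python
def passes_threshold(capitalized, total):
    """Condition (3): strictly fewer than 0.5% of occurrences are not capitalized."""
    not_capitalized = total - capitalized
    # (total - capitalized) / total < 0.005  <=>  200 * (total - capitalized) < total
    return total > 0 and 200 * not_capitalized < total


print(passes_threshold(996, 1000))  # True: 4 / 1000 = 0.4% < 0.5%
print(passes_threshold(995, 1000))  # False: 5 / 1000 = 0.5% is not below 0.5%
print(passes_threshold(0, 10))      # False
```

Multiplying through by `200 * total` avoids any floating-point rounding at the boundary.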

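The second job below sorts its map output with `KeyFieldBasedComparator` in reverse numeric order on the first field (the name count) and uses a single reducer, so the final file is globally ordered. The sketch below emulates only that sort step in Python, as a way to sanity-check the reducers locally before submitting to YARN; `sort_like_job2` is a hypothetical name, and the relative order of equal counts in the real job is not something to rely on. The numeric flag matters: compared as strings, "50" would sort above "300".

```python
def sort_like_job2(lines):
    """Order 'name_total\tword_total\tword' lines by the first field, numerically, descending."""
    return sorted(lines, key=lambda line: int(line.split('\t', 1)[0]), reverse=True)


lines = ["120\t120\tlondon", "300\t301\tparis", "50\t250\tthe"]
for line in sort_like_job2(lines):
    print(line)
```

When chaining the scripts in a local shell pipeline, `sort -t $'\t' -k1,1nr` gives the same ordering.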

In [5]:
%%bash

OUT_DIR_1="assignment3_1_"$(date +"%s%6N")
OUT_DIR_2="assignment3_2_"$(date +"%s%6N")
NUM_REDUCERS=4

hdfs dfs -rm -r -skipTrash ${OUT_DIR_1}* > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper1.py,reducer1.py \
    -mapper 'python2 mapper1.py' \
    -reducer 'python2 reducer1.py' \
    -numReduceTasks ${NUM_REDUCERS} \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR_1} > /dev/null

hdfs dfs -rm -r -skipTrash ${OUT_DIR_2}* > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-k1,1nr" \
    -files mapper2.py,reducer2.py \
    -mapper 'python2 mapper2.py' \
    -reducer 'python2 reducer2.py' \
    -numReduceTasks 1 \
    -input ${OUT_DIR_1} \
    -output ${OUT_DIR_2} > /dev/null

hdfs dfs -cat ${OUT_DIR_2}/part-00000 | sed -n "5{p;q}"

french	5740


rm: `assignment3_1_1615982252928508*': No such file or directory
21/03/17 11:57:35 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/03/17 11:57:35 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/03/17 11:57:35 INFO mapred.FileInputFormat: Total input files to process : 1
21/03/17 11:57:35 INFO mapreduce.JobSubmitter: number of splits:2
21/03/17 11:57:35 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
21/03/17 11:57:36 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1615980409049_0007
21/03/17 11:57:36 INFO conf.Configuration: resource-types.xml not found
21/03/17 11:57:36 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/03/17 11:57:36 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/03/17 11:57:36 INFO resource.ResourceUtils: Adding resource type - name = 