In [1]:
%%writefile mapper_wiki_parser.py


import sys
import re


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

path = 'stop_words_en.txt'

def read_stop_words(file_path):
    return set(word.strip() for word in open(file_path))

stop_words = read_stop_words(path)


for line in sys.stdin:
    try:
        article_id, text = line.strip().split('\t', 1)
    except ValueError as e:
        continue

    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)

    for word in words:
        eprint("reporter:counter:Wiki stats,Total words,%d" % 1)
        if word in stop_words:
            eprint("reporter:counter:Wiki stats,Stop words,%d" % 1)
        print("%s\t%d" % (word.lower(), 1))

Overwriting mapper_wiki_parser.py


In [2]:
%%writefile reducer_sum.py

import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != key:
        if current_key:
            print("%s\t%d" % (current_key, word_sum))
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print("%s\t%d" % (current_key, word_sum))

Overwriting reducer_sum.py


In [3]:
%%writefile counter_process.py

#! /usr/bin/env python
import sys


if __name__ == '__main__':
    stop_words_key = sys.argv[1]
    total_words_key = sys.argv[2]
    
    stop_words_count = 0
    total_words_count = 0
 
    for line in sys.stdin:
        line = line.strip()
        if line.startswith(stop_words_key):
            stop_words_count = int(line.split('=')[1].strip())
        if line.startswith(total_words_key):
            total_words_count = int(line.split('=')[1].strip())
    
    print (float(stop_words_count)/total_words_count * 100)

Overwriting counter_process.py


In [4]:
%%bash

OUT_DIR="coursera_mr_task2"
NUM_REDUCERS=4
LOGS="stderr_logs.txt"

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

# Stub code for your job

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="Streaming Word Count" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper_wiki_parser.py,reducer_sum.py,/datasets/stop_words_en.txt \
    -mapper "python3 mapper_wiki_parser.py" \
    -combiner "python3 reducer_sum.py" \
    -reducer "python3 reducer_sum.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null 2> $LOGS
    
cat $LOGS | python ./counter_process.py "Stop words" "Total words"
cat $LOGS >&2


38.44036900909957


21/10/27 07:33:25 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/10/27 07:33:26 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
21/10/27 07:33:26 INFO mapred.FileInputFormat: Total input files to process : 1
21/10/27 07:33:27 INFO mapreduce.JobSubmitter: number of splits:2
21/10/27 07:33:27 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
21/10/27 07:33:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1635318885577_0004
21/10/27 07:33:28 INFO conf.Configuration: resource-types.xml not found
21/10/27 07:33:28 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/10/27 07:33:28 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
21/10/27 07:33:28 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
21/10/27 07:33:28 INFO impl.Ya