In [1]:
%%writefile mapper_1.py
#!/usr/bin/env python3

import sys
import re

# Mapper for job 1 of the TF-IDF pipeline.
#
# Input (stdin):   <article_id>\t<article text>
# Output (stdout): one line per kept token occurrence
#     <word>_<article_id>\t<doc_len>\t1
# where <doc_len> is the number of tokens left in the article after
# stop-word removal (used as the TF denominator downstream).

STOP_WORDS_PATH = 'stop_words_en.txt'

# Raw strings: "\W" inside a plain string literal is an invalid escape
# sequence (DeprecationWarning, and a SyntaxWarning since Python 3.12).
EDGE_NON_WORD_RE = re.compile(r"^\W+|\W+$")
TOKEN_SEPARATOR_RE = re.compile(r"\W*\s+\W*")


def load_stop_words(path=STOP_WORDS_PATH):
    """Return the set of whitespace-stripped lines of the file at *path*.

    The file is closed before returning.
    """
    with open(path, 'r') as stop_word_file:
        return {stop_word.strip() for stop_word in stop_word_file}


def tokenize(text):
    """Lowercase *text* and split it into word tokens.

    Non-word characters are trimmed from both ends of the text. The text is
    then split on runs of whitespace together with any punctuation touching
    them. Empty tokens (from empty or punctuation-only text) are dropped, so
    they are never emitted as a word.
    """
    trimmed = EDGE_NON_WORD_RE.sub("", text).lower()
    return [token for token in TOKEN_SEPARATOR_RE.split(trimmed) if token]


def map_line(line, stop_words):
    """Return the output records for one "<article_id> TAB <text>" line.

    A line without a tab separator is treated as malformed and yields no
    records. Every returned record carries the article's token count after
    stop-word removal.
    """
    try:
        article_id, text = line.rstrip().split('\t', 1)
    except ValueError:
        return []

    filtered_words = [word for word in tokenize(text) if word not in stop_words]
    word_count = len(filtered_words)
    return [word + '_' + article_id + '\t' + str(word_count) + '\t1'
            for word in filtered_words]


def main():
    """Stream stdin through map_line and print every record."""
    stop_words = load_stop_words()
    for line in sys.stdin:
        for record in map_line(line, stop_words):
            print(record)


if __name__ == '__main__':
    main()


Overwriting mapper_1.py


In [2]:
%%writefile reducer_1.py
#!/usr/bin/env python3

import sys

# Reducer for job 1 of the TF-IDF pipeline.
#
# Input (stdin, sorted so equal keys are adjacent):
#     <word>_<doc_id>\t<doc_len>\t<count>
# Output (stdout): one line per distinct (word, doc_id)
#     <word>\t<doc_id>\t<doc_len>\t<total count>\t1


def format_record(word, doc_id, doc_len, word_count):
    """Render one "word TAB doc_id TAB doc_len TAB word_count TAB 1" record.

    The trailing 1 marks one (word, document) pair. reducer_2.py sums this
    field to get the number of documents that contain the word.
    """
    return '\t'.join([word, doc_id, doc_len, str(word_count), '1'])


def reduce_lines(lines):
    """Sum the counts of each run of identical <word>_<doc_id> keys.

    Malformed lines (wrong field count, non-integer count, key without '_')
    are skipped rather than crashing the reducer. Yields one formatted record
    per distinct key. doc_len is taken from the last line seen for the key.
    """
    current_key = None      # (word, doc_id) of the run being accumulated
    current_count = 0
    current_doc_len = None

    for line in lines:
        try:
            word_doc_id, doc_len, count = line.strip().split('\t', 2)
            count = int(count)
            # rsplit: the word may itself contain '_' (the mapper's \w tokens
            # include underscores). Assumes article ids contain no '_', which
            # holds for numeric ids.
            word, doc_id = word_doc_id.rsplit('_', 1)
        except ValueError:
            continue

        key = (word, doc_id)
        if key == current_key:
            current_count += count
            current_doc_len = doc_len
            continue

        if current_key is not None:
            prev_word, prev_doc_id = current_key
            yield format_record(prev_word, prev_doc_id, current_doc_len, current_count)
        current_key, current_doc_len, current_count = key, doc_len, count

    if current_key is not None:
        prev_word, prev_doc_id = current_key
        yield format_record(prev_word, prev_doc_id, current_doc_len, current_count)


def main():
    """Stream stdin through reduce_lines and print every record."""
    for record in reduce_lines(sys.stdin):
        print(record)


if __name__ == '__main__':
    main()


Overwriting reducer_1.py


In [3]:
%%writefile mapper_2.py
#!/usr/bin/env python3

import sys

# Identity mapper for job 2: re-emits each record produced by job 1
# (<word>\t<doc_id>\t<doc_len>\t<count>\t1). Hadoop Streaming keys records on
# the text before the first tab, so the shuffle groups them by word.


def identity_map(lines):
    """Yield every non-empty input line with its trailing newline removed.

    print() appends its own newline. Stripping the input's newline here keeps
    exactly one record per output line instead of interleaving blank lines.
    """
    for line in lines:
        record = line.rstrip('\n')
        if record:
            yield record


def main():
    """Copy stdin records to stdout through identity_map."""
    for record in identity_map(sys.stdin):
        print(record)


if __name__ == '__main__':
    main()


Overwriting mapper_2.py


In [4]:
%%writefile reducer_2.py
#!/usr/bin/env python3

import sys

# Reducer for job 2 of the TF-IDF pipeline.
#
# Input (stdin, sorted so equal words are adjacent):
#     <word>\t<doc_id>\t<doc_len>\t<word_count>\t<n>
# Output (stdout): one line per word
#     <word>\t<doc_ids>\t<doc_lens>\t<word_counts>\t<docs_with_word>
# where the three list fields are comma-joined and index-aligned.


def format_record(word, doc_ids, doc_lens, word_counts, docs_with_word):
    """Render one per-word record with comma-joined, index-aligned lists.

    Position i of doc_ids, doc_lens and word_counts describes the same
    document.
    """
    return '\t'.join([word, ','.join(doc_ids), ','.join(doc_lens),
                      ','.join(word_counts), str(docs_with_word)])


def reduce_lines(lines):
    """Collapse each run of records for the same word into one record.

    Per-document fields are collected into parallel lists. The integer last
    field is summed to give the number of documents that contain the word.
    Malformed lines (wrong field count, non-integer last field, blank lines)
    are skipped.
    """
    current_word = None
    doc_ids, doc_lens, word_counts = [], [], []
    docs_with_word = 0

    for line in lines:
        try:
            word, doc_id, doc_len, word_count, n_docs = line.strip().split('\t', 4)
            n_docs = int(n_docs)
        except ValueError:
            continue

        if word != current_word:
            # Compare with None, not truthiness, so a group is never dropped
            # just because its word is falsy.
            if current_word is not None:
                yield format_record(current_word, doc_ids, doc_lens,
                                    word_counts, docs_with_word)
            current_word = word
            doc_ids, doc_lens, word_counts = [], [], []
            docs_with_word = 0

        doc_ids.append(doc_id)
        doc_lens.append(doc_len)
        word_counts.append(word_count)
        docs_with_word += n_docs

    if current_word is not None:
        yield format_record(current_word, doc_ids, doc_lens,
                            word_counts, docs_with_word)


def main():
    """Stream stdin through reduce_lines and print every record."""
    for record in reduce_lines(sys.stdin):
        print(record)


if __name__ == '__main__':
    main()


Overwriting reducer_2.py


In [5]:
%%writefile find_labor_12.py
#!/usr/bin/env python3

import sys
import math

# Looks up the TF-IDF of one word in one article from job 2's output.
# Usage: python3 find_labor_12.py [word [article_id]]   (default: labor 12)


def tf_idf(word_count, doc_len, docs_with_word):
    """Return tf * idf, where tf = word_count / doc_len and idf = 1 / ln(1 + docs_with_word)."""
    tf = word_count / doc_len
    idf = 1 / math.log(1 + docs_with_word)
    return tf * idf


def find_tf_idf(lines, check_word='labor', article_id='12'):
    """Return the TF-IDF of check_word in article article_id, or None.

    lines are reducer_2.py records:
    "word TAB doc_ids TAB doc_lens TAB word_counts TAB docs_with_word", where
    the three middle fields are comma-separated and index-aligned. Returns
    None when the word is absent or does not occur in that article. Blank
    and malformed lines are ignored.
    """
    for line in lines:
        fields = line.strip().split('\t', 4)
        # Check the key first; only the matching record's lists get split.
        if len(fields) != 5 or fields[0] != check_word:
            continue
        _, doc_ids, doc_lens, word_counts, docs_with_word = fields
        try:
            doc_index = doc_ids.split(',').index(article_id)
        except ValueError:
            continue  # the word exists, but not in this article
        return tf_idf(int(word_counts.split(',')[doc_index]),
                      int(doc_lens.split(',')[doc_index]),
                      int(docs_with_word))
    return None


def main(argv=None):
    """Print the TF-IDF for the word/article given on the command line.

    Returns the process exit status: 0 on success, 1 if not found.
    """
    args = sys.argv[1:] if argv is None else argv
    check_word = args[0] if len(args) > 0 else 'labor'
    article_id = args[1] if len(args) > 1 else '12'

    result = find_tf_idf(sys.stdin, check_word, article_id)
    if result is None:
        print('{!r} not found in article {}'.format(check_word, article_id),
              file=sys.stderr)
        return 1
    print(result)
    return 0


if __name__ == '__main__':
    sys.exit(main())


Overwriting find_labor_12.py


In [6]:
%%bash

# Two-stage TF-IDF pipeline on Hadoop Streaming, then a local lookup.
# Abort on the first failing command instead of running later stages on
# missing input.
set -euo pipefail

STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
RUN_ID=$(date +"%s%6N")
OUT_DIR_1="week_6_tf_idf_a_${RUN_ID}"
OUT_DIR_2="week_6_tf_idf_b_${RUN_ID}"
NUM_REDUCERS_1=8
# Job 2 must use a single reducer: the lookup below reads only part-00000.
NUM_REDUCERS_2=1

# Remove both HDFS output directories on exit, even if a job fails.
# -f keeps -rm quiet and successful when a directory was never created.
cleanup() {
    hdfs dfs -rm -r -f -skipTrash "${OUT_DIR_1}" "${OUT_DIR_2}" > /dev/null
}
trap cleanup EXIT

# Job 1: per (word, article) occurrence counts plus article length.
yarn jar "${STREAMING_JAR}" \
    -files mapper_1.py,reducer_1.py,/datasets/stop_words_en.txt \
    -mapper 'python3 mapper_1.py' \
    -reducer 'python3 reducer_1.py' \
    -numReduceTasks "${NUM_REDUCERS_1}" \
    -input /data/wiki/en_articles_part \
    -output "${OUT_DIR_1}" > /dev/null

# Job 2: group the per-article records by word.
yarn jar "${STREAMING_JAR}" \
    -files mapper_2.py,reducer_2.py \
    -mapper 'python3 mapper_2.py' \
    -reducer 'python3 reducer_2.py' \
    -numReduceTasks "${NUM_REDUCERS_2}" \
    -input "${OUT_DIR_1}" \
    -output "${OUT_DIR_2}" > /dev/null

# Compute the TF-IDF of "labor" in article 12 locally from the single output file.
hdfs dfs -cat "${OUT_DIR_2}/part-00000" | python3 ./find_labor_12.py

0.00035046896210986204


18/11/15 03:36:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/11/15 03:36:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/11/15 03:36:29 INFO mapred.FileInputFormat: Total input files to process : 1
18/11/15 03:36:29 INFO mapreduce.JobSubmitter: number of splits:2
18/11/15 03:36:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1542235673601_0003
18/11/15 03:36:29 INFO impl.YarnClientImpl: Submitted application application_1542235673601_0003
18/11/15 03:36:29 INFO mapreduce.Job: The url to track the job: http://81fb89c3c837:8088/proxy/application_1542235673601_0003/
18/11/15 03:36:29 INFO mapreduce.Job: Running job: job_1542235673601_0003
18/11/15 03:36:37 INFO mapreduce.Job: Job job_1542235673601_0003 running in uber mode : false
18/11/15 03:36:37 INFO mapreduce.Job:  map 0% reduce 0%
18/11/15 03:36:54 INFO mapreduce.Job:  map 44% reduce 0%
18/11/15 03:37:00 INFO mapreduce.Job:  map 60% reduce 0%
18/11/15 03:37:04 INFO 