## Assignment 2: TF-IDF
In this task, Hadoop Streaming is used to process a dump of Wikipedia articles.

Dataset location: `/data/wiki/en_articles_part`

The stop words list is in the `/datasets/stop_words_en.txt` file.

Format: `article_id <tab> article_text`

To parse the articles, don't forget about Unicode (even though this is an English Wikipedia dump, it contains many characters from other languages). Also remove punctuation marks and convert words to lowercase to get correct counts. To cope with Unicode, we recommend using the following tokenizer:

```python
#!/usr/bin/env python

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    ...
```
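The tokenizer above is Python 2 (`unicode`, `reload(sys)`). As a rough illustration of what it does, here is a minimal Python 3 sketch of the same steps plus lowercasing and stop word filtering, assuming the stop words are already loaded into a set. The helper name `tokenize` is hypothetical. In Python 3, `str` patterns are Unicode-aware by default, so `re.UNICODE` is not needed.

```python
import re

def tokenize(text, stop_words):
    """Split text into lowercase words, dropping punctuation and stop words."""
    # Strip punctuation at the very start and end of the text
    text = re.sub(r"^\W+|\W+$", "", text)
    # Split on whitespace together with any punctuation touching it
    words = re.split(r"\W*\s+\W*", text)
    # Lowercase before filtering so "The" and "the" are treated alike;
    # skip the empty token produced by an empty text
    return [w.lower() for w in words if w and w.lower() not in stop_words]

print(tokenize("The labor union, founded in 1842, grew.", {"the", "in"}))
# ['labor', 'union', 'founded', '1842', 'grew']
```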
Calculate tf*idf for each (word, article) pair in the Wikipedia dump. Apply the stop words filter to speed up the calculations. Term frequency (tf) is a function of a term (word) and a document (article):

$$\mathrm{tf}(\mathrm{term}, \mathrm{doc\_id}) = \frac{N_t}{N},$$

where $N_t$ is the number of occurrences of the term in the document and $N$ is the total number of terms in the document (excluding stop words).
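For example, a short Python 3 sketch computing tf for every term of one already tokenized article (stop words removed), using `collections.Counter`; the function name `term_frequencies` is illustrative only:

```python
from collections import Counter

def term_frequencies(words):
    """Return {term: Nt / N} for one document's list of non-stop-word terms."""
    n = len(words)  # N: total number of terms in the document
    if n == 0:
        return {}   # empty article: no terms, avoid division by zero
    return {term: count / n for term, count in Counter(words).items()}

tf = term_frequencies(["labor", "union", "labor", "strike"])
print(tf["labor"])  # 0.5  (Nt = 2, N = 4)
```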

Inverse document frequency (idf) is a function of the term only:

$$\mathrm{idf}(\mathrm{term}) = \frac{1}{\log(1 + D_t)},$$

where $D_t$ is the number of documents in the dataset that contain the term.
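A minimal Python 3 sketch of this idf and of the final tf*idf product. The formula does not fix the logarithm base; the natural logarithm (`math.log` with one argument) is assumed here, as in the reducer below. Since $D_t \ge 1$ for any term that occurs, the denominator is never zero, and the rarest terms ($D_t = 1$) get the largest idf, $1/\ln 2$.

```python
from math import log

def idf(num_docs_with_term):
    """idf(term) = 1 / log(1 + Dt), natural log; Dt >= 1 for any occurring term."""
    return 1 / log(1 + num_docs_with_term)

def tf_idf(tf, num_docs_with_term):
    """Combine a term frequency with the idf of the same term."""
    return tf * idf(num_docs_with_term)

print(round(idf(1), 4))          # 1.4427  (1 / ln 2)
print(round(tf_idf(0.5, 1), 4))  # 0.7213
```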

You can find more information at https://en.wikipedia.org/wiki/Tf%E2%80%93idf, but use only the formulas given above.

Output: the tf*idf value for term `labor` and `article_id=12`.

*Hint: all Wikipedia article_ids are greater than 0, so you can use a dummy article_id=0 to count the number of documents containing each term.*
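One way to read the hint, sketched in plain Python 3 without Hadoop: besides the per-article tf records, the mapper emits one extra record `(word, 0, 1)` for each distinct word of an article. If records are sorted by word and then numerically by article_id, a word's dummy records arrive before its real ones, so the reducer already knows $D_t$ when it meets the first real article. The local `sorted` call below guarantees that order; in a Streaming job it is not guaranteed by default and would need a secondary sort on the article_id field. The names `map_article` and `reduce_sorted` are hypothetical.

```python
from collections import Counter
from math import log

def map_article(article_id, words):
    """Emit a dummy (word, 0, 1) and a real (word, article_id, tf) per distinct word."""
    n = len(words)
    for word, count in Counter(words).items():
        yield (word, 0, 1)                   # dummy: word occurs in one more document
        yield (word, article_id, count / n)  # real: tf of word in this article

def reduce_sorted(records):
    """Consume records sorted by (word, article_id); yield (word, article_id, tf*idf)."""
    current_word, num_docs = None, 0
    for word, article_id, value in records:
        if word != current_word:
            current_word, num_docs = word, 0
        if article_id == 0:
            num_docs += value                # dummy records come first for each word
        else:
            yield word, article_id, value / log(1 + num_docs)  # tf * idf

articles = {12: ["labor", "union", "labor"], 25: ["labor", "strike"]}
records = sorted(r for aid, words in articles.items() for r in map_article(aid, words))
for word, article_id, score in reduce_sorted(records):
    print(word, article_id, round(score, 4))
```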

```python
%%writefile mapper.py
from __future__ import division
import sys
import re
from collections import Counter

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

# stop_words_en.txt is shipped to every task with -files
with open('stop_words_en.txt') as f:
    stop_words = set(f.read().split())

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
        article_id = int(article_id)
    except ValueError:
        continue
    # Recommended tokenizer: strip leading/trailing punctuation,
    # then split on whitespace surrounded by optional punctuation
    text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
    # Lowercase, then drop empty tokens and stop words
    words = [w.lower() for w in words]
    words = [w for w in words if w and w not in stop_words]
    if not words:
        continue  # empty article: avoid division by zero

    num_words_in_doc = len(words)  # N: terms in the document without stop words
    for word, count in Counter(words).iteritems():
        tf = count / num_words_in_doc  # Nt / N
        print "%s\t%d\t%f" % (word, article_id, tf)
```


```python
%%writefile reducer.py
from __future__ import division
import sys
from math import log

def emit(word, articles):
    """Print tf*idf for every article containing `word`.

    `articles` maps article_id -> tf; its size is Dt, the number of
    documents that contain the word.
    """
    idf = 1 / log(1 + len(articles))
    for article_id, tf in articles.iteritems():
        print "%s\t%d\t%f" % (word, article_id, tf * idf)

current_word = None
articles_dict = dict()

for line in sys.stdin:
    try:
        word, article_id, tf = line.strip().split('\t')
        article_id = int(article_id)
        tf = float(tf)
    except ValueError:
        continue

    if current_word != word:
        if current_word is not None:
            emit(current_word, articles_dict)
        articles_dict = dict()
        current_word = word
    articles_dict[article_id] = tf

# Emit all tf*idf values of the last word
if current_word is not None:
    emit(current_word, articles_dict)
```
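The reducer above tracks `current_word` by hand. The same grouping can be sketched with `itertools.groupby`, which groups consecutive lines with the same word; that relies on the input being sorted by word, as Hadoop's shuffle guarantees within each reducer (and `sort -k1,1` does in the local test). This is a Python 3 sketch, not the submitted reducer; `parse` and `reduce_lines` are hypothetical names, and it assumes the mapper emits one record per (word, article) pair so that the group size equals $D_t$.

```python
from itertools import groupby
from math import log

def parse(lines):
    """Yield (word, article_id, tf) from 'word<TAB>article_id<TAB>tf' lines, skipping bad ones."""
    for line in lines:
        try:
            word, article_id, tf = line.rstrip("\n").split("\t")
            yield word, int(article_id), float(tf)
        except ValueError:
            continue

def reduce_lines(lines):
    """Yield 'word<TAB>article_id<TAB>tfidf' lines for input sorted by word."""
    for word, group in groupby(parse(lines), key=lambda rec: rec[0]):
        postings = [(article_id, tf) for _, article_id, tf in group]
        idf = 1 / log(1 + len(postings))  # Dt = number of articles with this word
        for article_id, tf in postings:
            yield "%s\t%d\t%f" % (word, article_id, tf * idf)

sample = ["labor\t12\t0.5\n", "labor\t25\t0.25\n", "strike\t25\t0.25\n"]
for out in reduce_lines(sample):
    print(out)
```

Inside a Streaming reducer, the driver loop would read `for out in reduce_lines(sys.stdin): print(out)`.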


```
!hdfs dfs -cat /data/wiki/en_articles_part/articles-part | head -2 > test_file.txt
!cp /datasets/stop_words_en.txt stop_words_en.txt
!cat test_file.txt | python2 mapper.py | sort -k1,1 | python2 reducer.py
```

cat: Unable to write to output stream.
0-00-634011-3	12	0.000237
0.02	25	0.000237
0.5	25	0.000237
0.6	25	0.000237
0.7	25	0.000237
0-7546-6196-2	12	0.000237
0-8490-1397-6	12	0.000237
0-85345-175-3	12	0.000237
0.9	25	0.000237
1	12	0.000237
10	25	0.000661
10	12	0.000149
1000	25	0.000237
1,000	25	0.000237
100,000	12	0.000237
10–15	25	0.000237
11	25	0.000237
111-113	12	0.000237
119	12	0.000237
12	25	0.000237
1–2	25	0.000237
126	12	0.000237
12-year-old	25	0.000237
14	25	0.000441
14	12	0.000149
15	25	0.000661
15	12	0.000149
1-55164-250-6	12	0.000237
1-55164-251-4	12	0.000237
1.58	12	0.000237
15q13.3	25	0.000237
16	25	0.000237
1-60486-064-2	12	0.000237
1642	12	0.000237
16p13.1	25	0.000237
17	25	0.000237
1747	25	0.000237
179	12	0.000237
1793	12	0.000237
1794	12	0.000237
1796	12	0.000237
1798	25	0.000237
17p12	25	0.000237
17th	12	0.000237
18	25	0.000441
18	12	0.000149
1817–1862	12	0.000237
183	12	0.000237
18–30	25	0.000237
1833	12	0.000237
1842	12	0.000237
1844	12	0.000237
1848	12	0.000237
1849–

```bash
%%bash

OUT_DIR="tf_idf"
NUM_REDUCERS=8

# Remove the output of a previous run
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="Streaming tf_idf" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper.py,reducer.py,/datasets/stop_words_en.txt \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null

# tf*idf for term 'labor' and article_id=12 (anchored so only the exact word matches)
hdfs dfs -cat ${OUT_DIR}/part-* | grep -P '^labor\t12\t' | cut -f3
```

0%however	6970	0.003838
0&\mathrm{if	7583	0.003838
0(8)320-1234	2061	0.003838
0)).(1	7143	0.003838
0,03	5188	0.003838
0,1,...,n	1635	0.003838
0,1,0	6026	0.003838
0,1,\dots,n	4594	0.003838
0,5	3107	0.003838
0,50	4845	0.003838


18/04/16 09:19:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/16 09:19:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/16 09:19:24 INFO mapred.FileInputFormat: Total input files to process : 1
18/04/16 09:19:25 INFO mapreduce.JobSubmitter: number of splits:2
18/04/16 09:19:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1523870263342_0002
18/04/16 09:19:25 INFO impl.YarnClientImpl: Submitted application application_1523870263342_0002
18/04/16 09:19:25 INFO mapreduce.Job: The url to track the job: http://9c5ea2b8efd5:8088/proxy/application_1523870263342_0002/
18/04/16 09:19:25 INFO mapreduce.Job: Running job: job_1523870263342_0002
18/04/16 09:19:32 INFO mapreduce.Job: Job job_1523870263342_0002 running in uber mode : false
18/04/16 09:19:32 INFO mapreduce.Job:  map 0% reduce 0%
18/04/16 09:19:50 INFO mapreduce.Job:  map 62% reduce 0%
18/04/16 09:19:55 INFO mapreduce.Job:  map 80% reduce 0%
18/04/16 09:19:56 INFO 