# Word Count Using MapReduce

We have a large collection of news articles and want to find the 10 most frequently used words, excluding stop words. We will use the 20 Newsgroups dataset, available through sklearn's fetch_20newsgroups.

In [18]:
from sklearn.datasets import fetch_20newsgroups
# Load the training split, stripped of headers, footers and quoted replies.
newsgroups_train = fetch_20newsgroups(subset='train',
                                      remove=('headers', 'footers', 'quotes'))
data = newsgroups_train.data

For each text in the dataset, we want to tokenize it, clean it, remove stop words and finally count the words:

In [19]:
from collections import Counter
import re
from sklearn.feature_extraction import text

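def clean_word(word):
    # Strip punctuation and lowercase the token.
    return re.sub(r'[^\w\s]', '', word).lower()

def word_not_in_stopwords(word):
    # Keep only alphabetic tokens that are not stop words
    # (isalpha() is False for the empty string, so blanks are dropped too).
    return word.isalpha() and word not in text.ENGLISH_STOP_WORDS


def find_top_words(data):
    cnt = Counter()
    # Loop variable is `doc`, not `text`, so the sklearn `text` module is not shadowed.
    for doc in data:
        tokens_in_text = doc.split()
        tokens_in_text = map(clean_word, tokens_in_text)
        tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
        cnt.update(tokens_in_text)

    return cnt.most_common(10)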
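
To make the clean-and-filter steps concrete, here is a minimal sketch on a single sentence. The tiny `STOP_WORDS` set and the `keep_word` name are assumptions made for illustration; the notebook itself uses sklearn's `text.ENGLISH_STOP_WORDS`.

```python
import re
from collections import Counter

# Hypothetical mini stop-word list, standing in for text.ENGLISH_STOP_WORDS.
STOP_WORDS = {"the", "a", "is", "and", "it"}

def clean_word(word):
    # Drop punctuation and lowercase the token.
    return re.sub(r"[^\w\s]", "", word).lower()

def keep_word(word):
    # Keep alphabetic tokens that are not stop words.
    return word.isalpha() and word not in STOP_WORDS

sentence = "The cat's toy is RED, and it squeaks... 42 times!"
tokens = [clean_word(w) for w in sentence.split()]
kept = [w for w in tokens if keep_word(w)]
counts = Counter(kept)

print(tokens)
print(kept)
```

Because punctuation is removed inside each token, "cat's" becomes "cats". The same effect turns "don't" into "dont", which shows up in the results further down.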


## Without MapReduce
Let’s see how long it takes without MapReduce:

In [20]:
%time find_top_words(data)

CPU times: user 2.79 s, sys: 181 µs, total: 2.79 s
Wall time: 2.79 s


[('x', 5086),
 ('people', 3996),
 ('like', 3882),
 ('dont', 3861),
 ('just', 3743),
 ('know', 3477),
 ('maxaxaxaxaxaxaxaxaxaxaxaxaxaxax', 3307),
 ('use', 3158),
 ('think', 2995),
 ('time', 2799)]


## With MapReduce

Now let’s write our mapper, reducer and chunk_mapper, along with a chunkIt helper that splits the data into chunks:

In [26]:
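def chunkIt(seq, num):
    # Split seq into num contiguous chunks of near-equal size.
    # Integer bounds avoid the rounding drift of accumulating a float step,
    # which could otherwise produce an extra chunk.
    size, extra = divmod(len(seq), num)
    out = []
    start = 0

    for i in range(num):
        # The first `extra` chunks take one additional item each.
        end = start + size + (1 if i < extra else 0)
        out.append(seq[start:end])
        start = end

    return out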

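def mapper(doc):
    # Count the cleaned, non-stop-word tokens of a single document.
    # The parameter is `doc`, not `text`, so the sklearn `text` module is not shadowed.
    tokens_in_text = doc.split()
    tokens_in_text = map(clean_word, tokens_in_text)
    tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
    return Counter(tokens_in_text)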

def reducer(cnt1, cnt2):
    cnt1.update(cnt2)
    return cnt1

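from functools import reduce

def chunk_mapper(chunk):
    # Start from an empty Counter so reduce() does not fail on an empty chunk.
    mapped = map(mapper, chunk)
    reduced = reduce(reducer, mapped, Counter())
    return reduced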
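
The merge step can be tried in isolation on toy data. The sketch below is illustrative only: `merge` is a hypothetical stand-in for the reducer, and the two short word lists are made up. Passing an empty Counter as the starting value means the fold also works on an empty list and leaves the input counters unchanged.

```python
from collections import Counter
from functools import reduce

def merge(total, part):
    # Add the counts from `part` into `total` and return the running total.
    total.update(part)
    return total

per_text = [Counter("spam eggs spam".split()), Counter("eggs ham".split())]

# Fold the per-text counters into one, starting from an empty Counter.
merged = reduce(merge, per_text, Counter())

# With a starting value, reducing an empty sequence returns that value.
empty = reduce(merge, [], Counter())

print(merged)
print(empty)
```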

#### What do the functions do?

The mapper takes a text, splits it into tokens, cleans them, filters out stop words and non-words, and finally counts the words within that single document. The reducer takes two counters and merges them. The chunk_mapper takes a chunk of texts and runs a MapReduce over it. Now let’s run the framework we built and see:

In [29]:
%%time
from multiprocessing import Pool
from functools import reduce

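# Step 1: split the data into 6 chunks, one per worker process.
data_chunks = chunkIt(data, num=6)

# Step 2: run chunk_mapper on each chunk in parallel.
# The with-block shuts the worker processes down once the map has finished.
with Pool(6) as pool:
    mapped = pool.map(chunk_mapper, data_chunks)

# Step 3: merge the per-chunk counters into the final result.
reduced = reduce(reducer, mapped)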

print(reduced.most_common(10))

[('x', 5086), ('people', 3996), ('like', 3882), ('dont', 3861), ('just', 3743), ('know', 3477), ('maxaxaxaxaxaxaxaxaxaxaxaxaxaxax', 3307), ('use', 3158), ('think', 2995), ('time', 2799)]
CPU times: user 147 ms, sys: 36.8 ms, total: 184 ms
Wall time: 1.77 s
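
As a sanity check on the approach, the chunked map-then-reduce should give exactly the same counts as a single sequential pass. The following is a small sketch under stated assumptions: the documents are made up, `count_words`, `merge` and `count_chunk` are hypothetical simplified stand-ins for mapper, reducer and chunk_mapper, and the built-in `map` takes the place of `pool.map` so the example runs in a single process.

```python
from collections import Counter
from functools import reduce

docs = ["red fish blue fish", "one fish", "two fish red", "blue", "red red"]

def count_words(doc):
    # Map step: count the words of one document.
    return Counter(doc.split())

def merge(total, part):
    # Reduce step: fold one counter into the running total.
    total.update(part)
    return total

def count_chunk(chunk):
    # Map-then-reduce over one chunk of documents.
    return reduce(merge, map(count_words, chunk), Counter())

# Single sequential pass over all documents.
single = Counter()
for doc in docs:
    single.update(doc.split())

# Same work split into three chunks, then merged.
chunks = [docs[0:2], docs[2:4], docs[4:5]]
chunked = reduce(merge, map(count_chunk, chunks), Counter())

print(chunked == single)
print(chunked.most_common(2))
```

The result does not depend on where the chunk boundaries fall, because adding counters is associative and commutative. This is what makes it safe to hand each chunk to a separate worker.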
