In [1]:
#!/usr/bin/env python3
#
# word_count.py
# author: Jeremy Dobbins-Bucklad <jeremy.a.db@gmail.com>
# 
# Description:
#     word_count returns the tallies the number of appearances for each word in a 
# given document. Additionally, it outputs the number of times each word appears
# per sentence. 
#     It is built on the MapReduce paradigm. A Tokenizer divides the document into 
# into distinct sentences, and pushes (sentence, sentence #) tuples onto  a thread-safe queue. The 
# Mappers pull sentences from the queue, and convert them into streams of 
# (word, sentence #) tuples. Reducers then aggregate the output from Mappers, producing
# output of `(word, total count, sentence #'s per appearance). 
#     Mapper output is collected in a process-safe hash table. Output is partitioned
# between buckets with a modulo by the number of Reducers, providing a rough distribution
# of labor. Reducer output is ultimately placed into a min-Heap, which allows
# for a simple dumping of values to produce an alphabetical list.

__Questions__
* How do I share queues between Mappers and Reducers?

In [2]:
from multiprocessing import (
    cpu_count,
    log_to_stderr,
    Manager,
    Pool, 
    Process, 
    Queue
)
from collections import defaultdict
import itertools
import logging
import re
import os
import string
import sys

import nltk.data

N_WORKERS = cpu_count() + 1
# LOGGER = log_to_stderr()
# LOGGER.setLevel(logging.INFO)

def setup():    
    nltk_data_parent_dir = os.path.expanduser('~')
    if 'nltk_data' not in os.listdir(nltk_data_parent_dir):
        import nltk
        nltk.download('punkt')

setup()

class Tokenizer:
    """Tokenizers convert file streams into streams of delimited values.
    
    :ivar str filename: Name of file to read from.
    """
    
    def __init__(self, filename, delimiter='.'):
        self.filename = filename
        try:
            english_tokens = os.path.join(nltk_data_parent_dir,
                                         'tokenizers/punkt/english.pickle')
            self.tokenizer = nltk.data.load(english_tokens)
        except:
            logger.exception('Failed to load tokenizer data from %s', 
                             english_tokens)
            raise
        
    def tokenize(self):
        # This is dangerous if the file is very large. If memory was a bottlneck,
        # we could read in bytes until we hit the delimiter, then pass that to 
        # the Mapper's input queue.
        with open(self.filename, 'r') as data:
            return self.tokenizer.tokenize(data)
        
    def tokenize_text(text):
        return self.tokenizer.tokenize(data)

Running 3.4.0 (default, Apr 11 2014, 13:05:11) 
[GCC 4.8.2]


In [4]:
test_paragraph = """As Mr. Smith walked towards the edge of the cliff, 
he recalled what his father had said to him. "Boy,", his father had started, 
"I must tell you this next thing before I go." But Mr. Smith had never 
gotten to hear what his father had to say, as at that moment his father
had fallen over the side of the cliff. The very same cliff, Mr. Smith mused,
that he himself was walking towards this very moment. How Ms. Smith would
laugh, he thought to himself, if she were reading an account of his present
actions.
"""
test_sentences = [
    'As Mr. Smith walked towards the edge of the cliff, he recalled what his father had said to him.', 
    '"Boy,", his father had started, "I must tell you this next thing before I go."',
    'But Mr. Smith had never gotten to hear what his father had to say, as at that moment his father had fallen over the side of the cliff.',
    'The very same cliff, Mr. Smith mused, that he himself was walking towards this very moment.', 
    'How Ms. Smith would laugh, he thought to himself, if she were reading an account of his present actions.'
]

In [5]:
def count_words(in_queue, out_queues):
    while not in_queue.empty():
        idx, sentence = in_queue.get()
        words = sentence.split()
        for word in words:
            word = word.strip(string.punctuation).lower()
            # Hash modulo between queues
            out_queues[hash(word) % N_WORKERS].put((word, idx))
        
            
def tally_words(in_queue, out_queue):
    word_table = defaultdict(list)
    for word, idx in iter(in_queue.get, 'STOP'):
        word_table[word].append(idx)
    # Export counts into output queue
    for word, counts in word_table.items():
        out_queue.put((word, len(counts), counts))
        
        
def heapify_words(in_queue, heap):
    for data in iter(in_queue.get, 'STOP'):
        heappush(heap, data)
        
        
def map_work(map_input, map_outputs):
    # Parallel: Map tasks
    mappers = []
    for i in range(N_WORKERS):
        p = Process(target=count_words, args=(map_input, map_outputs))
        mappers.append(p)
        p.start()
    return mappers


def reduce_work(map_outputs, reduce_output):
    # Parallel: Aggregate words with sentence numbers
    reducers = []
    for i in range(N_WORKERS):
        p = Process(target=tally_words, args=(map_outputs[i], reduce_output))
        reducers.append(p)
        p.start()
    return reducers

def sort_work(reduce_output):
    # Parallel: Sort reduce output
    with Manager() as manager:
        l = manager.list()
        sorter = Process(target=heapify_words, args=(reduce_output, l))
        sorter.start()
        return sorter

In [6]:
from pprint import pprint
def test_mapping():
    map_input, map_outputs = Queue(), [Queue() for n in range(N_WORKERS)]
    # Serial: Fill our input queue
    for i, sentence in enumerate(test_sentences):
        map_input.put((i, sentence))
        
    mappers = map_work(map_input, map_outputs)
    [m.join() for m in mappers]
    for q in map_outputs:
        assert q.qsize() > 0
    return map_outputs

In [8]:
def test_reducing():
    reduce_output = Queue()
    map_outputs = test_mapping()
    reducers = reduce_work(map_outputs, reduce_output)
    [r.join() for r in reducers]
    return reduce_output
    
q = test_reducing()
tallies = [q.get(block=False) for x in range(q.qsize())]
pprint(tallies)

KeyboardInterrupt: 

In [None]:
def main():
    """
    Tokenizers parse text into sentences.
    Mappers turn sentences into (word, sentence #) tuples
    Reducers tally the number of words by sentence
    """
    map_input, map_outputs = Queue(), [Queue() for n in range(N_WORKERS)]
    reduce_output = Queue()
    
    # Serial: Fill our input queue
    for i, sentence in enumerate(test_sentences):
        map_input.put((i, sentence))
        
    mappers = map_work(map_input, map_outputs)
    #reducers = reduce_work(map_outputs, reduce_output)
    #sorter = sort_work(reduce_output)
    
    
    # Cleanup
    #[m.join() for m in mappers]
    # Send shutdown signal to reducers
    #[q.put('STOP') for q in map_output]
    # Wait for reducers
    #[r.join() for r in reducers]
    #reduce_output.put('STOP')
    # Shutdown sorter
    #sorter.join()    
    
    # Display
    #[print(heappop(l) for l in range(len(l)))]

In [84]:
main()

[INFO/Process-39] incref failed: [Errno 2] No such file or directory
[INFO/Process-39] incref failed: [Errno 2] No such file or directory
[INFO/Process-39] incref failed: [Errno 2] No such file or directory
[INFO/Process-39] incref failed: [Errno 2] No such file or directory
[INFO/Process-39] incref failed: [Errno 2] No such file or directory
[INFO/Process-39] incref failed: [Errno 2] No such file or directory
[INFO/Process-39] child process calling self.run()
[INFO/Process-39] child process calling self.run()
[INFO/Process-39] child process calling self.run()
[INFO/Process-39] child process calling self.run()
[INFO/Process-39] child process calling self.run()
[INFO/Process-39] child process calling self.run()
[INFO/Process-39] process shutting down
[INFO/Process-39] process shutting down
[INFO/Process-39] process shutting down
[INFO/Process-39] process shutting down
[INFO/Process-39] process shutting down
[INFO/Process-39] process shutting down
[INFO/Process-39] process exiting with e

FileNotFoundError: [Errno 2] No such file or directory