### Mongo As Queue


In [1]:
import pymongo
from redis import Redis

# Client handles for the two services used below; hosts are addressed by name.
redis_connection = Redis(host='this_redis')
mongo_client = pymongo.MongoClient(host='this_mongo')

In [2]:
# `database_names()` was deprecated in PyMongo 3.6 and removed in 4.0;
# `list_database_names()` returns the same list of database names.
mongo_client.list_database_names()

['admin', 'local']

In [3]:
import pandas as pd

In [4]:
# Load the pickled tweets (presumably a DataFrame, since `.to_dict('records')`
# is called on it below — TODO confirm).
# NOTE: unpickling can execute arbitrary code; only load files from a trusted source.
twentyktweets = pd.read_pickle('../data/twentyktweets.p')

In [5]:
# Insert one document per record into the `tweets` collection of the `twitter` database.
# NOTE(review): re-running this cell presumably inserts the same records again — confirm
# whether duplicates matter before Restart & Run All.
mongo_client.twitter.tweets.insert_many(twentyktweets.to_dict('records'))

<pymongo.results.InsertManyResult at 0x7f9da5486088>

### Parallel MapReduce

In [2]:
cd ..

/home/jovyan


In [3]:
from redis import Redis 
from rq import Queue
from lib.worker import remove_punctuation, mapper, reducer, toggle_hold, check_hold
import time
# Redis client shared by this notebook and the RQ queue that jobs are enqueued on.
redis_connection = Redis(host='this_redis')
job_queue = Queue(connection=redis_connection)

Here is our previous implementation of the `word_count` function.

    def word_count(documents, redis_connection, word_list='word_list'):
        """Count words serially: map every document, then reduce every word.

        Calls `mapper` once per document, then repeatedly pops a word from the
        Redis set `word_list`, passes it to `reducer`, and records the result.

        Returns a list of `(word, count)` tuples, where `count` is whatever
        `reducer` returned for that word.
        """

        counts = []

        # Map phase: one `mapper` call per document (presumably this is what
        # fills `word_list` — confirm against lib.worker).
        for document in documents:
            mapper(document, redis_connection, word_list)

        # Reduce phase: SPOP removes and returns one member of the set; the loop
        # ends when it returns a falsy value (None once the set is empty).
        word = redis_connection.spop(word_list)
        while word:
            # The popped value is decoded before being handed to `reducer`.
            word = word.decode()
            count = reducer(word, redis_connection)
            counts.append((word, count))
            word = redis_connection.spop(word_list)

        return counts
        
There are two candidates for parallelization in this function:

1. The `for` loop can be parallelized, because the tokenization of one document is completely independent of the tokenization of another.
1. The `while` loop can be parallelized, because counting the tokens for one word is completely independent of counting the tokens for another.

There is one tricky aspect to this parallelization, however: we cannot begin working on the word counts until all of the tokenization is complete.

In [5]:
def word_count(documents, word_list='word_list', count_list='count_list',
               poll_interval=0.1):
    """Run the word count as RQ jobs: map every document, then reduce every word.

    Each document is enqueued as a `mapper` job. The words cannot be counted
    until tokenization is complete, so this function waits until every mapper
    job has either finished or failed, then pops each word from the Redis set
    `word_list` and enqueues one `reducer` job per word.

    Parameters
    ----------
    documents : iterable
        Documents passed one at a time to `mapper`. May be empty.
    word_list : str
        Redis key handed to `mapper` and drained here with SPOP.
    count_list : str
        Redis key handed to `reducer` (presumably where it pushes its result;
        `collect_counts` reads from a key of the same name — confirm).
    poll_interval : float
        Seconds to sleep between status checks while waiting on a mapper job.

    Returns
    -------
    None

    Notes
    -----
    This call blocks while mapper jobs are pending, so an RQ worker must be
    running; otherwise it waits indefinitely.
    """
    mapper_jobs = [
        job_queue.enqueue(mapper, document, word_list)
        for document in documents
    ]

    # Barrier between the two phases: popping words before the mappers are
    # done would see an empty or partial set.
    for mapper_job in mapper_jobs:
        while not (mapper_job.is_finished or mapper_job.is_failed):
            time.sleep(poll_interval)

    # SPOP returns None (falsy) once the set is empty, which ends the loop.
    word = redis_connection.spop(word_list)
    while word:
        # Popped members are bytes; the reducer is given text.
        job_queue.enqueue(reducer, word.decode(), count_list)
        word = redis_connection.spop(word_list)
        


In [6]:
# Enqueue the map and reduce jobs for the corpus.
# NOTE(review): `documents` is not defined in any visible cell — presumably it was
# loaded in a cell that has since been removed; confirm before Restart & Run All.
word_count(documents)

In [8]:
# `Job.status` is a deprecated property in RQ; `get_status()` is the supported accessor.
# NOTE(review): no visible cell above assigns a top-level `job` — this relies on
# kernel state from an out-of-order run; confirm.
job.get_status()

'finished'

In [4]:
def map_documents(documents, word_list='word_list'):
    """Enqueue one `mapper` job per document and return the last job enqueued.

    Parameters
    ----------
    documents : iterable
        Documents passed one at a time to `mapper`.
    word_list : str
        Redis key handed to `mapper` as its second argument.

    Returns
    -------
    The job returned by the final `job_queue.enqueue` call, or None when
    `documents` is empty.
    """
    # Start from None so an empty input returns cleanly instead of raising
    # UnboundLocalError at the `return`.
    job = None
    for document in documents:
        job = job_queue.enqueue(mapper, document, word_list)
    return job

In [6]:
# Start the map phase and look at the state of the last mapper job enqueued.
job = map_documents(documents)
# `Job.status` is a deprecated property in RQ; `get_status()` is the supported accessor.
job.get_status()

'queued'

In [7]:
# Check again: the reduce phase should only start once the map phase has finished.
# `Job.status` is a deprecated property in RQ; `get_status()` is the supported accessor.
job.get_status()

'finished'

In [8]:
def reduce_words(word_list='word_list', count_list='count_list'):
    """Drain the Redis set of words, enqueueing one `reducer` job per word.

    Parameters
    ----------
    word_list : str
        Redis key of the set that words are popped from with SPOP.
    count_list : str
        Redis key handed to `reducer` as its second argument.

    Returns
    -------
    The job returned by the final `job_queue.enqueue` call, or None when the
    set was already empty.
    """
    # Start from None so an empty set returns cleanly instead of raising
    # UnboundLocalError at the `return`.
    job = None

    # SPOP returns None (falsy) once the set is empty, which ends the loop.
    word = redis_connection.spop(word_list)
    while word:
        # Popped members are bytes; the reducer is given text.
        job = job_queue.enqueue(reducer, word.decode(), count_list)
        word = redis_connection.spop(word_list)

    return job

In [9]:
# Start the reduce phase; `job` now refers to the last reducer job enqueued.
job = reduce_words()

In [10]:
# State of the last reducer job enqueued.
# `Job.status` is a deprecated property in RQ; `get_status()` is the supported accessor.
job.get_status()

'queued'

In [14]:
def collect_counts(count_list='count_list'):
    """Drain a Redis list with LPOP and return everything that was popped.

    Parameters
    ----------
    count_list : str
        Redis key of the list to drain.

    Returns
    -------
    list
        The popped values in the order LPOP returned them, exactly as the
        Redis client returned them (they are not decoded or parsed here).
        The list is empty in Redis afterwards.
    """
    counts = []
    # LPOP returns None (falsy) once the list is empty, which ends the loop.
    count = redis_connection.lpop(count_list)
    while count:
        counts.append(count)
        count = redis_connection.lpop(count_list)
    return counts

In [15]:
# Pop and display every entry currently in the `count_list` Redis list.
# Popping is destructive, so re-running this cell returns only what was added since.
collect_counts()

[b"('information', 2)",
 b"('selects', 1)",
 b"('The', 6)",
 b"('a', 8)",
 b"('variables', 3)",
 b"('it', 1)",
 b"('tool', 1)",
 b"('be', 2)",
 b"('presents', 1)",
 b"('clear', 1)",
 b"('This', 2)",
 b"('model', 3)",
 b"('testing', 2)",
 b"('variable', 1)",
 b"('eliminate', 1)",
 b"('most', 1)",
 b"('solve', 1)",
 b"('DRP', 2)",
 b"('of', 5)",
 b"('experimental', 1)",
 b"('allows', 1)",
 b"('techniques', 1)",
 b"('to', 9)",
 b"('kernel', 3)",
 b"('kernelPCA', 2)",
 b"('more', 1)",
 b"('improving', 1)",
 b"('market', 1)",
 b"('In', 1)",
 b"('this', 1)",
 b"('However', 1)",
 b"('is', 5)",
 b"('capture', 1)",
 b"('PCA', 3)",
 b"('sliding', 1)",
 b"('these', 1)",
 b"('than', 1)",
 b"('proposed', 1)",
 b"('research', 1)",
 b"('dimension', 1)",
 b"('profits', 1)",
 b"('applied', 2)",
 b"('use', 2)",
 b"('but', 1)",
 b"('approach', 1)",
 b"('strategies', 1)",
 b"('application', 2)",
 b"('very', 1)",
 b"('can', 1)",
 b"('formed', 1)",
 b"('transformation', 1)",
 b"('small', 1)",
 b"('the', 5)"