### Delayed Job Processing

A common software development pattern for parallelization in Python is **delayed job processing**. This pattern is particularly useful for software designed to run on a multi-server architecture. Under this pattern, a primary or **scheduler** process schedules a job to be executed by a **worker** process at some later time. High-performance/parallel libraries like Spark or TensorFlow use this pattern internally, but there the specifics of the implementation are abstracted away from us, and we need only worry about designing a processing graph.
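To make the scheduler/worker split concrete, here is a minimal in-process sketch that uses only the standard library. Threads stand in for separate worker processes, and the names `run_scheduler` and `pending` are hypothetical; this is not how Spark, TensorFlow, or `rq` implement the pattern, only an illustration of the idea that the scheduler enqueues work and the workers execute it later.

```python
import queue
import threading

def run_scheduler(jobs, n_workers=2):
    """Enqueue (function, argument) jobs and let worker threads execute them."""
    pending = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            item = pending.get()
            if item is None:  # sentinel: no more work for this worker
                pending.task_done()
                break
            func, arg = item
            value = func(arg)
            with lock:
                results.append((arg, value))
            pending.task_done()

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()

    # The scheduler only enqueues work; it never runs a job itself.
    for job in jobs:
        pending.put(job)
    for _ in threads:
        pending.put(None)  # one sentinel per worker

    pending.join()  # block until every queued item has been processed
    for t in threads:
        t.join()
    return results

results = run_scheduler([(len, "kernel"), (len, "PCA"), (str.upper, "drp")])
print(sorted(results))
```

The order in which results arrive depends on which worker picks up which job, so the example sorts them before printing.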

Here, we implement a scheduler/worker pattern manually using the `rq` (Redis Queue) Python library. This library is typically used in web applications to trigger large jobs that are executed at a later time. For our purposes, it helps us design a truly parallel implementation of MapReduce in Python.

### Test Data

We use the following list of strings as our "documents". That is, each string is a document that will be mapped.

In [1]:
documents = ["This paper presents a kernel-based principal component analysis, kernel PCA, to extract critical features for improving the performance of a stock trading model. ",
"The feature extraction method is one of the techniques to solve dimensionality reduction problems.",
"The kernel PCA is a feature extraction approach which has been applied to data transformation from known variables to capture critical information.",
"The kernel PCA is a kernel-based data mapping tool that has characteristics of both principal component analysis and non-linear mapping.",
"The feature selection method is another DRP technique that selects only a small set of features from known variables, but these features still indicate possible collinearity problems that fail to reflect clear information.",
"However, most feature extraction methods use a variable mapping application to eliminate noisy and collinear variables. In this research, we use the kernel-PCA method in a stock trading model to transform stock technical indices which allows features of smaller dimension to be formed.",
"The kernel-PCA method has been applied to various stocks and sliding window testing methods using both half-year and 1-year testing strategies. The experimental results show that the proposed method generates more profits than other DRP methods on the America stock market.",
"This stock trading model is very practical for real-world application, and it can be implemented in a real-time environment."]

### Parallel MapReduce

In [2]:
cd ..

/home/jovyan


In [3]:
from redis import Redis 
from rq import Queue
from lib.worker import remove_punctuation, mapper, reducer, toggle_hold, check_hold
import time
redis_connection = Redis('this_redis')
job_queue = Queue(connection=redis_connection)

Here is our previous implementation of the `word_count` function.

    def word_count(documents, redis_connection, word_list='word_list'):

        counts = []

        for document in documents:
            mapper(document, redis_connection, word_list)

        word = redis_connection.spop(word_list)
        while word:
            word = word.decode()
            count = reducer(word, redis_connection)
            counts.append((word, count))
            word = redis_connection.spop(word_list)

        return counts
        
There are two candidates for parallelization in this function:

1. The `for` loop can be parallelized because the tokenization of one document is completely independent of the tokenization of another.
1. The `while` loop can be parallelized because the counting of tokens for one word is completely independent of the counting for another.

There is one tricky aspect to this parallelization, however: we cannot start on the word counts until all of the tokenization is complete.
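The ordering constraint can be sketched without Redis by using `concurrent.futures` from the standard library. The functions `tokenize` and `count_word` below are hypothetical stand-ins for the real `mapper` and `reducer` in `lib.worker`, whose internals are not shown here; the point is only the barrier between the two phases.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def tokenize(document):
    # Hypothetical stand-in for the mapper: split on whitespace.
    return document.split()

def count_word(word, tokens):
    # Hypothetical stand-in for the reducer: count a single word.
    return word, tokens.count(word)

def two_phase_word_count(documents, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Phase 1: map every document independently.
        map_futures = [pool.submit(tokenize, d) for d in documents]

        # Barrier: do not start reducing until every map job is done.
        wait(map_futures)
        tokens = [t for f in map_futures for t in f.result()]

        # Phase 2: reduce each distinct word independently.
        reduce_futures = [pool.submit(count_word, w, tokens) for w in set(tokens)]
        return dict(f.result() for f in reduce_futures)

counts = two_phase_word_count(["the kernel PCA", "the kernel method"])
print(counts)
```

Without the `wait` call, a reducer could start while some documents are still being tokenized and report a count that is too low.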

In [5]:
# def word_count(documents, word_list='word_list', count_list='count_list'):
    
#     for document in documents:
#         job = job_queue.enqueue(mapper, document, 'word_list')   
        
#     word = redis_connection.spop('word_list')
#     job_queue.enqueue(reducer, word, 'count_list', depends_on=job)
    
#     while word:
#         word = word.decode()
#         job_queue.enqueue(reducer, word, 'count_list')
#         word = redis_connection.spop('word_list')
        


In [6]:
# word_count(documents)

In [8]:
# job.status

'finished'

In [4]:
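def map_documents(documents):
    # Enqueue one mapper job per document; the workers pick them up independently.
    job = None
    for document in documents:
        job = job_queue.enqueue(mapper, document, 'word_list')
    # Return the last job enqueued (None if `documents` is empty).
    return job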

In [6]:
job = map_documents(documents)
job.status

'queued'

In [7]:
job.status

'finished'
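Checking the status of only the last job is a weak signal once several workers are running, because an earlier job can still be in progress when the last one finishes. A minimal sketch of waiting on every job follows. It assumes each job object exposes an `is_finished` property, as `rq` jobs do; `wait_for_jobs` and `FakeJob` are hypothetical names, and `FakeJob` exists only so the example runs without a Redis server.

```python
import time

def wait_for_jobs(jobs, timeout=30.0, poll_interval=0.1):
    """Return True once every job reports finished, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if all(job.is_finished for job in jobs):
            return True
        time.sleep(poll_interval)
    # One last check after the deadline has passed.
    return all(job.is_finished for job in jobs)

class FakeJob:
    """Hypothetical stand-in for a queued job that finishes after a few polls."""

    def __init__(self, polls_needed):
        self.polls_left = polls_needed

    @property
    def is_finished(self):
        self.polls_left -= 1
        return self.polls_left <= 0

done = wait_for_jobs([FakeJob(1), FakeJob(3)], timeout=2.0, poll_interval=0.01)
print(done)
```

To use this with the queue above, a variant of `map_documents` would need to collect every job it enqueues in a list rather than returning only the last one. A job that fails never reports finished, so the helper returns `False` at the timeout in that case.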

In [8]:
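def reduce_words():
    # Pop each distinct word from the Redis set and enqueue one reducer job for it.
    job = None
    word = redis_connection.spop('word_list')
    while word:
        word = word.decode()
        job = job_queue.enqueue(reducer, word, 'count_list')
        word = redis_connection.spop('word_list')

    # Return the last job enqueued (None if the set was empty).
    return job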

In [9]:
job = reduce_words()

In [10]:
job.status

'queued'

In [14]:
def collect_counts():
    # Drain 'count_list', returning each entry as the raw bytes stored in Redis.
    counts = []
    count = redis_connection.lpop('count_list')
    while count:
        counts.append(count)
        count = redis_connection.lpop('count_list')
    return counts
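The entries that `collect_counts` returns are raw bytes, each holding the text of a `(word, count)` tuple. A possible way to turn them back into Python tuples is sketched below. It assumes every entry has exactly that shape, which is inferred from the output rather than from the `reducer` source, and `parse_counts` is a hypothetical helper name.

```python
import ast

def parse_counts(raw_counts):
    """Turn byte strings such as b"('model', 3)" into (word, count) tuples."""
    parsed = []
    for raw in raw_counts:
        # literal_eval only evaluates Python literals, so it is safer than eval.
        word, count = ast.literal_eval(raw.decode())
        parsed.append((word, int(count)))
    return parsed

sample = [b"('information', 2)", b"('The', 6)", b"('to', 9)"]
pairs = parse_counts(sample)

# Sort by count, highest first.
top = sorted(pairs, key=lambda pair: pair[1], reverse=True)
print(top)
```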

In [15]:
collect_counts()

[b"('information', 2)",
 b"('selects', 1)",
 b"('The', 6)",
 b"('a', 8)",
 b"('variables', 3)",
 b"('it', 1)",
 b"('tool', 1)",
 b"('be', 2)",
 b"('presents', 1)",
 b"('clear', 1)",
 b"('This', 2)",
 b"('model', 3)",
 b"('testing', 2)",
 b"('variable', 1)",
 b"('eliminate', 1)",
 b"('most', 1)",
 b"('solve', 1)",
 b"('DRP', 2)",
 b"('of', 5)",
 b"('experimental', 1)",
 b"('allows', 1)",
 b"('techniques', 1)",
 b"('to', 9)",
 b"('kernel', 3)",
 b"('kernelPCA', 2)",
 b"('more', 1)",
 b"('improving', 1)",
 b"('market', 1)",
 b"('In', 1)",
 b"('this', 1)",
 b"('However', 1)",
 b"('is', 5)",
 b"('capture', 1)",
 b"('PCA', 3)",
 b"('sliding', 1)",
 b"('these', 1)",
 b"('than', 1)",
 b"('proposed', 1)",
 b"('research', 1)",
 b"('dimension', 1)",
 b"('profits', 1)",
 b"('applied', 2)",
 b"('use', 2)",
 b"('but', 1)",
 b"('approach', 1)",
 b"('strategies', 1)",
 b"('application', 2)",
 b"('very', 1)",
 b"('can', 1)",
 b"('formed', 1)",
 b"('transformation', 1)",
 b"('small', 1)",
 b"('the', 5)"