# Parallelising tasks

When working with large simulations, it is common to have tasks that take a long time to run but do not depend on each other. Such tasks can be split into partial tasks (chunks), run in separate processes, and finally combined in a summarising step.

Take, for example, the overused case of counting words in a text.

In [1]:
# Text extracted from https://en.wikipedia.org/wiki/MapReduce

text = "mapreduce is a programming model and an associated implementation for processing and generating bigdata sets with a parallel distributed algorithm on a cluster a mapreduce program is composed of a map procedure which performs filtering and sorting such as sorting students by first name into queues one queue for each name and a reduce method which performs a summary operation such as counting the number of students in each queue yielding name frequencies the mapreduce system also called infrastructure or framework orchestrates the processing by marshalling the distributed servers running the various tasks in parallel managing all communications and data transfers between the various parts of the system and providing for redundancy and fault tolerance the model is a specialization of the splitapplycombine strategy for data analysis it is inspired by the map and reduce functions commonly used in functional programming although their purpose in the mapreduce framework is not the same as in their original forms the key contributions of the mapreduce framework are not the actual map and reduce functions which for example resemble the message passing interface standard reduce and scatter operations but the scalability and faulttolerance achieved for a variety of applications by optimizing the execution engine as such a singlethreaded implementation of mapreduce is usually not faster than a traditional nonmapreduce implementation any gains are usually only seen with multithreaded implementations on multiprocessor hardware the use of this model is beneficial only when the optimized distributed shuffle operation which reduces network communication cost and fault tolerance features of the mapreduce framework come into play optimizing the communication cost is essential to a good mapreduce algorithm mapreduce libraries have been written in many programming languages with different levels of optimization a popular opensource implementation that has support for distributed shuffles is part of apache hadoop the name mapreduce originally referred to the proprietary google technology but has since been genericized by google was no longer using mapreduce as their primary bigdata processing model and development on apache mahout had moved on to more capable and less diskoriented mechanisms that incorporated full map and reduce capabilities"

The idea here is to use every word as a key in a hash table (dictionary) and store its count as the corresponding value.

In [2]:
def count_words(words):
    bucket = {}
    for word in words:
        bucket[word] = bucket.get(word, 0) + 1
    return bucket

print(count_words(text.split()))

{'mapreduce': 11, 'is': 9, 'a': 13, 'programming': 3, 'model': 4, 'and': 15, 'an': 1, 'associated': 1, 'implementation': 4, 'for': 7, 'processing': 3, 'generating': 1, 'bigdata': 2, 'sets': 1, 'with': 3, 'parallel': 2, 'distributed': 4, 'algorithm': 2, 'on': 4, 'cluster': 1, 'program': 1, 'composed': 1, 'of': 11, 'map': 4, 'procedure': 1, 'which': 4, 'performs': 2, 'filtering': 1, 'sorting': 2, 'such': 3, 'as': 5, 'students': 2, 'by': 5, 'first': 1, 'name': 4, 'into': 2, 'queues': 1, 'one': 1, 'queue': 2, 'each': 2, 'reduce': 5, 'method': 1, 'summary': 1, 'operation': 2, 'counting': 1, 'the': 24, 'number': 1, 'in': 6, 'yielding': 1, 'frequencies': 1, 'system': 2, 'also': 1, 'called': 1, 'infrastructure': 1, 'or': 1, 'framework': 4, 'orchestrates': 1, 'marshalling': 1, 'servers': 1, 'running': 1, 'various': 2, 'tasks': 1, 'managing': 1, 'all': 1, 'communications': 1, 'data': 2, 'transfers': 1, 'between': 1, 'parts': 1, 'providing': 1, 'redundancy': 1, 'fault': 2, 'tolerance': 2, 'specia
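As a cross-check, the standard library's `collections.Counter` builds the same kind of word-to-count mapping. The sketch below uses a short stand-in sentence (`sample` is a hypothetical variable, not the text above) and compares the result with `count_words`, which is repeated here only so the example runs on its own.

```python
from collections import Counter

def count_words(words):
    # Same logic as the notebook cell above, repeated for self-containment.
    bucket = {}
    for word in words:
        bucket[word] = bucket.get(word, 0) + 1
    return bucket

# Hypothetical stand-in text, much shorter than the one used in this notebook.
sample = "a map procedure and a reduce method and a summary"
words = sample.split()

manual = count_words(words)
with_counter = Counter(words)

# Convert the Counter to a plain dict before comparing the two results.
print(dict(with_counter) == manual)   # True
print(with_counter.most_common(2))    # [('a', 3), ('and', 2)]
```

Writing the loop by hand, as the notebook does, keeps the map step explicit; `Counter` is simply a shorter way to get the same dictionary.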

This works well. However, if the task were really time-consuming, we would have to wait for the whole process to finish before continuing.

But if we split the problem into small groups and run each group in an isolated process, the same job could be done in less time. To do so, we first need to chunk the data (the text in this case).

In [3]:
batchsize = 50
text_words = text.split()
text_chunks = []
while text_words:
    text_chunks.append(text_words[:batchsize])
    text_words = text_words[batchsize:]

partial_results = list(map(count_words, text_chunks))
print(*partial_results, sep='\n\n')

{'mapreduce': 2, 'is': 2, 'a': 5, 'programming': 1, 'model': 1, 'and': 3, 'an': 1, 'associated': 1, 'implementation': 1, 'for': 2, 'processing': 1, 'generating': 1, 'bigdata': 1, 'sets': 1, 'with': 1, 'parallel': 1, 'distributed': 1, 'algorithm': 1, 'on': 1, 'cluster': 1, 'program': 1, 'composed': 1, 'of': 1, 'map': 1, 'procedure': 1, 'which': 1, 'performs': 1, 'filtering': 1, 'sorting': 2, 'such': 1, 'as': 1, 'students': 1, 'by': 1, 'first': 1, 'name': 1, 'into': 1, 'queues': 1, 'one': 1, 'queue': 1, 'each': 1}

{'name': 2, 'and': 2, 'a': 2, 'reduce': 1, 'method': 1, 'which': 1, 'performs': 1, 'summary': 1, 'operation': 1, 'such': 1, 'as': 1, 'counting': 1, 'the': 5, 'number': 1, 'of': 1, 'students': 1, 'in': 2, 'each': 1, 'queue': 1, 'yielding': 1, 'frequencies': 1, 'mapreduce': 1, 'system': 1, 'also': 1, 'called': 1, 'infrastructure': 1, 'or': 1, 'framework': 1, 'orchestrates': 1, 'processing': 1, 'by': 1, 'marshalling': 1, 'distributed': 1, 'servers': 1, 'running': 1, 'various': 1,
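The `while` loop above re-slices the remaining list on every pass and leaves `text_words` empty when it finishes. A sketch of an alternative that steps through the list by index and leaves the input untouched; `chunked` is a hypothetical helper name, not something defined in this notebook.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Hypothetical stand-in data: 12 "words" split into batches of 5.
words = [f"w{i}" for i in range(12)]
chunks = list(chunked(words, 5))

print([len(chunk) for chunk in chunks])  # [5, 5, 2]
```

The last chunk is allowed to be shorter than the batch size, which matches the behaviour of the slicing loop in the cell above.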

Now we have seven groups of partial results, which we need to combine to get the total count of each word.

In [4]:
def reducer(chunks):
    bucket = {}
    for partial_result in chunks:
        for word, partial_count in partial_result.items():
            bucket[word] = bucket.get(word, 0) + partial_count
    return bucket

print(reducer(partial_results))

{'mapreduce': 11, 'is': 9, 'a': 13, 'programming': 3, 'model': 4, 'and': 15, 'an': 1, 'associated': 1, 'implementation': 4, 'for': 7, 'processing': 3, 'generating': 1, 'bigdata': 2, 'sets': 1, 'with': 3, 'parallel': 2, 'distributed': 4, 'algorithm': 2, 'on': 4, 'cluster': 1, 'program': 1, 'composed': 1, 'of': 11, 'map': 4, 'procedure': 1, 'which': 4, 'performs': 2, 'filtering': 1, 'sorting': 2, 'such': 3, 'as': 5, 'students': 2, 'by': 5, 'first': 1, 'name': 4, 'into': 2, 'queues': 1, 'one': 1, 'queue': 2, 'each': 2, 'reduce': 5, 'method': 1, 'summary': 1, 'operation': 2, 'counting': 1, 'the': 24, 'number': 1, 'in': 6, 'yielding': 1, 'frequencies': 1, 'system': 2, 'also': 1, 'called': 1, 'infrastructure': 1, 'or': 1, 'framework': 4, 'orchestrates': 1, 'marshalling': 1, 'servers': 1, 'running': 1, 'various': 2, 'tasks': 1, 'managing': 1, 'all': 1, 'communications': 1, 'data': 2, 'transfers': 1, 'between': 1, 'parts': 1, 'providing': 1, 'redundancy': 1, 'fault': 2, 'tolerance': 2, 'specia
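The combine step can also be written as a pairwise merge folded over the partial results. The sketch below assumes partial results shaped like the dictionaries returned by `count_words`; the sample data and the helper name `merge` are hypothetical.

```python
from collections import Counter
from functools import reduce

# Hypothetical partial results, shaped like the output of count_words.
partial_results = [
    {"map": 2, "reduce": 1},
    {"reduce": 3, "shuffle": 1},
    {"map": 1},
]

def merge(left, right):
    # Counter addition sums the counts key by key.
    return Counter(left) + Counter(right)

# Fold the pairwise merge over all partial results, starting from an empty Counter.
total = reduce(merge, partial_results, Counter())
print(dict(total))  # {'map': 3, 'reduce': 4, 'shuffle': 1}
```

Because adding counts is associative and commutative, the partial results can be merged in any order, which is what makes this step safe to run after the chunks have been processed independently.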

Now, to get the benefit of multiprocessing, we need to create a pool of worker processes, passing the number of parallel tasks we want to run (five in this case). Then we spawn the tasks by mapping the function `count_words` over the chunks. Finally, we combine the partial results using the function `reducer`.

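Note that for `multiprocessing` to work in a Jupyter notebook, we need to save the functions and variables in a separate file (here called `multithreadcore.py`) and import it in the cell. The worker processes must be able to import the functions they run, which is not possible for functions defined only in the interactive session when workers are started with the spawn method (the default on Windows).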
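The contents of `multithreadcore.py` are not shown in this notebook. A minimal sketch of what such a module might contain, assuming it simply collects the two functions and the chunked data from the cells above; the short `text` and the small `batchsize` are stand-ins, and the actual file may differ.

```python
# multithreadcore.py (assumed layout; the actual file is not shown in the notebook)

def count_words(words):
    """Map step: count how often each word occurs in one chunk."""
    bucket = {}
    for word in words:
        bucket[word] = bucket.get(word, 0) + 1
    return bucket

def reducer(chunks):
    """Reduce step: add up the per-chunk counts into one dictionary."""
    bucket = {}
    for partial_result in chunks:
        for word, partial_count in partial_result.items():
            bucket[word] = bucket.get(word, 0) + partial_count
    return bucket

# Stand-in text; the notebook uses the much longer MapReduce paragraph.
text = "the map step counts words and the reduce step adds the counts"

# Stand-in batch size; the notebook uses 50.
batchsize = 4
text_words = text.split()
text_chunks = [text_words[i:i + batchsize]
               for i in range(0, len(text_words), batchsize)]
```

Functions are pickled by reference (module name plus function name), so keeping them in an importable module lets each worker process look them up on its own.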

In [5]:
from multiprocessing import Pool
from multithreadcore import *

if __name__ == '__main__':  # guard needed where workers are started with spawn (e.g. Windows)
    with Pool(5) as pool:
        partial_results = pool.map(count_words, text_chunks)
        final_result = reducer(partial_results)
        print(final_result)

{'mapreduce': 11, 'is': 9, 'a': 13, 'programming': 3, 'model': 4, 'and': 15, 'an': 1, 'associated': 1, 'implementation': 4, 'for': 7, 'processing': 3, 'generating': 1, 'bigdata': 2, 'sets': 1, 'with': 3, 'parallel': 2, 'distributed': 4, 'algorithm': 2, 'on': 4, 'cluster': 1, 'program': 1, 'composed': 1, 'of': 11, 'map': 4, 'procedure': 1, 'which': 4, 'performs': 2, 'filtering': 1, 'sorting': 2, 'such': 3, 'as': 5, 'students': 2, 'by': 5, 'first': 1, 'name': 4, 'into': 2, 'queues': 1, 'one': 1, 'queue': 2, 'each': 2, 'reduce': 5, 'method': 1, 'summary': 1, 'operation': 2, 'counting': 1, 'the': 24, 'number': 1, 'in': 6, 'yielding': 1, 'frequencies': 1, 'system': 2, 'also': 1, 'called': 1, 'infrastructure': 1, 'or': 1, 'framework': 4, 'orchestrates': 1, 'marshalling': 1, 'servers': 1, 'running': 1, 'various': 2, 'tasks': 1, 'managing': 1, 'all': 1, 'communications': 1, 'data': 2, 'transfers': 1, 'between': 1, 'parts': 1, 'providing': 1, 'redundancy': 1, 'fault': 2, 'tolerance': 2, 'specia