# **Parallelization in Python**

Example taken from
https://towardsdatascience.com/parallelization-in-python-the-easy-way-aa03ed04c209

In [1]:
import time
import pathlib
from typing import Dict, Generator, Iterable

# Type alias for one file's processing result: each searched letter mapped to
# the number of times it occurs in the (preprocessed) text.
ResultsType = Dict[str, int]

def read_file(path: pathlib.Path) -> str:
    """Mock "read the file" step: return the file's name as its text.

    A real implementation would load the file's contents. Here the file does
    not even need to exist, and a short sleep simulates a quick read.

    Args:
        path: path of the file to "read".

    Returns:
        ``path.name``, which is used as the document text.
    """
    time.sleep(.05)  # simulated cost: the cheapest stage of the pipeline
    return path.name

def preprocess(text: str) -> str:
    """Mock preprocessing step: return ``text`` converted to upper case.

    Real cleaning and checking would happen here. The 0.25 s sleep makes this
    stage slower than reading but much faster than ``process``.

    Args:
        text: raw text returned by ``read_file``.

    Returns:
        The upper-cased text.
    """
    time.sleep(.25)
    upper_text = text.upper()
    return upper_text

def process(text: str, letters: Iterable[str] = ("A", "B")) -> ResultsType:
    """Mock main processing step: count occurrences of selected letters.

    The 1 s sleep stands in for real, expensive processing and makes this the
    slowest stage of the pipeline.

    Args:
        text: preprocessed text. ``pipeline`` passes it upper-cased, which is
            why the default letters are upper-case. Counting is case-sensitive.
        letters: letters (or substrings) to count. The default ``("A", "B")``
            is the original hard-coded search, so existing callers get the
            same results.

    Returns:
        A dict mapping each searched letter to its number of occurrences in
        ``text``.
    """
    time.sleep(1.)
    return {letter: text.count(letter) for letter in letters}

def pipeline(path: pathlib.Path) -> ResultsType:
    """Run one file through the full read -> preprocess -> process chain.

    Args:
        path: the file to handle.

    Returns:
        The letter counts produced by ``process`` for this file.
    """
    return process(preprocess(read_file(path)))

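The code above defines a (mock) pipeline to process texts:
* read a text file (mocked: returns the file's name; 0.05 s)
* preprocess the file; in a real pipeline this means cleaning and checking (mocked: upper-cases the text; 0.25 s)
* process the file (mocked: counts the letters "A" and "B"; 1 s, the slowest step)
* return the results

Each stage simulates its cost with `time.sleep`, so one file takes about 0.05 + 0.25 + 1.0 = 1.3 s. The 10 files below therefore take about 13 s when processed one after another.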

In [2]:
%%time
# Build the list of input files. The files do not need to exist on disk:
# read_file only uses each path's name.
file_paths = [
    pathlib.Path(file_name)
    for file_name in (
        "book_about_python.txt",
        "book_about_java.txt",
        "book_about_c.txt",
        "science_fiction_book.txt",
        "lolita.txt",
        "go_there_and_return.txt",
        "statistics_for_dummies.txt",
        "data_science_part_1.txt",
        "data_science_part_2.txt",
        "data_science_part_3.txt",
    )
]
print('file_paths is of type: ', type(file_paths))
# file_paths is already a list, so display it directly instead of copying it with list().
file_paths

file_paths is of type:  <class 'list'>
CPU times: user 1.78 ms, sys: 0 ns, total: 1.78 ms
Wall time: 1.79 ms


[PosixPath('book_about_python.txt'),
 PosixPath('book_about_java.txt'),
 PosixPath('book_about_c.txt'),
 PosixPath('science_fiction_book.txt'),
 PosixPath('lolita.txt'),
 PosixPath('go_there_and_return.txt'),
 PosixPath('statistics_for_dummies.txt'),
 PosixPath('data_science_part_1.txt'),
 PosixPath('data_science_part_2.txt'),
 PosixPath('data_science_part_3.txt')]

### **Non-parallel execution**

In [3]:
if __name__ == "__main__":
    # Serial baseline: every file goes through the whole pipeline in turn.
    t_start = time.perf_counter()
    results = {}
    for file_path in file_paths:
        results[file_path] = pipeline(file_path)
    seconds = round(time.perf_counter() - t_start, 2)
    print(f"Elapsed time: {seconds} seconds")
    print(results)

Elapsed time: 13.01 seconds
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


In [4]:
# The same serial run, written with the built-in map()
if __name__ == "__main__":
    t_start = time.perf_counter()
    # map() is lazy: pipeline() only runs for each path when dict() pulls the
    # (path, result) tuples out of the map object.
    path_result_pairs = map(lambda fp: (fp, pipeline(fp)), file_paths)
    # An equivalent generator expression would be:
    # path_result_pairs = ((fp, pipeline(fp)) for fp in file_paths)
    results = dict(path_result_pairs)
    seconds = round(time.perf_counter() - t_start, 2)
    print(f"Elapsed time: {seconds} seconds")
    print(results)

Elapsed time: 13.02 seconds
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


In [5]:
# Named, module-level replacement for the lambda used with map(). Unlike a
# lambda, a module-level function can be pickled, so it can also be passed to
# multiprocessing pools.
def evaluate_pipeline(path):
    """Return the pair ``(path, pipeline(path))`` so results can be fed to ``dict()``."""
    result = pipeline(path)
    return path, result

# map() with a named function instead of a lambda. The map object is a lazy
# iterator (not a generator, though it is consumed the same way) that yields
# (path, pipeline(path)) tuples. dict() drains it and builds a dictionary of
# path -> pipeline(path) key-value pairs.
if __name__ == "__main__":
    t_start = time.perf_counter()
    pipeline_gen = map(evaluate_pipeline, file_paths)  # nothing has run yet
    results = dict(pipeline_gen)                       # the pipelines run here
    seconds = round(time.perf_counter() - t_start, 2)
    print(f"Elapsed time: {seconds}")
    print(results)

Elapsed time: 13.02
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


### **Parallel execution**

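Let’s use `multiprocessing`, the standard-library module for process-based parallelism. Each worker is a separate Python process, and the functions it runs must be picklable (a module-level function such as `evaluate_pipeline` is; a lambda is not). Note: the workers must be able to find `evaluate_pipeline`, which is defined in this notebook. That works with the `fork` start method, but with `spawn` (the default on Windows and macOS), functions defined interactively cannot be found by the workers and must be moved to an importable `.py` module. You can check the start method in use with `mp.get_start_method()`.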

In [6]:
import multiprocessing as mp

In [7]:
import os
# Both calls report the number of CPUs in the system. Neither necessarily
# reflects CPU-affinity or container limits.
n_mp_cpus = mp.cpu_count()
n_os_cpus = os.cpu_count()
print(f'mp cpus: {n_mp_cpus} os cpus: {n_os_cpus}')


mp cpus: 2 os cpus: 2


In [8]:
if __name__ == "__main__":
    start = time.perf_counter()
    # Pool.map spreads the 10 files over 4 worker processes, blocks until every
    # task is done, and returns the results as a list in input order. With
    # 4 workers the tasks run in roughly 3 rounds of ~1.3 s each.
    # NOTE: the workers must find evaluate_pipeline, which is defined in this
    # notebook. That works under the 'fork' start method but not under 'spawn'.
    with mp.Pool(4) as p:
        # Rebind `results` so the print below shows this run's output, not the
        # serial results left over from an earlier cell.
        results = dict(p.map(evaluate_pipeline, file_paths))
    print(f"Elapsed time: {round(time.perf_counter() - start, 2)}")
    print(results)

Elapsed time: 3.96
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


In [9]:
if __name__ == "__main__":
    start = time.perf_counter()
    # 8 workers for 10 single-file tasks: roughly 2 rounds of ~1.3 s each.
    with mp.Pool(8) as p:
        # Rebind `results` so the print below shows this run's output, not the
        # serial results left over from an earlier cell.
        results = dict(p.map(evaluate_pipeline, file_paths))
    print(f"Elapsed time: {round(time.perf_counter() - start, 2)}")
    print(results)

Elapsed time: 2.69
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


In [10]:
if __name__ == "__main__":
    start = time.perf_counter()
    # More workers than files: at most 10 tasks exist, so 2 of the 12 workers
    # stay idle and the wall time bottoms out near one pipeline's ~1.3 s.
    with mp.Pool(12) as p:
        # Rebind `results` so the print below shows this run's output, not the
        # serial results left over from an earlier cell.
        results = dict(p.map(evaluate_pipeline, file_paths))
    print(f"Elapsed time: {round(time.perf_counter() - start, 2)}")
    print(results)

Elapsed time: 1.48
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


In [11]:
if __name__ == "__main__":
    start = time.perf_counter()
    # One worker per file: all 10 tasks run at once, so the wall time drops to
    # roughly a single pipeline's sleep time (0.05 + 0.25 + 1.0 = 1.3 s).
    with mp.Pool(10) as p:
        # Rebind `results` so the print below shows this run's output, not the
        # serial results left over from an earlier cell.
        results = dict(p.map(evaluate_pipeline, file_paths))
    print(f"Elapsed time: {round(time.perf_counter() - start, 2)}")
    # 10 is not the number of available cores (cpu_count() printed 2 above).
    # The work is simulated with time.sleep, which does not use the CPU, so
    # many more processes than cores can make progress at the same time.
    # 10 is simply the number of files; extra workers (as with Pool(12)) idle.
    print(results)

Elapsed time: 1.5
{PosixPath('book_about_python.txt'): {'A': 1, 'B': 2}, PosixPath('book_about_java.txt'): {'A': 3, 'B': 2}, PosixPath('book_about_c.txt'): {'A': 1, 'B': 2}, PosixPath('science_fiction_book.txt'): {'A': 0, 'B': 1}, PosixPath('lolita.txt'): {'A': 1, 'B': 0}, PosixPath('go_there_and_return.txt'): {'A': 1, 'B': 0}, PosixPath('statistics_for_dummies.txt'): {'A': 1, 'B': 0}, PosixPath('data_science_part_1.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_2.txt'): {'A': 3, 'B': 0}, PosixPath('data_science_part_3.txt'): {'A': 3, 'B': 0}}


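* `with mp.Pool(4) as p:` — it is good practice to use `mp.Pool` as a context manager, so you do not need to remember to shut the pool down. Leaving the `with` block calls `terminate()` (not `close()`/`join()`). That is safe here because `p.map()` has already blocked until all results were returned.
* `p.map(evaluate_pipeline, file_paths)` — the only change from the serial version is replacing the built-in `map()` with the pool's `p.map()`.

 * `p.map()` does not evaluate lazily like the built-in `map()`; it runs all tasks eagerly and blocks until they finish. Therefore it does not return a lazy iterator; instead, it returns a list, in the same order as the input. For lazy, incremental results, use `p.imap()` or `p.imap_unordered()`.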

 ***Other libraries for parallelization:***
 * Ray: an open-source, unified framework for scaling AI and Python applications (https://docs.ray.io/en/latest/index.html).

 * Pathos: a framework for heterogeneous computing. It provides a consistent high-level interface for configuring and launching parallel computations across heterogeneous resources (https://pathos.readthedocs.io/en/latest/index.html).