# Multiprocessing Benchmarking

## Testing Parallelizing Methods

### Define help methods

In [5]:
import multiprocessing as mp

import datetime
import os

import numpy as np
import random
from random import randint
import click

class StopWatch:

    start = None
    
    def __init__(self):
        self.reset()
    
    def reset(self):
        self.start = datetime.datetime.now()
    
    def read(self, in_seconds=False):
        delta = datetime.datetime.now() - self.start
        if in_seconds:
            return (delta.total_seconds())
        return(delta)
    
    def show(self, in_seconds=False):
        print(f'Elapsed time: {self.read(in_seconds=in_seconds)}.')

### Prepare data

To test the algorithms, we will sort rows from a matrix. The goal is to sort each row, and measure the elapsed time to perform it.

In [3]:
ROWS = 1000
COLS = 500

print('Generating data...')
sw = StopWatch()
random.seed(100)
data = np.array([randint(0,4000) for i in range(ROWS * COLS)]).reshape((ROWS, COLS))
sw.show()

def bubble(iterable):
    r = iterable.copy()
    for i in range(len(r)-1):
        for j in range(i + 1, len(r)):
            if r[j] < r[i]:
                r[j], r[i] = r[i], r[j]
    return sum(r)

Generating data...
Elapsed time: 0:00:00.424131.


### Sorting without multiprocessing

In [3]:
result = []

sw = StopWatch()
for row in range(ROWS):
    result.append(bubble(data[row]))
    
sw.show()
print(result[0:10])

Elapsed time: 0:00:32.640735.
[998776, 995921, 1030872, 962105, 1046982, 985287, 1041406, 1019643, 1004639, 934978]


### Sorting using Pool.apply()

In [4]:
pool = mp.Pool(processes=mp.cpu_count())

sw = StopWatch()
results = []
for row in data:
    results.append(pool.apply(bubble, args=(row,)))

pool.close()

sw.show()
print(result[0:10])

Elapsed time: 0:00:42.456957.
[998776, 995921, 1030872, 962105, 1046982, 985287, 1041406, 1019643, 1004639, 934978]


### Sorting using Pool.apply_async()

In [5]:
pool = mp.Pool(processes=mp.cpu_count())

sw = StopWatch()
results = []
for row in data:
    results.append(pool.apply_async(bubble, args=(row,)))

pool.close()
pool.join()

sw.show()
print(result[0:10])

Elapsed time: 0:00:12.120840.
[998776, 995921, 1030872, 962105, 1046982, 985287, 1041406, 1019643, 1004639, 934978]


### Sorting using Pool.starmap()

In [6]:
pool = mp.Pool(processes=mp.cpu_count())

sw = StopWatch()
results = []
results = pool.starmap(bubble, [(row,) for row in data])

pool.close()

sw.show()
print(result[0:10])

Elapsed time: 0:00:11.415711.
[998776, 995921, 1030872, 962105, 1046982, 985287, 1041406, 1019643, 1004639, 934978]


### Sorting using Pool.imap()

In [7]:
pool = mp.Pool(processes=mp.cpu_count())

sw = StopWatch()
results = []
results = pool.imap(bubble, [row for row in data])

pool.close()

result_array = []
for (key, content) in enumerate(results):
    result_array.append(content)
    
sw.show()
print(result_array[0:10])

Elapsed time: 0:00:11.708029.
[998776, 995921, 1030872, 962105, 1046982, 985287, 1041406, 1019643, 1004639, 934978]


### Sorting using Pool.starmap_async()

In [8]:
pool = mp.Pool(processes=mp.cpu_count())

sw = StopWatch()
results = []
results = pool.starmap_async(bubble, [(row,) for row in data]).get()

pool.close()

sw.show()

Elapsed time: 0:00:11.041710.


---

## Testing SingletonRetry with Multiprocessing

### Defining a Singleton Class

In [6]:
class SingletonRetry:
    __instance = None

    def __new__(cls):
        if SingletonRetry.__instance is None:
            SingletonRetry.__instance = object.__new__(cls)
            SingletonRetry.__instance.retry_entries = []

        return SingletonRetry.__instance

    @staticmethod
    def force_new():
        SingletonRetry.__instance = None

    @staticmethod
    def get_retry_entries():
        """
        Get the retry entries.

        Returns:
            List of RetryEntries.
        """

        return SingletonRetry.__instance.retry_entries

    @staticmethod
    def add_retry_entry(data):
        """
        Append retry entry into singleton object.

        Args:
            retry_entry: Instance of RetryEntry

        Returns:
            None
        """
        return SingletonRetry.__instance.retry_entries.append(data)

    

In [11]:
SingletonRetry().force_new()
singleton = SingletonRetry()
singleton.add_retry_entry(999)

def add_data_to_singleton(data):
    swi = StopWatch()
    for row in data:
        singleton.add_retry_entry(row)
    singleton.add_retry_entry(swi.read())
    print(f'Inside Process: {singleton.get_retry_entries()}')
    singleton.force_new()

# def callback(result):
#     global singleton
#     singleton.add_retry_entry(result)

# print(f'Singleton entries after first instatiation: {singleton.get_retry_entries()}')

In [12]:
pool = mp.Pool(processes=mp.cpu_count())

data = np.array([i for i in range(100)]).reshape((10, 10))

sw = StopWatch()
results = []
for row in data:
    results.append(pool.apply_async(add_data_to_singleton, args=(row,)))

pool.close()
pool.join()

sw.show()

Inside Process: [999, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, datetime.timedelta(microseconds=44)]
Inside Process: [999, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, datetime.timedelta(microseconds=52)]
Inside Process: [999, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, datetime.timedelta(microseconds=51)]
Inside Process: [999, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, datetime.timedelta(microseconds=53)]
Inside Process: [999, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, datetime.timedelta(microseconds=52)]
Inside Process: [999, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, datetime.timedelta(microseconds=49)]
Inside Process: [999, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, datetime.timedelta(microseconds=49)]
Inside Process: [999, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, datetime.timedelta(microseconds=55)]
Elapsed time: 0:00:00.100695.
