### 1. Multiprocessing

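> 1. What is multiprocessing?  
> &rarr; Running a program on multiple CPUs (cores) at the same time, using several separate processes.  
> 2. What is it good for?   
> &rarr; CPU-bound problems that can be split into independent pieces (for example, a tight, compute-heavy 'for' loop whose iterations do not depend on each other).    
> 3. Multiprocessing properties:  
> &rarr; Separate memory: each process has its own memory space, so variables are not shared between processes.    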
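> 4. **For multiprocessing, run the code as a Python script instead of in a Jupyter Notebook.** With the "spawn" start method (the default on Windows and macOS), worker processes must import the target function from the main module, which does not work reliably for functions defined in a notebook cell.  
> 5. Note: scikit-learn already implements parallelism in many of its algorithms.  
**n_jobs**: the number of jobs (processes) an estimator should use to run in parallel.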

#### How many logical CPUs in your system?

In [2]:
import multiprocessing
# Number of logical CPUs reported by the OS (56 on the machine used here).
# As the last expression in the cell, Jupyter displays its value.
multiprocessing.cpu_count()

56

#### 1.1 Pool class: Part 1. Input argument is a number and get return values

In [None]:
# Run this as a Python script (e.g. python mp_pool_1.py), not inside Jupyter Notebook
import multiprocessing
from multiprocessing import Pool, current_process
import time

def do_computation(v):
    """Simulate a slow job: report the worker, wait ``v`` seconds, return ``v`` squared.

    Args:
        v: Number of seconds to sleep; also the value that gets squared.

    Returns:
        ``v * v``.
    """
    worker_name = current_process().name
    print(f'run at {worker_name}')
    time.sleep(v)  # stands in for real, expensive work
    return v * v

if __name__ == "__main__":
    # One worker per logical CPU; at most len(vs) of them receive work here.
    num_processes = multiprocessing.cpu_count()
    print('num_processes = ', num_processes)
    vs = [1, 2, 3, 4]  # map() takes an iterable of single arguments

    with Pool(num_processes) as pool:
        # map() blocks until all calls finish and keeps results in input order.
        results = pool.map(do_computation, vs)
    print('results = ', results)

# OUTPUT
# C:\ProgramData\Anaconda3\python.exe D:/multiprocessing/mp_pool_1.py
# num_processes =  56
# run at SpawnPoolWorker-2
# run at SpawnPoolWorker-1
# run at SpawnPoolWorker-3
# run at SpawnPoolWorker-4
# results =  [1, 4, 9, 16]

#### 1.1 Pool class: Part 2. Multiple input arguments (list, string, etc) and get return values

In [None]:
import multiprocessing
from multiprocessing import Pool, current_process
import time


def do_expensive_computation(values, name):
    """Process ``values`` one at a time, tagging each doubled value with ``name``.

    For every element the function sleeps that many seconds (a stand-in for
    real, slow work), prints the name of the process it runs in, and records
    ``{name: value * 2}``.

    Args:
        values: Iterable of numbers used as sleep durations (seconds).
        name: Key used in every returned dict.

    Returns:
        A list with one single-entry dict per element of ``values``, in order.
    """
    def _handle(value):
        # Sleep first, then report, then build the result for this element.
        time.sleep(value)
        print(f'run at {current_process().name}')
        return {name: value * 2}

    return [_handle(value) for value in values]


if __name__ == "__main__":
    num_processes = multiprocessing.cpu_count()
    print('num_processes = ', num_processes)

    # One tuple per call: (values, name). starmap() unpacks each tuple into
    # the positional arguments of do_expensive_computation.
    args = [
        ([1], 'test1'),
        ([2], 'test2'),
        ([3], 'test3'),
        ([4], 'test4'),
    ]

    with Pool(num_processes) as p:
        # starmap() rather than map(): every item is a tuple of arguments.
        results = p.starmap(do_expensive_computation, args)
    print('results = ', results)
    
# OUTPUT
# C:\ProgramData\Anaconda3\python.exe D:/multiprocessing/mp_pool_2.py
# num_processes =  56
# run at SpawnPoolWorker-1
# run at SpawnPoolWorker-3
# run at SpawnPoolWorker-5
# run at SpawnPoolWorker-2
# results =  [[{'test1': 2}], [{'test2': 4}], [{'test3': 6}], [{'test4': 8}]]
#
#            [{'test1': 2}] is output of function at SpawnPoolWorker-1
#            [{'test2': 4}] is output of function at SpawnPoolWorker-3 and so on

#------------------------------------------------------------------------

# If only 2 worker processes are used (num_processes = 2), the output is as below:
# num_processes =  2
# run at SpawnPoolWorker-1
# run at SpawnPoolWorker-2
# run at SpawnPoolWorker-1
# run at SpawnPoolWorker-2
# results =  [[{'test1': 2}], [{'test2': 4}], [{'test3': 6}], [{'test4': 8}]]

#### 2. Process class 

In [None]:
from multiprocessing import Process

def print_func(city_name='Seoul'):
    """Print the given city name (defaults to 'Seoul')."""
    message = f'Name of city is {city_name}'
    print(message)

if __name__ == '__main__':
    city_names = ['Hanoi', 'Seoul', 'Paris']
    procs = []  # handles of every child process we launch

    # Launch one child per city. start() returns immediately, so the children
    # run concurrently with each other and with the rest of this loop.
    for city_name in city_names:
        # args must be a tuple: the trailing comma makes (city_name,) a
        # one-element tuple instead of a parenthesized string.
        proc = Process(target=print_func, args=(city_name,))
        procs.append(proc)
        proc.start()
        # Do NOT join() here: that would wait for this child before launching
        # the next one and serialize the work.

    # Wait until every child has finished before continuing the script.
    for proc in procs:
        proc.join()
    print('The end')

# OUTPUT:
# Name of city is Seoul
# Name of city is Hanoi
# Name of city is Paris
# The end

#### 3. Queue class

#### Example 1

In [None]:
from multiprocessing import Queue

city_names = ['Hanoi', 'Seoul', 'Paris']

queue = Queue()  # instantiating a queue object

# Put items to queue; count is 1-based, as in the printed output.
print('Put items to queue')
for count, city_name in enumerate(city_names, start=1):
    print('city name = {}, count = {}'.format(city_name, count))
    queue.put(city_name)

# Get the items back in FIFO order. We know exactly how many were put, so we
# fetch that many with a blocking get(). Polling queue.empty() is not reliable
# for a multiprocessing.Queue: put() hands items to a background feeder
# thread, so right after put() empty() may still report True and a
# `while not queue.empty()` loop could end before printing anything.
print('\nGetting (Popping) items to queue')
for cnt in range(len(city_names)):
    print('city name = {}, cnt = {}'.format(queue.get(), cnt))
    
# OUTPUT:
# Put items to queue
# city name = Hanoi, count = 1
# city name = Seoul, count = 2
# city name = Paris, count = 3

# Getting (Popping) items to queue
# city name = Hanoi, cnt = 0
# city name = Seoul, cnt = 1
# city name = Paris, cnt = 2

#### Example 2

In [None]:
from multiprocessing import Process, Queue, Lock, current_process
import time
import queue  # imported for using "queue.Empty" exception

def do_tasks(task_to_do, task_done, timeout=0.1, delay=0.5):
    """Consume tasks from ``task_to_do`` until it stays empty, reporting each one.

    Each task is printed, then ``"<task>: done by <process name>"`` is put on
    ``task_done``, followed by a ``delay``-second pause that stands in for
    real work.

    Args:
        task_to_do: Queue of task strings to consume.
        task_done: Queue that receives one completion message per task.
        timeout: Seconds to wait for the next task before concluding the
            queue is drained. A blocking ``get`` with a timeout is used
            instead of ``get_nowait()``: on a multiprocessing queue,
            ``get_nowait()`` can raise ``queue.Empty`` while items remain
            (data not yet flushed by the feeder thread, or another process
            holding the read lock), which would stop this worker early.
        delay: Seconds to sleep after each task (simulated work).

    Returns:
        True once no task arrives within ``timeout`` seconds.
    """
    while True:
        try:
            task = task_to_do.get(timeout=timeout)
        except queue.Empty:
            break
        print(task)
        # ': ' separates the task text from the worker name,
        # e.g. "Task number 0: done by Process-2".
        task_done.put(task + ': done by ' + current_process().name)
        time.sleep(delay)  # [s]
    return True

if __name__ == "__main__":
    num_tasks = 5
    num_processes = 4
    task_to_do = Queue()  # work items handed to the workers
    task_done = Queue()   # completion messages sent back by the workers

    # Fill the work queue before any worker is started.
    for i in range(num_tasks):
        task_to_do.put(f'Task number {i}')

    # Start the workers; each keeps pulling tasks until the work queue is empty.
    processes = []
    for _ in range(num_processes):
        p = Process(target=do_tasks, args=(task_to_do, task_done))
        processes.append(p)
        p.start()

    # Block until every worker has exited.
    # NOTE(review): the multiprocessing programming guidelines advise draining
    # a queue a child has put items on before joining that child; joining
    # first is presumably harmless here only because the few short messages
    # fit in the pipe buffer.
    for p in processes:
        p.join()

    # All workers are gone, so everything they produced can now be read out.
    while not task_done.empty():
        print(task_done.get())
        
# OUTPUT
# Task number 0
# Task number 1
# Task number 2
# Task number 3
# Task number 4
# Task number 0: done by Process-2
# Task number 1: done by Process-4
# Task number 2: done by Process-1
# Task number 3: done by Process-3
# Task number 4: done by Process-2