# Multiprocessing in Python

The `multiprocessing` module lets Python run CPU-intensive tasks in parallel. It does this by launching multiple independent Python interpreter processes, each with its own memory and its own Global Interpreter Lock (GIL).

## Advantages
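- **True parallelism**: each process has its own Python interpreter and its own GIL, so the operating system can run the processes on separate CPU cores
    - Ideal for CPU-intensive tasks
- **Better resource utilization**: work is spread across all available cores instead of just one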
- **Isolation of processes**:
    - Each process runs in its own memory space, reducing the risk of data corruption and making debugging simpler.
- **Simplified resource management**:
    - Compared to threading, multiprocessing in Python is often easier to implement and manage, with fewer concerns about thread safety and synchronization.
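- Easier code **implementation** compared to threading, for simple jobs
    - No need to manage a thread pool yourself
    - No need for queues when the workers do not exchange data
    - Fewer exception-handling complexities
    - No GIL limiting how many workers can compute at once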

In other words, a lot less work!

## Disadvantages
- **Additional overhead**
    - Creating and managing multiple processes costs more time and CPU than starting threads, which can eat into the speed-up for small tasks.
- **Memory consumption**: each process has its own memory space, so any data it needs must be copied into it
    - This increases the resources needed, i.e. more RAM per worker
    - Data has to be passed explicitly between processes (for example through a `Pipe`, `Queue`, `Value` or `Manager`) instead of simply being shared

# A simple example

In [1]:
from multiprocessing import Process

def print_square(num):
    print(f'Square: {num * num}')

if __name__ == "__main__":
    p = Process(target=print_square, args=(10,))
    p.start()
    p.join()

Square: 100


## OK, maybe that was too simple

In [2]:
from multiprocessing import Process

def print_square(num):
    print(f'Square of {num}: {num * num}')

if __name__ == "__main__":
    processes = []
    for i in range(1, 11):
        p = Process(target=print_square, args=(i,))
        processes.append(p)
        p.start()

    for p in processes: #wait for everything to finish
        p.join()

Square of 1: 1
Square of 2: 4
Square of 3: 9
Square of 4: 16
Square of 5: 25
Square of 6: 36
Square of 7: 49
Square of 8: 64
Square of 9: 81
Square of 10: 100


Notice that it looks very similar to threading: `start()` launches each worker and `join()` waits for it to finish.
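For comparison, here is a minimal sketch of the same ten-square example written with `threading.Thread`. Only the import and the class name change. The difference is under the hood: these threads share one interpreter (and its GIL), so CPU-bound work in them does not run in parallel the way the processes above can.

```python
from threading import Thread

def print_square(num):
    print(f'Square of {num}: {num * num}')

if __name__ == "__main__":
    # Same structure as the Process version: create, start, then join each worker
    threads = [Thread(target=print_square, args=(i,)) for i in range(1, 11)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread to finish
```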

# The old way - Forking - Linux, macOS, Unix

**PID (Process Identifier)**: A number used by the operating system kernel to uniquely identify an active process.

On POSIX-compliant systems (such as Linux, macOS, and Unix), the fork system call is used to create a new process. When a process forks, it creates a child process that is an exact copy of the parent process. Both processes then continue executing independently from the point where the fork occurred.

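In Python, the `os.fork()` function creates a child process. It returns `0` in the child and the child's PID in the parent, which is how the code below tells the two apart. After forking, the parent and child processes have separate memory spaces, enabling parallel execution without interference.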

`os.fork()` is not available on Windows, so on Windows use WSL2 (or the cross-platform `multiprocessing` example below).

In [None]:
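import os  # os.fork(), os.getpid() and os.waitpid() live in the os module

pid_list = []  # list of process IDs seen by this process
tmp = 1        # a variable used to show that parent and child have separate memory

pid_list.append(os.getpid())
child_pid = os.fork()  # create one child process

if child_pid == 0:
    # os.fork() returned 0, so this branch runs in the child
    pid_list.append(os.getpid())
    print("CHLD: TMP value is: %d " % tmp)
    print("CHLD: pids are %s" % pid_list)
    os._exit(0)  # end the child here so it does not keep running the parent's code
else:
    # os.fork() returned the child's PID, so this branch runs in the parent
    pid_list.append(os.getpid())
    tmp += 1
    print()
    print("PRNT: Child pid: %d" % child_pid)
    print("PRNT: pids are %s" % pid_list)
    print("PRNT: TMP value is: %d " % tmp)
    os.waitpid(child_pid, 0)  # wait for the child so it does not linger as a zombie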


PRNT: Child pid: 57415
PRNT: pids are [57287, 57287]
PRNT: TMP value is: 2 
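CHLD: TMP value is: 1 
CHLD: pids are [57287, 57415]

The child still prints a TMP value of 1 even though the parent incremented it to 2: after the fork each process works on its own copy of memory. Likewise, each process's `pid_list` only holds the PIDs that process appended itself (the parent appended its own PID twice).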
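When a forked child finishes, the parent should collect its exit status with `os.waitpid()`, otherwise the finished child lingers as a zombie process. Below is a minimal sketch for Linux/macOS only (it will not run on Windows). The exit code `3` is an arbitrary value chosen for illustration, and `os.waitstatus_to_exitcode()` assumes Python 3.9 or newer.

```python
import os

def fork_and_wait():
    child_pid = os.fork()
    if child_pid == 0:
        # Child: exit immediately with status code 3.
        # os._exit skips the cleanup handlers inherited from the parent.
        os._exit(3)
    # Parent: block until this particular child exits, then decode its status
    _, status = os.waitpid(child_pid, 0)
    return os.waitstatus_to_exitcode(status)

if __name__ == "__main__":
    print("Child exit code:", fork_and_wait())  # prints: Child exit code: 3
```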




## Forking Example Compatible with Windows and Linux

Since `os.fork()` is not available on Windows, we can use the `multiprocessing` module to achieve similar functionality:

In [36]:
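from multiprocessing import Process
import os

def child_process():
    # Runs in the child process; os.getppid() returns the PID of the parent
    print(f"CHLD: PID is {os.getpid()}, Parent PID is {os.getppid()}")

# The __main__ guard is required on Windows, where the child re-imports this module
if __name__ == "__main__":
    print(f"PRNT: PID is {os.getpid()}")
    child = Process(target=child_process)
    child.start()
    child.join()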



PRNT: PID is 47776
CHLD: PID is 50489, Parent PID is 47776


### Notes:
- The `os.fork()` function is specific to Unix-like systems (Linux, macOS). For cross-platform compatibility, we use the `multiprocessing` module.
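- The `__main__` guard matters here: on Windows (and on macOS since Python 3.8) the default start method is spawn, which starts a fresh interpreter that imports your script to find `child_process`. Functions defined inside a Jupyter notebook cannot be imported that way, so if the code fails in a notebook, run it as a script from a terminal.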
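The behaviour described in these notes depends on the *start method*. `multiprocessing` supports `'fork'` (POSIX only), `'spawn'` and `'forkserver'` (POSIX only). A minimal sketch that forces `'spawn'` through a context object, so the same code path is exercised on every platform; `report` is a hypothetical helper used only for illustration.

```python
import multiprocessing as mp
import os

def report(label):
    # Runs in the child; prints the child's PID and its parent's PID
    print(f"{label}: PID {os.getpid()}, parent PID {os.getppid()}")

if __name__ == "__main__":
    print("Default start method:", mp.get_start_method())
    print("Available:", mp.get_all_start_methods())

    # A context object selects a start method without changing the global default
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=report, args=("spawned child",))
    p.start()
    p.join()
```

Using `get_context()` rather than `set_start_method()` keeps the choice local to this code, which is handy in a notebook where the global default may already be fixed.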

## Another Example

Here's an example showing how to create new processes with the `multiprocessing` module:

In [4]:
from multiprocessing import Process
import os

def work(identifier):
    print('Process {}, pid: {}'.format(identifier, os.getpid()))


processes = [Process(target=work, args=(number,)) for number in range(5)]
for process in processes:  # start all the processes
    process.start()

for process in processes:  # wait for them all to be done
    process.join()


Process 0, pid: 57470
Process 1, pid: 57473
Process 2, pid: 57478
Process 3, pid: 57483
Process 4, pid: 57488


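## Large Calculation to Show CPU Utilization
On Windows, open Task Manager (Ctrl+Shift+Esc), go to the Performance tab and select CPU

On macOS, open Activity Monitor and navigate to the "CPU" tab

On Linux, run `mpstat -P ALL 1 100` (part of the sysstat package) to print per-core usage every second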
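To decide how many worker processes to start, it helps to know how many cores the OS reports. A small sketch using `os.cpu_count()`, which returns the number of logical CPUs or `None` if it cannot be determined; the fallback of 1 and the helper name `worker_count` are assumptions for illustration.

```python
import os

def worker_count(default=1):
    # os.cpu_count() returns the number of logical CPUs, or None if unknown
    return os.cpu_count() or default

if __name__ == "__main__":
    print(f"This machine reports {worker_count()} logical CPUs")
```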


In [11]:
import multiprocessing
import time

def large_calculation(num):
    result = 0
    for i in range(1, 10000000):
        result += i * num
    print(f'Large calculation result for {num}: {result}')


start_time = time.time()

processes = []
for i in range(1, 11):
    p = multiprocessing.Process(target=large_calculation, args=(i,))
    processes.append(p)
    p.start()

for p in processes:
    p.join()

end_time = time.time()
print(f"Total execution time: {end_time - start_time} seconds")

Large calculation result for 3: 149999985000000
Large calculation result for 7: 349999965000000
Large calculation result for 1: 49999995000000
Large calculation result for 5: 249999975000000
Large calculation result for 9: 449999955000000
Large calculation result for 6: 299999970000000
Large calculation result for 2: 99999990000000
Large calculation result for 8: 399999960000000
Large calculation result for 10: 499999950000000
Large calculation result for 4: 199999980000000
Total execution time: 3.8376574516296387 seconds


In [10]:
# For comparison: the same work run sequentially in a single process
import time

def large_calculation(num):
    result = 0
    for i in range(1, 10000000):
        result += i * num
    print(f'Large calculation result for {num}: {result}')


start_time = time.time()

for i in range(1, 11):
    large_calculation(i)

end_time = time.time()
print(f"Total execution time: {end_time - start_time} seconds")

Large calculation result for 1: 49999995000000
Large calculation result for 2: 99999990000000
Large calculation result for 3: 149999985000000
Large calculation result for 4: 199999980000000
Large calculation result for 5: 249999975000000
Large calculation result for 6: 299999970000000
Large calculation result for 7: 349999965000000
Large calculation result for 8: 399999960000000
Large calculation result for 9: 449999955000000
Large calculation result for 10: 499999950000000
Total execution time: 7.893022298812866 seconds


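## Pools

A `Pool` starts a fixed number of worker processes and reuses them for many tasks. `Pool.map` splits an iterable across the workers and returns the results in the same order as the input.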

In [12]:
from multiprocessing import Pool

def square(num):
    return num * num

with Pool(4) as p:
    result = p.map(square, [1, 2, 3, 4, 5])
    print(result)


[1, 4, 9, 16, 25]
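`Pool.map` passes a single argument to each call. When the function needs several arguments, `Pool.starmap` unpacks each tuple into positional arguments. A minimal sketch; `power` is a hypothetical helper for illustration.

```python
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (10, 0)]
    with Pool(2) as pool:
        # Each tuple is unpacked: power(2, 3), power(3, 2), power(10, 0)
        print(pool.starmap(power, pairs))  # [8, 9, 1]
```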


In [14]:
from multiprocessing import Pool
import time

def large_calculation(num):
    result = 0
    for i in range(1, 10000000):
        result += i * num
    print(f'Large calculation result for {num}: {result}')


start_time = time.time()

with Pool(4) as p:
    p.map(large_calculation, range(1, 11))

end_time = time.time()
print(f"Total execution time: {end_time - start_time} seconds")

Large calculation result for 2: 99999990000000
Large calculation result for 4: 199999980000000
Large calculation result for 3: 149999985000000
Large calculation result for 1: 49999995000000
Large calculation result for 5: 249999975000000
Large calculation result for 8: 399999960000000
Large calculation result for 6: 299999970000000
Large calculation result for 7: 349999965000000
Large calculation result for 9: 449999955000000
Large calculation result for 10: 499999950000000
Total execution time: 3.673798084259033 seconds
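Note that `large_calculation` only prints its result, so the list returned by `p.map` above is all `None`. To get the values back in the parent, return them instead. The standard library's `concurrent.futures.ProcessPoolExecutor` offers the same process-pool idea behind a higher-level interface. A sketch with a much smaller loop (the `iterations` parameter and its default of 100_000 are assumptions, chosen to keep it fast):

```python
from concurrent.futures import ProcessPoolExecutor

def large_calculation(num, iterations=100_000):
    # Same arithmetic as above, but the result is returned instead of printed
    result = 0
    for i in range(1, iterations):
        result += i * num
    return result

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # executor.map returns the results in input order
        results = list(executor.map(large_calculation, range(1, 11)))
    print(results[:3])  # [4999950000, 9999900000, 14999850000]
```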


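## Communication Between Processes

Because each process has its own memory, data has to be shared or sent explicitly. The sections below cover `Value` (a single shared number), `Manager` (shared lists and dictionaries) and `Pipe` (messages between two processes).
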
### Using `multiprocessing.Value` to communicate between processes:

`Value` is used for sharing a single simple value, such as an int (`'i'`) or a double (`'d'`), stored in shared memory.


In [41]:
from multiprocessing import Value, Process

def increment(shared_num):
    # Not atomic: this reads the value, adds 1 and writes it back
    shared_num.value += 1


shared_num = Value('i', 0)  # 'i' for int
processes = [Process(target=increment, args=(shared_num,)) 
             for _ in range(10)]  # one process per increment

for p in processes:
    p.start()
for p in processes:
    p.join()

print(shared_num.value)

10


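#### The importance of the lock

`shared_num.value += 1` is a read-modify-write: two processes can read the same old value, both add to it, and one of the updates is lost. A `Lock` lets only one process update the value at a time.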

In [None]:
from multiprocessing import Value, Process

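def sum_of_squares(shared_sum, start, end):
    for i in range(start, end):
        # No lock: "+=" reads, adds, then writes, so processes can overwrite each other's updates
        shared_sum.value += i * i


shared_sum = Value('i', 0)  # 'i' for int
num_processes = 4
range_per_process = 100 // num_processes

# Same ranges as the locked version below (1..100), so the results can be compared
processes = [
    Process(target=sum_of_squares, 
            args=(shared_sum, i * range_per_process + 1, (i + 1) * range_per_process + 1))
           for i in range(num_processes)]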

for p in processes:
    p.start()
for p in processes:
    p.join()

print(shared_sum.value)

In [1]:
from multiprocessing import Process, Value, Lock

def sum_of_squares(start, end, shared_result, lock):
    for i in range(start, end):
        with lock:
            shared_result.value += i*i

if __name__ == "__main__":
    num_processes = 4
    range_per_process = 100 // num_processes
    shared_result = Value('i', 0) 
    lock = Lock()

    processes = [
        Process(target=sum_of_squares, args=(i * range_per_process + 1, (i + 1) * range_per_process + 1, shared_result, lock))
        for i in range(num_processes)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(shared_result.value)

338350


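OK, so what's the big deal? The results are the same. A race only shows up when two processes update the value at the same moment, and with this little work per process that is rare, so the unlocked version often gets the right answer by luck.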

In [None]:
from multiprocessing import Process, Value

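def factorial_part(start, end, shared_result):
    result = 1
    for i in range(start, end):
        result *= i
        # Careful: besides having no lock, this line sits inside the loop, so every
        # intermediate product is multiplied in. That alone overflows the double to inf;
        # the locked version below also moves it out of the loop.
        shared_result.value *= result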

if __name__ == "__main__":
    num_processes = 4
    n = 100  # Large number for factorial calculation
    range_per_process = n // num_processes
    shared_result = Value('d', 1.0)  # 'd' for double to handle large numbers

    processes = [
        Process(
            target=factorial_part, 
            args=(i * range_per_process + 1, (i + 1) * range_per_process + 1, shared_result))
            for i in range(num_processes)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(shared_result.value)

inf


In [None]:
from multiprocessing import Process, Value, Lock

def factorial_part(start, end, shared_result, lock):
    result = 1
    for i in range(start, end):
        result *= i
    with lock:
        shared_result.value *= result

if __name__ == "__main__":
    num_processes = 4
    n = 100  # Large number for factorial calculation
    range_per_process = n // num_processes
    shared_result = Value('d', 1.0)  # 'd' for double to handle large numbers
    lock = Lock()

    processes = [
        Process(
            target=factorial_part, 
            args=(i * range_per_process + 1, (i + 1) * range_per_process + 1, shared_result, lock))
            for i in range(num_processes)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(shared_result.value)

9.332621544394416e+157
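A `Value` created with the default `lock=True` already carries its own lock, available through `get_lock()`, so you do not have to create and pass a separate `Lock`. A minimal sketch of the counter example using it (the 4 processes and 1000 increments are arbitrary choices):

```python
from multiprocessing import Process, Value

def increment(shared_num, times):
    for _ in range(times):
        # get_lock() returns the lock that guards this Value
        with shared_num.get_lock():
            shared_num.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)
    workers = [Process(target=increment, args=(counter, 1000)) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(counter.value)  # 4000: no increments are lost
```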


### Using `multiprocessing.Manager` to communicate between processes

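`Value` is only useful for simple types. A `Manager` is used for sharing lists or dictionaries between processes: it runs a separate server process that holds the objects, and the other processes work on them through proxies.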
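Because a `Manager` starts its own server process, it can be used as a context manager so that process is shut down automatically when the block ends. A minimal sketch (`add_item` is a hypothetical helper); the proxies only work while the manager is running, so the data is copied out before leaving the `with` block.

```python
from multiprocessing import Manager, Process

def add_item(shared_list, item):
    shared_list.append(item)

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()
        procs = [Process(target=add_item, args=(shared_list, n)) for n in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # shared_list[:] copies the data into a normal list before the manager shuts down
        result = sorted(shared_list[:])
    print(result)  # [0, 1, 2]
```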

#### Lists

In [None]:
from multiprocessing import Process, Manager

def append_to_list(shared_list):
    for i in range(5):
        shared_list.append(i)


manager = Manager()
shared_list = manager.list()

processes = [
    Process(target=append_to_list, args=(shared_list,)) 
    for _ in range(4)]

for p in processes:
    p.start()
for p in processes:
    p.join()

print(shared_list)

[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]


#### Dictionaries 

In [None]:
from multiprocessing import Process, Manager

def update_dict(shared_dict):
    for i in range(5):
        shared_dict[i] = i * i


manager = Manager()
shared_dict = manager.dict()

processes = [Process(target=update_dict, args=(shared_dict,)) 
             for _ in range(4)]

for p in processes:
    p.start()
for p in processes:
    p.join()

print(shared_dict)

{0: 0, 1: 1, 2: 4, 3: 9, 4: 16}


#### Dictionary with a lock

In [None]:
from multiprocessing import Process, Manager, Lock

def update_dict(shared_dict, lock, key, value):
    with lock:
        shared_dict[key] = value


manager = Manager()
shared_dict = manager.dict()
lock = Lock()

processes = [
    Process(target=update_dict, args=(shared_dict, lock, f'{i}', i * i))
    for i in range(5)
]

for p in processes: 
    p.start()
for p in processes:
    p.join()

print(shared_dict)

{'0': 0, '1': 1, '2': 4, '3': 9, '4': 16}
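In the example above each process writes a different key, so the lock has little to do: each individual proxy call is a single request to the manager process. A lock matters for compound operations such as incrementing a counter, which is a separate read and write that another process can slip in between. A minimal sketch (`count_hits` is a hypothetical helper):

```python
from multiprocessing import Manager, Process, Lock

def count_hits(shared_dict, lock, key, times):
    for _ in range(times):
        # get() and the assignment are two separate requests; the lock keeps them together
        with lock:
            shared_dict[key] = shared_dict.get(key, 0) + 1

if __name__ == "__main__":
    lock = Lock()
    with Manager() as manager:
        shared_dict = manager.dict()
        procs = [Process(target=count_hits, args=(shared_dict, lock, "hits", 100))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(shared_dict["hits"])  # 400
```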


### Using `multiprocessing.Pipe` to communicate between processes

- Use a `Pipe` for:
    - **Bidirectional communication**: `Pipe()` returns two connected ends, and by default both ends can send and receive messages.
    - **Complex data**: you can send any picklable object (lists, dictionaries, class instances) between processes.

But note that it is harder to follow than `Value` or `Manager`.
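Because both ends of a default (`duplex=True`) pipe can send and receive, a child can answer the parent on the same connection. A minimal request/reply sketch; `echo_upper` is a hypothetical helper.

```python
from multiprocessing import Process, Pipe

def echo_upper(conn):
    # Receive one message, send a reply back on the same connection, then close it
    message = conn.recv()
    conn.send(message.upper())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # HELLO
    p.join()
```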

In [27]:
from multiprocessing import Process, Pipe

def send_message(conn):
    conn.send("Hello from the child process!")
    conn.close()


parent_conn, child_conn = Pipe()

p = Process(target=send_message, args=(child_conn,))
p.start()
message = parent_conn.recv()  # receive before join, so a large message cannot block the child
p.join()

print(message)

Hello from the child process!


### A little more complex 

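This code sends an int, a string, a dictionary, an instance of a custom class, and finally `None`, which tells the child to stop. Every object is pickled on the way, so the child receives a copy (notice the different memory address of the `CustomClass` object in the output).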

In [28]:
from multiprocessing import Process, Pipe

class CustomClass:
    pass

def work(connection):
    while True:
        instance = connection.recv()
        if instance is None:  # None is the signal to stop
            return
        print("CHLD: recv: {}".format(instance))


parent_conn, child_conn = Pipe()
child = Process(target=work, args=(child_conn,))

for item in (42, 'some string', {'one': 1}, CustomClass(), None):
    print("PRNT: send: {}".format(item))
    parent_conn.send(item)

child.start()
child.join()



PRNT: send: 42
PRNT: send: some string
PRNT: send: {'one': 1}
PRNT: send: <__main__.CustomClass object at 0x72f89f3a63f0>
PRNT: send: None
CHLD: recv: 42
CHLD: recv: some string
CHLD: recv: {'one': 1}
CHLD: recv: <__main__.CustomClass object at 0x72f89f9754c0>


#### AND A LITTLE BIT MORE

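Does your mind hurt yet? This cell splits 100! into four partial products, computes each one in its own process, and sends it back to the parent through a separate `Pipe`.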

In [None]:
from multiprocessing import Process, Pipe
import math

def factorial_part(start, end, conn):
    # Calculate the product of one segment of the range
    result = 1
    for i in range(start, end):
        result *= i
    # Send the result through the Pipe
    conn.send(result)
    conn.close()

num_processes = 4
range_per_process = 100 // num_processes

# Create one Pipe per process; zip(*...) splits the (parent, child) pairs into two tuples
parent_conns, child_conns = zip(*[Pipe() for _ in range(num_processes)])

processes = [
    Process(target=factorial_part, args=(i * range_per_process + 1, (i + 1) * range_per_process + 1, child_conns[i]))
    for i in range(num_processes)
]

for p in processes:
    p.start()

# Receive before joining, so a child is never stuck waiting on a full pipe
factorial_result = 1
for conn in parent_conns:
    factorial_result *= conn.recv()

for p in processes:
    p.join()

# Python ints have arbitrary precision, so the exact value is kept
print(factorial_result == math.factorial(100))
print(len(str(factorial_result)), "digits")


True
158 digits
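A `Pipe` connects exactly two endpoints, so the example above needs one pipe per worker. A `multiprocessing.Queue` can be shared by any number of processes, which makes the same partial-factorial idea simpler. A sketch under the same split (four chunks of 1..100), using `math.prod` (Python 3.8+) in place of the explicit loop:

```python
import math
from multiprocessing import Process, Queue

def factorial_part(start, end, queue):
    # Multiply start * (start + 1) * ... * (end - 1) and put the partial product on the queue
    queue.put(math.prod(range(start, end)))

if __name__ == "__main__":
    n, num_processes = 100, 4
    chunk = n // num_processes
    queue = Queue()
    procs = [Process(target=factorial_part, args=(i * chunk + 1, (i + 1) * chunk + 1, queue))
             for i in range(num_processes)]
    for p in procs:
        p.start()
    # Read all results before joining, so no child blocks while its data is unread
    result = math.prod(queue.get() for _ in procs)
    for p in procs:
        p.join()
    print(result == math.factorial(n))  # True
```

The order in which results arrive on the queue does not matter here, because multiplication is commutative.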

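### Using `fetch_rate` and `Manager`

This example downloads exchange rates for three base currencies, one process per currency, and collects them in a `Manager` dictionary. The work here is mostly waiting on the network rather than computing.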

In [None]:
from multiprocessing import Process, Manager
import requests
import os
import time

def fetch_rate(bases, symbols=('eur', 'jpy', 'usd'), timing=False, shared_dict=None):
    """
    Fetch the conversion rates for one base currency. Takes the desired base currency
    and stores its conversion rate to each of the supplied symbols in shared_dict.
    """
    # Set up timing for demonstration (wall-clock time)
    if timing:
        a = time.time()

    # Run the main part of the function to get the rates
    web = "http://www.floatrates.com/daily/" + str(bases) + ".json"
    response = requests.get(web, timeout=10)  # don't wait forever on a slow server
    rate = response.json()
    rate[bases] = {'rate': 1}  # add the base currency itself with a rate of 1

    # Update the shared dictionary with the rates
    if shared_dict is not None:
        shared_dict[bases] = {symbol: rate[symbol]['rate'] for symbol in symbols}

    if timing:
        print("Completed pid: " + str(os.getpid()))
        print("Time Elapsed on CPU:  {:.02f}s\n".format((time.time() - a)))


def multicore(debug=False):
    manager = Manager()
    shared_dict = manager.dict()
    bases = ['usd', 'eur', 'jpy']  # Example base currencies

    # One process per base currency; each writes its rates into shared_dict
    processes = [
        Process(target=fetch_rate, args=(base, ('eur', 'jpy', 'usd'), debug, shared_dict))
        for base in bases
    ]

    a = time.time()
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print("Total Time with processes Elapsed:  {:.02f}s".format(time.time() - a))
    print("Shared Dictionary:", shared_dict)


# Run the multicore function (guarded so it also works with the spawn start method)
if __name__ == "__main__":
    multicore(True)


Completed pid: 52354
Time Elapsed on CPU:  0.44s

Completed pid: 52358
Time Elapsed on CPU:  0.47s

Completed pid: 52362
Time Elapsed on CPU:  0.48s

Total Time with processes Elapsed:  0.50s
Shared Dictionary: {'usd': {'eur': 0.91516746053894, 'jpy': 149.70470594683, 'usd': 1}, 'eur': {'eur': 1, 'jpy': 163.58176224782, 'usd': 1.0926961929034}, 'jpy': {'eur': 0.0061131509176739, 'jpy': 1, 'usd': 0.0066798167343864}}
