# Multithreading

In [None]:
# Let's simulate an I/O-bound task:
# each task execution is delayed by a random amount of time
# (e.g. waiting for a response from a database)

import time
import random

def io_task(process_id):
    """Simulate an I/O-bound task by blocking for a random delay.

    A whole number of seconds between 1 and 3 (inclusive) is drawn, announced,
    waited out with ``time.sleep``, and then reported as finished.

    Args:
        process_id: Label shown in both progress messages.
    """
    # randint(1, 3) is defined as randrange(1, 3 + 1): same draw, same range.
    seconds = random.randint(1, 3)
    print(f"Process {process_id} started, response in {seconds} second(s)")
    time.sleep(seconds)
    print(f"Process {process_id} finished (after {seconds} second(s))")

io_task(1)

Process 1 started, response in 1 second(s)
Process 1 finished (after 1 second(s))


In [None]:
%%time

for process_id in range(5):
    io_task(process_id)

Process 0 started, response in 2 second(s)
Process 0 finished (after 2 second(s))
Process 1 started, response in 2 second(s)
Process 1 finished (after 2 second(s))
Process 2 started, response in 1 second(s)
Process 2 finished (after 1 second(s))
Process 3 started, response in 1 second(s)
Process 3 finished (after 1 second(s))
Process 4 started, response in 2 second(s)
Process 4 finished (after 2 second(s))
CPU times: user 43 ms, sys: 7.08 ms, total: 50.1 ms
Wall time: 8.01 s


In [None]:
# Entering Threads

import threading

t = threading.Thread(target=io_task, args=(42,))

t.start()
t.join()

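# Note: calling t.run() directly would also execute io_task, but in the
# *current* thread. start() is what launches a new thread; join() waits for it.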

Process 42 started, response in 2 second(s)
Process 42 finished (after 2 second(s))


In [None]:
%%time

for process_id in range(5):
    t = threading.Thread(target=io_task, args=(process_id,))
    t.start()
    t.join()

Process 0 started, response in 2 second(s)
Process 0 finished (after 2 second(s))
Process 1 started, response in 1 second(s)
Process 1 finished (after 1 second(s))
Process 2 started, response in 2 second(s)
Process 2 finished (after 2 second(s))
Process 3 started, response in 1 second(s)
Process 3 finished (after 1 second(s))
Process 4 started, response in 3 second(s)
Process 4 finished (after 3 second(s))
CPU times: user 50.8 ms, sys: 7.38 ms, total: 58.2 ms
Wall time: 9.02 s


In [None]:
%%time

threads = []

for process_id in range(5):
    t = threading.Thread(target=io_task, args=(process_id,))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()

Process 0 started, response in 2 second(s)
Process 1 started, response in 2 second(s)
Process 2 started, response in 2 second(s)
Process 3 started, response in 3 second(s)Process 4 started, response in 2 second(s)

Process 0 finished (after 2 second(s))
Process 1 finished (after 2 second(s))
Process 2 finished (after 2 second(s))
Process 4 finished (after 2 second(s))
Process 3 finished (after 3 second(s))
CPU times: user 22.1 ms, sys: 5.13 ms, total: 27.2 ms
Wall time: 3.01 s


## Threads and returned values

In [None]:
# Unfortunately, threading.Thread does not give back the target function's return value

def io_task_with_return(process_id):
    """Simulate an I/O-bound task that also produces a result.

    Behaves like ``io_task`` (random 1-3 s blocking delay with progress
    messages) and then returns a string containing a fresh random float.

    Args:
        process_id: Label shown in both progress messages.

    Returns:
        ``"Here is the result: <float>"``.
    """
    seconds = random.randint(1, 3)  # identical to randrange(1, 4)
    print(f"Process {process_id} started, response in {seconds} second(s)")
    time.sleep(seconds)
    print(f"Process {process_id} finished (after {seconds} second(s))")
    return f"Here is the result: {random.random()}"

result = io_task_with_return(1)

print(result)

Process 1 started, response in 1 second(s)
Process 1 finished (after 1 second(s))
Here is the result: 0.5809050753995355


In [None]:
# However, no results from a thread...

t = threading.Thread(target=io_task_with_return, args=(42,))

t.start()

t.join()

# ¯\_(ツ)_/¯

Process 42 started, response in 1 second(s)
Process 42 finished (after 1 second(s))


In [None]:
# Solution 1: use a shared mutable object
# Step 1: modify the task to store its result in an object passed to it
# Step 2: pass that object (here, a list) to the thread via `args`

def io_task_with_return_2(process_id, results_object):
    """Simulated I/O task that publishes its result through a shared object.

    A thread started with ``threading.Thread`` drops the target's return
    value, so besides returning the result string this variant appends it to
    ``results_object``. The caller can read it there after ``join()``.

    Args:
        process_id: Label shown in both progress messages.
        results_object: Object with an ``append`` method (a list in this
            notebook) that receives the result.

    Returns:
        The same ``"Here is the result: <float>"`` string that was appended.
    """
    seconds = random.randint(1, 3)  # identical to randrange(1, 4)
    print(f"Process {process_id} started, response in {seconds} second(s)")
    time.sleep(seconds)
    print(f"Process {process_id} finished (after {seconds} second(s))")

    result = f"Here is the result: {random.random()}"
    results_object.append(result)  # make the result visible to the caller
    return result

results = []

t = threading.Thread(target=io_task_with_return_2, args=(42, results))
t.start()
t.join()

print(results)

Process 42 started, response in 2 second(s)
Process 42 finished (after 2 second(s))
['Here is the result: 0.474396006472352']


In [None]:
# Solution 2: use threads from `concurrent.futures` library

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(io_task_with_return, 42)
    return_value = future.result()
    print(return_value)


Process 42 started, response in 1 second(s)
Process 42 finished (after 1 second(s))
Here is the result: 0.4931428335578516


In [None]:
%%time

with concurrent.futures.ThreadPoolExecutor() as executor:
    # `process_id` instead of `id`, so the builtin id() is not shadowed.
    futures = [
        executor.submit(io_task_with_return, process_id)
        for process_id in range(5)
    ]
    # future.result() blocks until that task is done; iterating the futures
    # list keeps the results in submission order.
    results = [future.result() for future in futures]
    print('All results:')
    # A plain loop for the side effect, instead of a list comprehension that
    # builds a throwaway list of None values.
    for value in results:
        print(value)


Process 0 started, response in 3 second(s)
Process 1 started, response in 2 second(s)
Process 2 started, response in 1 second(s)
Process 3 started, response in 2 second(s)
Process 4 started, response in 3 second(s)
Process 2 finished (after 1 second(s))
Process 3 finished (after 2 second(s))
Process 1 finished (after 2 second(s))
Process 0 finished (after 3 second(s))
Process 4 finished (after 3 second(s))
All results:
Here is the result: 0.22190182858607888
Here is the result: 0.4953590503339743
Here is the result: 0.3410620585420795
Here is the result: 0.8795457966231804
Here is the result: 0.3498200449407156
CPU times: user 26.2 ms, sys: 5.27 ms, total: 31.4 ms
Wall time: 3.01 s


# Multiprocessing

In [None]:
# Check cores count

import multiprocessing

multiprocessing.cpu_count()

2

In [None]:
# Use a picklable callable (the builtin print) rather than a lambda. With the
# "spawn" start method (the default on macOS and Windows) the Process object,
# including its target, is pickled to send it to the child, and lambdas cannot
# be pickled.
process = multiprocessing.Process(target=print, args=(42,))

process.start()
process.join()

42


In [None]:
import time
# Let's simulate a CPU-bound task

def cpu_task(process_id, n=100_000_000):
    """Burn CPU by summing the integers ``0 .. n-1`` and report the time taken.

    Args:
        process_id: Label shown in the progress messages.
        n: How many integers to sum. Defaults to 100 million, the original
            hard-coded workload, so existing calls behave as before. Smaller
            values are handy for quick checks.

    Returns:
        ``sum(range(n))``.
    """
    # perf_counter() is monotonic and high-resolution. time.time() follows the
    # system clock, which can be adjusted mid-measurement.
    start_time = time.perf_counter()
    print(f'Process {process_id} started')
    result = sum(range(n))
    end_time = time.perf_counter()
    print(f'Process {process_id} finished, time:', end_time - start_time)
    return result

cpu_task(1)

Process 1 started
Process 1 finished, time: 2.37158465385437


4999999950000000

In [None]:
%%time

for process_id in range(2):
    cpu_task(process_id)

Process 0 started
Process 0 finished, time: 2.075269937515259
Process 1 started
Process 1 finished, time: 2.136685609817505
CPU times: user 4.2 s, sys: 19.9 ms, total: 4.22 s
Wall time: 4.21 s


In [None]:
%%time

processes = 4

with multiprocessing.Pool(processes=processes) as pool:
    results = pool.starmap(cpu_task, [(i,) for i in range(processes)])
    print(results)


Process 0 startedProcess 1 startedProcess 3 started
Process 2 started


Process 0 finished, time: 10.336417436599731
Process 3 finished, time: 10.383132457733154
Process 1 finished, time: 10.44711422920227
Process 2 finished, time: 10.492219686508179
[4999999950000000, 4999999950000000, 4999999950000000, 4999999950000000]
CPU times: user 106 ms, sys: 42.6 ms, total: 148 ms
Wall time: 10.6 s


In [None]:
# Try this code on your own machine

import time
import multiprocessing

def cpu_task(process_id, n=100_000_000):
    """Burn CPU by summing the integers ``0 .. n-1`` and report the time taken.

    Args:
        process_id: Label shown in the progress messages.
        n: How many integers to sum. Defaults to 100 million, the original
            hard-coded workload, so existing calls behave as before.

    Returns:
        ``sum(range(n))``.
    """
    # perf_counter() is monotonic; time.time() can jump with clock changes.
    start_time = time.perf_counter()
    print(f'Process {process_id} started')
    result = sum(range(n))
    end_time = time.perf_counter()
    print(f'Process {process_id} finished, time:', end_time - start_time)
    return result

if __name__ == '__main__':
    print('Cores:', multiprocessing.cpu_count())

    print('Single task:')
    cpu_task(1)

    print('Multiprocessing pool:')
    processes = 4
    with multiprocessing.Pool(processes=processes) as pool:
        # pool.map passes each item as the sole positional argument, so hand
        # it the ids themselves. Wrapping them in 1-tuples (as starmap would
        # expect) makes process_id == (0,), (1,), ... inside cpu_task.
        results = pool.map(cpu_task, range(processes))
        print(results)

My output:
```
% time python test.py
Cores: 8
Single task:
Process 1 started
Process 1 finished, time: 2.047819137573242
Multiprocessing pool:
Process (0,) started
Process (1,) started
Process (2,) started
Process (3,) started
Process (2,) finished, time: 2.899091958999634
Process (1,) finished, time: 2.988703966140747
Process (0,) finished, time: 2.993403911590576
Process (3,) finished, time: 2.966623067855835
[4999999950000000, 4999999950000000, 4999999950000000, 4999999950000000]
python test.py  8.61s user 0.43s system 162% cpu 5.566 total
```

# Project 2

In [None]:
# Batching files

from typing import List, DefaultDict

# Batch files based on the number of processes.
# (split file paths as evenly as possible into N buckets, round-robin)

file_paths = [f'{i:02}.csv' for i in range(11)]
n_processes = 3

def batch_files(file_paths: List[str], n_processes: int) -> List[List[str]]:
    """Split ``file_paths`` round-robin into at most ``n_processes`` batches.

    Path ``i`` goes to batch ``i % n_batches``. Batch sizes therefore differ
    by at most one, and each batch keeps the original relative order. For
    example, 11 files and 3 processes give batches of 4, 4 and 3 files.

    Args:
        file_paths: Paths to distribute.
        n_processes: Desired number of batches (one per worker process).

    Returns:
        A list of non-empty lists of paths. With fewer files than processes,
        each file gets its own batch instead of all files being dropped. An
        empty ``file_paths`` gives ``[]``.

    Raises:
        ValueError: If ``n_processes`` is less than 1.
    """
    if n_processes < 1:
        raise ValueError(f"n_processes must be >= 1, got {n_processes}")

    paths = list(file_paths)
    # Never create more batches than there are files, so no batch is empty and
    # no file is silently discarded.
    n_batches = min(n_processes, len(paths))
    # paths[i::n_batches] picks every n_batches-th path starting at i, which
    # is exactly the round-robin assignment i % n_batches.
    return [paths[i::n_batches] for i in range(n_batches)]


#print(file_paths)
#print(n_processes)
batch_files(file_paths, n_processes)

[['00.csv', '03.csv', '06.csv', '09.csv'],
 ['01.csv', '04.csv', '07.csv', '10.csv'],
 ['02.csv', '05.csv', '08.csv']]

In [None]:
# Multiprocessing Pool

# Suppose we have a run() function that takes
# a list of file paths and processes them
def run(file_paths, process_id):
    """Stand-in worker that "processes" a batch of files.

    Each path contributes one ``random.random()`` draw, and the draws are
    added up in path order. ``process_id`` is accepted but not used here.

    Returns:
        The sum of the draws (``0`` for an empty batch).
    """
    # Imported locally so the function does not depend on the notebook's
    # top-level imports.
    import random

    return sum(random.random() for _ in file_paths)


def main(file_paths, n_processes=None):
    """Process ``file_paths`` in parallel, one batch per worker process.

    The paths are split with ``batch_files()``. Each batch is then handed to
    ``run(batch, process_id)`` in a ``multiprocessing.Pool``, with
    ``process_id`` being the batch's index.

    Args:
        file_paths: Paths to process.
        n_processes: Number of batches/worker processes; defaults to
            ``multiprocessing.cpu_count()``.

    Returns:
        One ``run()`` result per batch, in batch order. Returns ``[]`` when
        ``batch_files()`` produces no batches.
    """
    import multiprocessing

    if n_processes is None:
        n_processes = multiprocessing.cpu_count()

    # use batch_files() to split files
    batches = batch_files(file_paths, n_processes)
    if not batches:
        return []

    # use multiprocessing.Pool to process each batch with run(). starmap
    # unpacks each (batch, process_id) tuple into run's two arguments and
    # returns the results in input order.
    with multiprocessing.Pool(processes=len(batches)) as pool:
        results = pool.starmap(
            run,
            [(batch, process_id) for process_id, batch in enumerate(batches)],
        )

    return results

run(file_paths, 1)