In [3]:
import time
import threading
# import multiprocessing
import multiprocess
import itertools
import os
import logging
import random
import string
import requests
from functools import partial
# from multiprocessing import Queue
from multiprocessing.pool import ThreadPool

# Definitions

In [None]:
# Processes are OS-level resource used to run and manage applications
# Processes have pid, list of open files and sockets, isolated memory,

In [None]:
# Special part of OS (scheduler) iterates over processes and give them time slot to run

# 2900 МГц
TIME_SLOT = 0.5/(1000*1000)
print(TIME_SLOT)
while True:
    for process in processes:
        process.run_for(TIME_SLOT)

In [None]:
# Process has 1 or more threads
# Threads have common memory

In [None]:
# 2 kinds of tasks:
- parallel execution: speedup
- background execution: 
    - daemon process: OS services
    - multithreaded process: notifications, monitor, watcher, etc

In [None]:
Parallel execution can be done in 2 ways:
    - threads
    - processes
    
Which one is better?

# Threads

In [7]:
class timer():
    def __init__(self, message):
        self.message = message

    def __enter__(self):
        self.start = time.time()
        return None

    def __exit__(self, type, value, traceback):
        elapsed_time = (time.time() - self.start)
        print(self.message.format(elapsed_time))

TIME_TO_SLEEP = 4
        
def long_running_task(n=TIME_TO_SLEEP):
    print(threading.current_thread())
#     with timer('Elapsed 2: {}s'):
    time.sleep(n)

# what is it?
with timer('Executed in: {}s'):
#     long_running_task()
    long_running_task()


<_MainThread(MainThread, started 50456)>
Executed in: 4.0016632080078125s


In [9]:
with timer('Executed in: {}s'):
    t1 = threading.Thread(target=long_running_task, args=(TIME_TO_SLEEP/2,))
    t2 = threading.Thread(target=long_running_task, args=(TIME_TO_SLEEP/2,))
    t1.start()
    t2.start()
    print(threading.current_thread())
    print('BEFORE T1 JOIN')
    t1.join() # be carefull
    print('BEFORE T2 JOIN')
    t2.join() 
    
print('next step')
# work with date

<Thread(Thread-7 (long_running_task), started 87356)>
<Thread(Thread-8 (long_running_task), started 50848)>
<_MainThread(MainThread, started 50456)>
BEFORE T1 JOIN
BEFORE T2 JOIN
Executed in: 2.0158863067626953s
next step


In [10]:
def run_threads(func, data, workers):
    threads = [
        threading.Thread(target=func, args=(data / workers, ))
        for _ in range(workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

In [11]:
workers = 10
DATA_SIZE = 10

with timer('Elapsed: {}s'):
    run_threads(long_running_task, DATA_SIZE, workers)

<Thread(Thread-9 (long_running_task), started 87728)>
<Thread(Thread-10 (long_running_task), started 89860)>
<Thread(Thread-11 (long_running_task), started 83800)>
<Thread(Thread-12 (long_running_task), started 80704)>
<Thread(Thread-13 (long_running_task), started 82716)>
<Thread(Thread-14 (long_running_task), started 84812)>
<Thread(Thread-15 (long_running_task), started 67076)>
<Thread(Thread-16 (long_running_task), started 87712)>
<Thread(Thread-17 (long_running_task), started 89268)>
<Thread(Thread-18 (long_running_task), started 62708)>
Elapsed: 1.0213148593902588s


In [None]:
def generate_task_queue(total_tasks, queue_size):
    task_queue = []
    remainder = total_tasks
    while remainder > 0:
        value = min(remainder, random.randint(1, total_tasks//queue_size))
        remainder -= value
        task_queue.append(value)
    return task_queue

assert sum(generate_task_queue(1000, 20)) == 1000
assert sum(generate_task_queue(1000, 1)) == 1000
assert sum(generate_task_queue(1000, 1000)) == 1000
assert sum(generate_task_queue(1000, 42)) == 1000

In [12]:
# input_data = [DATA_SIZE / workers for _ in range(workers)]
input_data = [1, 2, 2, 1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]
print(input_data)

with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        pool.map(long_running_task, input_data)
#         for chunk in input_data:
#             long_running_task(chunk)

[1, 2, 2, 1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]
<DummyProcess(Thread-19 (worker), started daemon 14268)>
<DummyProcess(Thread-20 (worker), started daemon 47832)>
<DummyProcess(Thread-21 (worker), started daemon 87372)>
<DummyProcess(Thread-22 (worker), started daemon 86112)>
<DummyProcess(Thread-23 (worker), started daemon 81992)>
<DummyProcess(Thread-24 (worker), started daemon 72244)>
<DummyProcess(Thread-25 (worker), started daemon 88468)>
<DummyProcess(Thread-26 (worker), started daemon 84100)>
<DummyProcess(Thread-27 (worker), started daemon 61188)>
<DummyProcess(Thread-28 (worker), started daemon 8452)>
Elapsed: 2.018418550491333s


<img src="https://www.nginx.com/wp-content/uploads/2016/07/thread-pools-worker-process-event-cycle.png">

# Real world task

In [14]:
def fetch_pic(num_pic):
# def fetch_pic(num_pic, path):
    url = 'https://picsum.photos/400/600'
    path = os.path.join(os.getcwd(), 'img')
    for _ in range(num_pic):
        random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=5))
        response = requests.get(url)
        if response.status_code == 200:
            with open(f'{path}/{random_name}.jpg', 'wb') as f:
                f.write(response.content)
                print(f"Fetched pic [{os.getpid()}]: {f.name}")

In [24]:
workers = 128
DATA_SIZE = 100

with timer('Elapsed: {}s'):
#     with multiprocessing.Pool(workers) as pool:
    with ThreadPool(workers) as pool:
        input_data = [DATA_SIZE // workers for _ in range(workers)]
        # input_data = [(DATA_SIZE // workers, './pics') for _ in range(workers)]
        pool.map(fetch_pic, input_data)

# 1   - 110.45
# 2   - (55) 55
# 4   - (28) 35
# 8   - (25) 25
# 16  - 18
# 32  - 17
# 64  - 10
# 100 - 17
# 128 - 0.12

Elapsed: 0.12281203269958496s


# IO vs CPU bound tasks

In [25]:
DATA_SIZE = 10_000_000
lst2 = []
lst1 = []


def fill_data2(n):
    print(threading.current_thread())
    while n > 0:
        n -= 1
        lst2.append(random.randint(1, 100))

def fill_data1(n):
    print(threading.current_thread())
    while n > 0:
        n -= 1
        lst1.append(random.randint(1, 100))        
        
with timer('Elapsed: {}s'):
#     fill_data(DATA_SIZE, lst)
    fill_data1(DATA_SIZE)

<_MainThread(MainThread, started 50456)>
Elapsed: 5.10992956161499s


In [None]:
Global Interpreter Lock

t1 t2 M
   |  |
|     |
   |  |
|     |
   |  |
______
GIL


for i in range(10):
    ...

t1 i == t2 i    
    
i=0 
 1     2  M
 |     | 
 i=10  i=0
       i=10
       X
 


In [26]:
with timer('Elapsed: {}s'):
#     t1 = threading.Thread(target=fill_data, args=(DATA_SIZE // 2, lst))
#     t2 = threading.Thread(target=fill_data, args=(DATA_SIZE // 2, lst))
    t1 = threading.Thread(target=fill_data2, args=(DATA_SIZE // 2,))
    t2 = threading.Thread(target=fill_data1, args=(DATA_SIZE // 2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    lst = lst2 + lst1
    
print(len(lst), lst[:100])    

<Thread(Thread-549 (fill_data2), started 86664)>
<Thread(Thread-550 (fill_data1), started 43348)>
Elapsed: 4.527665615081787s
20000000 [66, 98, 47, 48, 63, 85, 48, 82, 67, 74, 34, 27, 51, 68, 79, 31, 43, 28, 68, 98, 86, 69, 22, 76, 1, 44, 14, 96, 99, 23, 63, 96, 97, 56, 9, 34, 72, 20, 81, 78, 57, 50, 31, 87, 50, 97, 20, 2, 24, 28, 79, 5, 93, 72, 85, 94, 66, 90, 24, 56, 4, 93, 3, 41, 61, 52, 62, 18, 57, 86, 35, 99, 55, 66, 64, 23, 54, 99, 21, 13, 3, 79, 27, 54, 3, 76, 51, 87, 89, 83, 63, 3, 24, 25, 36, 87, 35, 9, 80, 18]


In [30]:
lst = []
workers = 8
with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        input_data = [DATA_SIZE // workers for _ in range(workers)]
#         pool.map(partial(fill_data, lst=lst), input_data)
        result = pool.map(fill_data1, input_data)
        
print(len(lst), lst[:100])

# 1 - 4.62
# 2 - 4.45
# 4 - 4.51
# 8 - 4.60

<DummyProcess(Thread-567 (worker), started daemon 89120)><DummyProcess(Thread-568 (worker), started daemon 79924)>
<DummyProcess(Thread-569 (worker), started daemon 80120)>
<DummyProcess(Thread-570 (worker), started daemon 70336)>

<DummyProcess(Thread-571 (worker), started daemon 73784)>
<DummyProcess(Thread-572 (worker), started daemon 86924)>
<DummyProcess(Thread-573 (worker), started daemon 82376)>
<DummyProcess(Thread-574 (worker), started daemon 50560)>
Elapsed: 4.603038549423218s
0 []


In [35]:
lst = []
workers = 16

DATA_SIZE = 10_000_000
lst = []


def fill_data(n):
    # print(threading.current_thread())
    import random
    lst = []
    while n > 0:
        n -= 1
        lst.append(random.randint(1, 100))
        
with timer('Elapsed: {}s'):
    with multiprocess.Pool(workers) as pool:
        input_data = [DATA_SIZE // workers for _ in range(workers)]
        pool.map(fill_data, input_data)
        
print(len(lst), lst[:100])

# 5.31 - 1
# 2.71 - 2
# 1.58 - 4
# 1.43 - 8
# 1.63 - 16


Elapsed: 1.6338353157043457s
0 []


In [None]:
def factorize_naive(n):
    """ A naive factorization method. Take integer 'n', return list of
        factors.
    """
    if n < 2:
        return []
    factors = []
    p = 2

    while True:
        if n == 1:
            return factors

        r = n % p
        if r == 0:
            factors.append(p)
            n = n / p
        elif p * p >= n:
            factors.append(n)
            return factors
        elif p > 2:
            # Advance in steps of 2 over odd numbers
            p += 2
        else:
            # If p == 2, get to 3
            p += 1

    assert False, "unreachable"

In [None]:
DATA_SIZE = 1_000

In [None]:
result = {}
workers = 16
with timer('Elapsed: {}s'):
    with multiprocess.Pool(workers) as pool:
        input_data = (i for i in range(1, DATA_SIZE+1))
        result = [
            (n, factors)
            for n, factors in enumerate(pool.map(factorize_naive, input_data), 1)
        ]
    
print(len(result), result[:100])

In [None]:
result = {}
workers = 16
with timer('Elapsed: {}s'):
    with ThreadPool(workers) as pool:
        input_data = (i for i in range(1, DATA_SIZE+1))
        result = [
            (n, factors)
            for n, factors in enumerate(pool.map(factorize_naive, input_data), 1)
        ]
    
print(len(result), result[:100])

In [None]:
x = 1

In [None]:
x