# Процессы

In [1]:
import random
import os
import time
import threading
import multiprocessing
import os

In [2]:
# Типы задач
def cpu_bound_task(n):
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        return fib(n-1) + fib(n-2)

    print(f'{n} -> {fib(n)}')


def io_bound_task(ind):
    print(f'start task {ind}')
    time.sleep(5)
    print(f'finish task {ind}\n', end='')

In [3]:
%%time

# CPU bound задача на потоках
tasks = []

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    task = threading.Thread(target=cpu_bound_task, args=(number,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

20 -> 6765
15 -> 610
10 -> 55
25 -> 75025
30 -> 832040
31 -> 1346269
32 -> 2178309
33 -> 3524578
34 -> 5702887
35 -> 9227465
36 -> 14930352
CPU times: user 22.9 s, sys: 405 ms, total: 23.3 s
Wall time: 23.7 s


In [4]:
%%time

# CPU bound задача на процессах
tasks = []

for number in [36, 35, 34, 33, 32, 31, 30, 25, 20, 15, 10]:
    task = multiprocessing.Process(target=cpu_bound_task, args=(number,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()


15 -> 610
20 -> 6765
10 -> 55
25 -> 75025
30 -> 832040
31 -> 1346269
32 -> 2178309
33 -> 3524578
34 -> 5702887
35 -> 9227465
36 -> 14930352
CPU times: user 20.9 ms, sys: 37 ms, total: 57.9 ms
Wall time: 15 s


In [5]:
%%time

# IO bound задача на потоках
tasks = []

for ind in range(10):
    task = threading.Thread(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0
start task 1
start task 2
start task 3
start task 4
start task 5
start task 6
start task 7
start task 8
start task 9
finish task 0
finish task 1
finish task 2
finish task 3
finish task 4
finish task 6
finish task 8
finish task 9
finish task 7
finish task 5
CPU times: user 8.61 ms, sys: 6.21 ms, total: 14.8 ms
Wall time: 5.01 s


In [6]:
%%time

# IO bound задача на процессах
tasks = []

for ind in range(10):
    task = multiprocessing.Process(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0
start task 1
start task 2
start task 3
start task 4
start task 5
start task 6
start task 7
start task 9
start task 8
finish task 0
finish task 1
finish task 2
finish task 3
finish task 4
finish task 6
finish task 5
finish task 7
finish task 8
finish task 9
CPU times: user 25.8 ms, sys: 47.2 ms, total: 73 ms
Wall time: 5.1 s


In [7]:
# Получение идентификатора процесса
def io_bound_task(ind):
    print(f'start task {ind} with {os.getpid()}')
    time.sleep(30)
    print(f'finish task {ind}')
    
tasks = []

for ind in range(10):
    task = multiprocessing.Process(target=io_bound_task, args=(ind,))
    tasks.append(task)
    task.start()

for task in tasks:
    task.join()

start task 0 with 34241
start task 1 with 34242
start task 3 with 34244
start task 2 with 34243
start task 6 with 34247
start task 4 with 34245
start task 5 with 34246
start task 7 with 34248
start task 8 with 34249
start task 9 with 34250
finish task 0
finish task 1
finish task 3
finish task 2
finish task 4
finish task 6
finish task 5
finish task 7
finish task 8
finish task 9


In [8]:
# Пулл процессов
from multiprocessing import Pool

def getpid(n):
    time.sleep(2)
    return os.getpid()

with Pool(3) as p:
    print(p.map(getpid, range(5)))

[34373, 34374, 34375, 34374, 34373]


In [9]:
# Шаринг ресурсов
share_memory = {
    'count': 0,
}
def share_memory_task():
    print(f'read {share_memory["count"]}')
    share_memory['count'] += 1
    print(f'write {share_memory["count"]}')

tasks = []

for _ in range(5):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

read 0
read 0
read 0
write 1
write 1
read 0
write 1
read 0
write 1
write 1


In [10]:
# Шаринг ресурсов через файл
filename = 'share_memory.tmp'

with open(filename, 'w') as fd:
    fd.write('0')

def share_memory_task():
    with open(filename) as fd:
        count = int(fd.read())
    print(f'read {count}')

    with open(filename, 'w') as fd:
        fd.write(str(count + 1))
    print(f'write {count + 1}')


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

with open(filename) as fd:
    print(f'COUNT: {fd.read()}')

read 0
write 1
read 0
write 1
read 1
read 1
read 2
write 2
write 2
write 3
read 3
write 4
read 4
read 4
write 5
write 5
read 5
write 6
read 5
write 6
COUNT: 6


In [11]:
# Шаринг ресурсов через файл с блокировкой
filename = 'share_memory.tmp'

# TODO исправить ошибку
# Ищите помощь в локументации https://docs.python.org/3.6/library/multiprocessing.html


with open(filename, 'w') as fd:
    fd.write('0')

def share_memory_task():
    with open(filename) as fd:
        count = int(fd.read())
    print(f'read {count}')

    with open(filename, 'w') as fd:
        fd.write(str(count + 1))
    print(f'write {count + 1}')


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task)
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

with open(filename) as fd:
    print(f'COUNT: {fd.read()}')

read 0
write 1
read 1
write 2
read 2
write 3
read 3
write 4
read 4
write 5
read 5
write 6
read 6
write 7
read 7
write 8
read 8
write 9
read 9
write 10
COUNT: 10


In [12]:
# Шаринг ресурсов через очередь
from multiprocessing import Queue

q = Queue()
q.put(0)

def share_memory_task(q):
    count = q.get()
    print(f'read {count}')

    q.put(count + 1)
    print(f'write {count + 1}')


tasks = []

for _ in range(10):
    task = multiprocessing.Process(target=share_memory_task, args=(q,))
    tasks.append(task)
    task.start()
    
for task in tasks:
    task.join()

print(f'COUNT: {q.get()}')

read 0
write 1
read 1
write 2
read 2
write 3
read 3
write 4
read 4
write 5
read 5
write 6
read 6
write 7
read 7
write 8
read 8
write 9
read 9
write 10
COUNT: 10


# Ассинхронность

In [13]:
import asyncio
import random
import time

loop = asyncio.get_event_loop()

async def sleep(delay):
    await asyncio.sleep(delay)

In [14]:
async def io_bound_task(group_name):
    print(f'group is "{group_name}"')
    await sleep(2)
    count = random.randint(100, 200)
    print(f'count {count}')
    return count

In [15]:
%%time

# Последовательное выполнение
async def main():
    count = 0
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        count += await io_bound_task(name)
    print(f'COUNT: {count}')

loop.run_until_complete(main())

group is "Amir"
count 155
group is "Zarina"
count 112
group is "Misha"
count 125
group is "Ilya"
count 115
group is "Igor"
count 135
COUNT: 642
CPU times: user 8 ms, sys: 3.18 ms, total: 11.2 ms
Wall time: 10 s


In [16]:
%%time

# Совместное выполнение
async def main():
    tasks = []
    count = 0
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        tasks.append(io_bound_task(name))
        
    results = await asyncio.gather(*tasks)
    print(f'COUNT: {sum(results)}')

loop.run_until_complete(main())

group is "Zarina"
group is "Amir"
group is "Igor"
group is "Ilya"
group is "Misha"
count 166
count 129
count 109
count 179
count 200
COUNT: 783
CPU times: user 3.1 ms, sys: 895 µs, total: 4 ms
Wall time: 2.01 s


In [17]:
%%time

# Блокирущая операция
async def sleep(delay):
    time.sleep(delay)

loop.run_until_complete(main())

group is "Ilya"
count 130
group is "Igor"
count 124
group is "Zarina"
count 177
group is "Misha"
count 117
group is "Amir"
count 160
COUNT: 708
CPU times: user 6.53 ms, sys: 2.59 ms, total: 9.12 ms
Wall time: 10 s


In [18]:
# Обработка ошибок
async def sleep(delay):
    await asyncio.sleep(delay)
    
async def error_task():
    raise ValueError 
    
async def main():
    tasks = [error_task()]
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        tasks.append(io_bound_task(name))
        
    try:
        results = await asyncio.gather(*tasks)
    except ValueError:
        print('Error')
    else:
        print(f'COUNT: {sum(results)}')

loop.run_until_complete(main())

group is "Ilya"
group is "Amir"
group is "Zarina"
group is "Igor"
group is "Misha"
Error


In [19]:
# Обработка ошибок
async def main():
    tasks = [error_task()]
    
    for name in ['Amir', 'Zarina', 'Misha', 'Ilya', 'Igor']:
        tasks.append(io_bound_task(name))
        
    count = 0
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # TODO Посчитать сумму участников 
            
    print(f'COUNT: {count}')

loop.run_until_complete(main())

group is "Ilya"
group is "Misha"
group is "Zarina"
group is "Amir"
group is "Igor"
count 152
count 111
count 142
count 194
count 173
count 123
count 160
count 161
count 116
count 105
[ValueError(), 116, 161, 160, 123, 105]
COUNT: 105


# Генераторы