# Основы Python. Часть 7

## Многопоточность

**Мотивация**: производительность современных вычислительных машин обеспечивается за счёт кол-ва ядер процессора, в то время как частота процессора растёт слабо.

### Отличие Thread от Process

В стандартной библиотеке Python есть два модуля, подерживающие параллельные вычисления: Thread и Process

#### Thread (поток)

- Выполняются в одном процессе из-за GIL (Global Interpreter Lock). В интерпретаторе Python есть блокировка, которая не позволяет одному экземпляру интерпретатора выполняться одновременно в нескольких процессах. Таким образом, Thread - это псевдопараллельные вычисления; в реальности после выполнения одного потока идёт переключение на другой и обратно.
- Так как выполнение в рамках одного процесса и адресного пространства, то внутри Thread доступны все данные, объявленные в scope и переданные потоку

_Use case_: 

- Скорость не критична, можно пожертвовать ради удобства использования общего адресного пространства
- Есть один нагруженный поток с вычислениями, а другие часто находятся в режиме ожидания

#### Process (процесс)

- Запускаются как отдельные экземпляры интерпретатора, поэтому выполняется естественный обход GIL
- Для коммуникаций между процессами и для выделения адресного пространства между процессами необходимо использовать специальные примитивы
- Процессы могут использовать все ядра процессора (как именно происходит это использование решает операционная система)
- Процессы могут запускаться на разных машинах

_Use case_: 

- Скорость критична и распараллеливание вычислений даёт существенный выигрыш
- Больше одного нагруженного процесса (например, один принимает данные, а другой их обрабатывает в реальном времени)

In [43]:
import multiprocessing as mp

from ctypes import c_bool, c_uint32, c_float

import os
import time
import random

In [2]:
n_cpu = mp.cpu_count()

print('I have {} CPU cores'.format(n_cpu))

I have 8 CPU cores


### Пример запуска простого процесса

In [3]:
# функция, которую мы хотим запустить в отдельном потоке
def func(a):
    
    print('Я поток process. Жду 2 секунды и печатаю вывод.')
    
    time.sleep(2)    
    
    print('Я поток process. Результат выполнения: {}'.format(a ** 3))

# создаём отдельный поток
process = mp.Process(target=func, args=(80,))

# запускаем его
process.start()

print('Я основной поток. Продолжаю выполняться.')

# На некоторое время (3+ секунд) здесь одновременно выполняются два потока:
# - основной (main)
# - поток process

# ждём завершения потока process (это блокирующая операция)
process.join()

# здесь остался только основной поток

Я поток process. Жду 2 секунды и печатаю вывод.
Я основной поток. Продолжаю выполняться.
Я поток process. Результат выполнения: 512000


### Пример запуска двух процессов

In [4]:
def func(a):
    time.sleep(2)
    print(a ** 3)
    
process_1 = mp.Process(target=func, args=(2,))    
process_2 = mp.Process(target=func, args=(4,))

process_1.start()
process_2.start()

process_1.join()
process_2.join()

8
64


### Пример запуска нескольких процессов

In [5]:
def func(a):
    time.sleep(2)
    print(a ** 3)

In [6]:
%%time

processes = []

n_cpu = mp.cpu_count()

for i in range(n_cpu):
    
    random_number = random.randint(10, 100)
    
    process = mp.Process(target=func, args=(random_number,))
    
    process.start()
    
    processes.append(process)
    
print('Мы запустили {} процессов. Ждём их выполнения.'.format(n_cpu))    

for i in range(n_cpu):
    
     processes[i].join()  
        
print('Все процессы выполнелись и завершились.')

Мы запустили 8 процессов. Ждём их выполнения.
1728
9261
42875
32768
13824
64000
1728
4096
Все процессы выполнелись и завершились.
CPU times: user 12.8 ms, sys: 20.4 ms, total: 33.2 ms
Wall time: 2.06 s


### Пример работы с пулом потоков

In [7]:
def func(a):
    time.sleep(2)
    print(a ** 3)

In [8]:
%%time

random_numbers = random.sample(range(10, 100), n_cpu)

with mp.Pool(processes=n_cpu) as pool:        
    
    pool.map(func, random_numbers)

64000
175616
493039
59319
571787
148877
140608
456533
CPU times: user 7.82 ms, sys: 20.1 ms, total: 27.9 ms
Wall time: 2.14 s


### Сравнение производительности последовательного и параллельного выполнения

In [9]:
def func_h(x):
    i = 0
    while i < 20:
        x = x * x
        i += 1    
    
random_numbers = random.sample(range(50), n_cpu)    

Если был бы один поток (последовательное выполнение)

In [10]:
%%timeit 3

for random_number in random_numbers:
    
    func_h(random_number)

1.75 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Запускаем пул потоков (параллельное выполнение)

In [11]:
%%timeit 3

with mp.Pool(processes=n_cpu) as pool:        
    
    pool.map(func_h, random_numbers) 

653 ms ± 72.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Загрузка процессора. Сначала при последовательном выполнении. Затем при параллельном.

<img src="./python_7_files/seq_vs_parallel.png">

## Блокировки и использование with

Здесь и далее для краткости записи мы будем использовать with вместо явного блокирования/разблокирования.

Например, следующая запись для mp.Value, обладающего встроенной блокировкой:

In [12]:
v = mp.Value(c_uint32)

v.acquire()   # lock v for changing in other processes

v.value += 1  # do something with v

v.release()   # unlock v


Эквивалентнa:

In [13]:
with v.get_lock():
    v.value += 1

Эквивалентнa:

In [14]:
with v:
    v.value += 1

## Разделяемые данные

Рассмотрим варианты общего доступа к памяти со стороны нескольких процессов.

- RawValue
- Value
- RawArray
- Array

Также есть способ разделения некоторых Python объектов с помощью менеджера

- Manager

RawValue - выделение общей памяти для одного значения определённого типа (без гарантии синхронизации).

Пример использования RawValue:

In [15]:
raw_value = mp.RawValue(c_float, 0.0)

def func(v):
    
    v.value = v.value + 1.0

    print(v)
    
processes = [ mp.Process(target=func, args=(raw_value,)) for i in range(n_cpu) ]

_ = list(map(lambda p: p.start(),  processes))
_ = list(map(lambda p: p.join(),  processes))

c_float(1.0)
c_float(2.0)
c_float(3.0)
c_float(4.0)
c_float(5.0)
c_float(6.0)
c_float(7.0)
c_float(8.0)


**Внимание** Работа с RawValue в общем случае не атомарна, поэтому необходимо использовать **Value**

In [16]:
value = mp.Value(c_float, 0.0)

def func(v):
    
    with v:
        
        v.value = v.value + 1.0        
    
    print(v)
    
processes = [ mp.Process(target=func, args=(value,)) for i in range(n_cpu) ]

_ = list(map(lambda p: p.start(),  processes))
_ = list(map(lambda p: p.join(),  processes))

<Synchronized wrapper for c_float(1.0)>
<Synchronized wrapper for c_float(2.0)>
<Synchronized wrapper for c_float(3.0)>
<Synchronized wrapper for c_float(4.0)>
<Synchronized wrapper for c_float(5.0)>
<Synchronized wrapper for c_float(6.0)>
<Synchronized wrapper for c_float(7.0)>
<Synchronized wrapper for c_float(8.0)>


Пример работы с RawArray

In [17]:
raw_array = mp.RawArray(c_float, 10000)

In [18]:
%%timeit 3

def func(arr):
    for i in range(len(arr)):
        arr[i] += 1.0

processes = [ mp.Process(target=func, args=(raw_array,)) for i in range(n_cpu) ]

list(map(lambda p: p.start(),  processes))
list(map(lambda p: p.join(),  processes))        

12.3 ms ± 189 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


**Внимание** здесь та же самая особенность, как и с RawValue. Нет синхронизации. При одновременном изменении переменной со стороны разных потоков в общем случае нет гарантии того, что это произойдёт так как мы видим. 

Пример использования Array

In [19]:
array = mp.Array(c_float, 10000)

In [20]:
%%timeit 2

def func(arr):
    with arr:  # блокировка
        for i in range(len(arr)):
            arr[i] += 1.0

processes = [ mp.Process(target=func, args=(array,)) for i in range(n_cpu) ]

list(map(lambda p: p.start(),  processes))
list(map(lambda p: p.join(),  processes))

153 ms ± 496 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


Использование менеджера для доступа к общим объектам.

In [21]:
def func(l):
    for i in range(len(l)):
        l[i] += 1

with mp.Manager() as manager:
    
    _list = manager.list([1, 2, 3])   
            
    processes = [ mp.Process(target=func, args=(_list,)) for i in range(n_cpu) ]
    
    list(map(lambda p: p.start(),  processes))
    list(map(lambda p: p.join(),  processes))    
    
    print(_list)

[9, 10, 11]


Попытка разделить объект класса между процессами.

In [22]:
class A:
    def __init__(self):
        self.__i = 0
        
    def set_i(self, i):
        self.__i = i
        
    def __repr__(self):
        return 'A({})'.format(self.__i)
        
def func(objs):    
    for obj in objs:        
        obj.set_i(2)

with mp.Manager() as manager:
    
    _list = manager.list([A(), A(), A()])   
            
    processes = [ mp.Process(target=func, args=(_list,)) for i in range(n_cpu) ]
    
    list(map(lambda p: p.start(),  processes))
    list(map(lambda p: p.join(),  processes))    
    
    print(_list)


[A(0), A(0), A(0)]


Но это сработает с потоками (Thread)

In [23]:
import threading as thr

class A:
    def __init__(self):
        self.__i = 0
        
    def set_i(self, i):
        self.__i = i
        
    def __repr__(self):
        return 'A({})'.format(self.__i)
        
def func(objs):    
    for obj in objs:        
        obj.set_i(2)     
        
_list = [A(), A(), A()]        

threads = [ thr.Thread(target=func, args=(_list,)) for i in range(n_cpu) ]

_ = list(map(lambda p: p.start(),  threads))
_ = list(map(lambda p: p.join(),  threads))         
        
_list

[A(2), A(2), A(2)]

Если в лист положить объекты класса, то так не заработает. Объект не модифицировался.

https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.managers.SyncManager.list

Все типы, которые можно сделать разделяемые с помощью менеджера:
    
https://docs.python.org/3.5/library/multiprocessing.html#managers

## Коммуникации

Между процессами можно перемещать любые сериализуемые Python объекты.

- Queue
- Pipe

Пример очереди (Queue) между процессами.

In [24]:
def reporter(q):
    for i in range(5):                
        q.put('Hello, {}'.format(i))                
        time.sleep(0.005)

def consumer(q):    
    while not q.empty():        
        res = q.get()
        print(res)
        time.sleep(0.5)
    
queue = mp.Queue()

process_1 = mp.Process(target=reporter, args=(queue,))
process_2 = mp.Process(target=consumer, args=(queue,))

process_1.start()
process_2.start()

process_1.join()
process_2.join()


Hello, 0
Hello, 1
Hello, 2
Hello, 3
Hello, 4


Пример использования прямого соединения между процессами (Pipe)

In [25]:
def reporter(conn_write):
    for i in range(5):                
        conn_write.send('Hello, {}'.format(i))                
        time.sleep(0.005)

def consumer(conn_read):
    while True:
        res = conn_read.recv()
        print(res)
        time.sleep(0.5)
        if not conn_read.poll():
            break
    
conn_read, conn_write = mp.Pipe()

process_1 = mp.Process(target=reporter, args=(conn_write,))
process_2 = mp.Process(target=consumer, args=(conn_read,))

process_1.start()
process_2.start()

process_1.join()
process_2.join()


Hello, 0
Hello, 1
Hello, 2
Hello, 3
Hello, 4


### Классы процессов

Можно создать класс процесса, унаследовавшись от Process

In [26]:
class MyProcess(mp.Process):
    def __init__(self):
        super().__init__()
        
    def run(self):
        print('MyProcess started')
        time.sleep(2)
        print('MyProcess finished')
        
p = MyProcess()
p.start()

MyProcess started
MyProcess finished


Преимущество классов в том, что можно инкапсулировать поведение и атрибуты внутри класса. Например, можно таким образом сделать UDP сервер.

In [27]:
import socket

class UDPServer(mp.Process):
    def __init__(self, host, port):
        self.__host = host
        self.__port = port
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.__sock.bind((self.__host, self.__port))
        self.__sock.settimeout(1)
        super().__init__()
        
    def run(self):
        while True:
            try:
                data = self.__sock.recvfrom(10)
            except socket.timeout:
                break
        self.__sock.close()
            
udp_server = UDPServer('127.0.0.1', 8003) 
udp_server.start()
udp_server.join()

## Синхронизация

Часто мы должны убедиться, что только один поток имеет доступ к разделяемым данным.

In [28]:
# для примера, в качестве разделяемых данных мы можем использовать RawValue, так как в этот тип не встроена синхронизация
raw_value = mp.RawValue(c_uint32, 0)


In [29]:
def func(v):
    v.value += 1
    
process_1 = mp.Process(target=func, args=(raw_value,))
process_2 = mp.Process(target=func, args=(raw_value,))

process_1.start()
process_2.start()

process_1.join()
process_2.join()
    
print(raw_value)

c_uint(2)


Для управления доступом к разделяемой памяти в примере выше необходима синхронизация. 

Можем использовать для этого **Lock**.

In [30]:
%%time

lock = mp.Lock()

def func(rw, lock):
    with lock:  # гарантия того, что значение может менять только один поток
        rw.value += 1
        time.sleep(2)
    
processes = [ mp.Process(target=func, args=(raw_value, lock)) for i in range(n_cpu) ]

list(map(lambda p: p.start(),  processes))
list(map(lambda p: p.join(),  processes)) 

print(raw_value)

c_uint(10)
CPU times: user 2.48 ms, sys: 17 ms, total: 19.4 ms
Wall time: 16 s


Иногда требуется уведомить другой поток о событии. Для этого можно использовать **Condition** или **Event**

In [31]:
condition = mp.Condition()

def reporter(c):
    time.sleep(5)
    with c:
        c.notify()
        
def consumer(c):
    with c:
        print('Wait')
        c.wait()
        print('Exit')
    
process_1 = mp.Process(target=reporter, args=(condition,))        
process_2 = mp.Process(target=consumer, args=(condition,))  

process_1.start()
process_2.start()

process_1.join()
process_2.join()    

Wait
Exit


Пример использования **Event**

In [32]:
event = mp.Event()

def reporter(e):
    time.sleep(3)
    e.set()  # уведомили другие процессы
        
def consumer(e):    
    print('Wait')
    e.wait()  # ждём уведомления (блокирующая операция)
    print('Exit')
    e.clear()  # сброс события, чтобы можно было снова вызвать set
        
process_1 = mp.Process(target=reporter, args=(event,))        
process_2 = mp.Process(target=consumer, args=(event,))  

process_1.start()
process_2.start()

process_1.join()
process_2.join()
        

Wait
Exit


Хорошая заметка про примитивы синхронизации с примерами:
    
http://www.laurentluce.com/posts/python-threads-synchronization-locks-rlocks-semaphores-conditions-events-and-queues/

Официальная документация по multiprocessing:

https://docs.python.org/3.5/library/multiprocessing.html

## Заметка по использованию numpy массивов в процессах

Для данных хочется организовать доступ, чтобы с ними можно было бы работать также, как и с numpy массивами. Напрямую это сделать нельзя, но можно воспользоваться RawArray и инициализировать numpy обёртку уже внутри процессов. Например:

In [33]:
import numpy as np

In [34]:
raw_array = mp.RawArray(c_uint32, 9)

# Важно! тип создаваемого RawArray должен соответствовать dtype numpy массива

def func(raw_arr, b):
    np_arr = np.frombuffer(raw_arr, dtype=np.uint32)
    np_arr = np_arr.reshape((3, 3))
    np_arr[:] = b
    print(mp.current_process(), np_arr.shape)
    
process_1 = mp.Process(target=func, args=(raw_array, 5))        
process_2 = mp.Process(target=func, args=(raw_array, 3))  

process_1.start()
process_2.start()

process_1.join()
process_2.join() 
    
print([a for a in raw_array])    

<Process(Process-7274, started)> (3, 3)
<Process(Process-7275, started)> (3, 3)
[3, 3, 3, 3, 3, 3, 3, 3, 3]


## Техника передачи экземпляров классов в процессы

При использовании threading все потоки работают в одном процессе и в одном адресном пространстве, поэтому мы можем передавать экземпляры классов в поток Thread и изменения сделанные одним потоком мы увидим в другом. 

С процессами (Process) дела обстоят иначе. Если мы передадим экземпляр класса в процесс, то по-умолчанию он и все его атрибуты скопируются. То есть он будет работать, но сделанные в нём изменения будут носить локальный характер.

Имеем обычный класс:

In [54]:
class A:
    def __init__(self):
        self.__i = 0
        
    def set_i(self, i):
        self.__i = i
        print('ID of self.__i: {}'.format(id(self.__i)))
        
    def __str__(self):
        return 'A-{}'.format(self.__i)

С ним можно работать как обычно:

In [55]:
a = A()

a.set_i(1)

print(a)

ID of self.__i: 10919328
A-1


Теперь в нескольких процессах:

In [58]:
def do_smth_with_A(a, i):
    a.set_i(i)
    print('Process: {}'.format(os.getpid()))
    print('ID of a: {}'.format(id(a)))
    print('a: {}'.format(a))
    
a = A()    

print('ID of __main__ a: {}'.format(id(a)))
    
process_1 = mp.Process(target=do_smth, args=(a, 1))    
process_2 = mp.Process(target=do_smth, args=(a, 2))    

process_1.start()
process_2.start()

process_1.join()
process_2.join()

ID of __main__ a: 140508947362200
ID of self.__i: 10919328
ID of self.__i: 10919360
Process: 8995
ID of a: 140508947362200
Process: 8998
ID of a: 140508947362200
a: A-2
a: A-1


In [57]:
print(a)

A-0


Видим, что в классе ничего не удалось изменить. Притом видно, что адрес расположения экземпляра класса один и тот же, но адрес переменной **i** внутри класса совсем не тот. Это копия этого атрибута, её бессмысленно менять, только если она работает и имеет значение внутри процесса.

Можно выйти из положения, если использовать разделяемые типы внутри класса. Например:

In [64]:
class B:
    def __init__(self):
        self.__i = mp.Value(c_uint32, 0)
        
    def set_i(self, i):
        with self.__i.get_lock():
            self.__i.value = i
        print('ID of __i: {}'.format(id(self.__i)))
            
    def __str__(self):
        return 'B-{}'.format(self.__i.value)
    
    
def do_smth_with_B(b, i):
    b.set_i(i)
    
b = B()    
    
process_1 = mp.Process(target=do_smth_with_B, args=(b, 1))    
process_2 = mp.Process(target=do_smth_with_B, args=(b, 2))    

process_1.start()
process_2.start()

process_1.join()
process_2.join()

ID of __i: 140508945478768
ID of __i: 140508945478768


In [65]:
print(b)

B-2


Это как раз то, что нам хотелось получить. Разделять эксземпляры классов можно, на для изменяемых и видных снаружи атрибутах должны быть использованы примитивы из sharedtypes (Value, RawValue, Array, RawArray).

Можно даже передавать в процессы сами классы, если сделать их callable. Это более наглядно. Например:

In [72]:
class C:
    def __init__(self):
        self.__i = mp.Value(c_uint32)
        
    def __call__(self, i):  # переопределим встроенный метод __call__, чтобы передать процессу сразу экземпляр класса
        with self.__i:
            self.__i.value = i
            
    def __str__(self):
        return 'C-{}'.format(self.__i.value)

In [73]:
c = C()    
    
process_1 = mp.Process(target=c, args=(1,))    
process_2 = mp.Process(target=c, args=(2,))    

process_1.start()
process_2.start()

process_1.join()
process_2.join()

In [74]:
print(c)

C-2
