# Синхронизация потоков

* Очереди
* Блокировки
* Условные переменные

## Очереди. Модуль queue

Пример как можно использовать очереди (queue) для обмена данными между потоками:

In [2]:
from queue import Queue
from threading import Thread

# Функция, которую будут выполнять потоки
def worker(q, n):
    while True:
        item = q.get()
        if item is None:
            break
        print('Thread: {}. Process item from queue: {}{}'.format(n, item, '\n'), end='')


q = Queue(5) # Создаем объект типа Queue (очередь) с максимальным размером 5
th1 = Thread(target=worker, args=(q, 1)) # Первый поток в который передаем очередь и условный номер потока
th2 = Thread(target=worker, args=(q, 2)) # Передаем параметры аналогично предыдущему потоку

# Стартуем потоки из главного потока (main).
# Бесконечный цикл, пока поток не встретит элемент со значением None
th1.start()
th2.start()

for i in range(20):
    # Метод put() позволяет положить элементы в очередь.
    # Если очередь будет наполнена 5-ю элементами, то вызов метода put() заблокирует выполнение потока,
    # который вызвал этот метод и будет ждать, пока в очереди не появится свободное место для
    # следующих элементов цикла for.
    q.put(i)

q.put(None) # Завершаем бесконечный цикл для одного из потоков передав в очередь элемент None
q.put(None) # Завершаем бесконечный цикл для одного из потоков передав в очередь элемент None

th1.join() # Ожидаем в главном потоке выполнение первого потока
th2.join() # Ожидаем в главном потоке выполнение второго потока

Thread: 2. Process item from queue: 0
Thread: 1. Process item from queue: 1
Thread: 1. Process item from queue: 2
Thread: 2. Process item from queue: 8
Thread: 1. Process item from queue: 3
Thread: 2. Process item from queue: 9
Thread: 1. Process item from queue: 4
Thread: 2. Process item from queue: 10
Thread: 1. Process item from queue: 5
Thread: 2. Process item from queue: 11
Thread: 1. Process item from queue: 6
Thread: 2. Process item from queue: 12
Thread: 1. Process item from queue: 7
Thread: 2. Process item from queue: 13
Thread: 1. Process item from queue: 16
Thread: 2. Process item from queue: 14
Thread: 1. Process item from queue: 17
Thread: 2. Process item from queue: 15
Thread: 1. Process item from queue: 19
Thread: 2. Process item from queue: 18


## Синхронизация потоков, блокировки

Предпочтительно использовать очереди, но иногда приходится использовать блокировки.  
Блокировки как минимум замедляют работу программы, тем не менее их нужно применять.

Представим, что мы создали объект класса `Point` и используем этот объект в большом количестве потоков.  
Одни из этих потоков одновременно обращаются к методу `get()`, другие к методу `set()`.  
Если не использовать блокировку, то может возникнуть ситуация, когда один поток изменил значение `x` (строка `self._x = x`), но еще не успел изменить значение `y` (строка `self._y = y`), а другой поток в это время вернул значения `x` и `y`, то мы получим неконсистентное состояние объекта, когда у него частично одна переменная изменена, а другая нет.

In [3]:
import threading

class Point():

    def __init__(self):
        self._mutex = threading.RLock()
        self._x = 0
        self._y = 0

    def get(self):
        # С помощью контекстного менеджера захватываем блокировку, а при выходе из него - освобождаем
        with self._mutex:
            return (self._x, self._y)

    def set(self, x, y):
        # С помощью контекстного менеджера захватываем блокировку, а при выходе из него - освобождаем
        with self._mutex:
            self._x = x
            self._y = y

## Еще вариант применения блокировок

Блокировки можно использовать без контекстного менеджера:

In [4]:
import threading

# Создаем объекты класса RLock:
a = threading.RLock()
b = threading.RLock()

def foo():
    try:
        # Вызываем метод acquire() чтобы получить/захватить блокировку
        a.acquire()
        b.acquire()
    finally: # Выполнится в любом случае
        # Вызываем метод release() чтобы высвободить блокировку.
        # В этом примере может возникнуть ситуация deadlock, когда мы высвобождаем
        # блокировки в неправильной последовательности:
        a.release()
        b.release()

## Синхронизация потоков, условные переменные

In [5]:
import threading

class Queue():

    def __init__(self, size=5):
        self._size = size
        self._queue = []
        self._mutex = threading.RLock
        self._empty = threading.Condition(self._mutex)
        self._full = threading.Condition(self._mutex)

    def put(self, val):
        with self._full:
            while len(self._queue) >= self._size:
                self._full.wait()

            self._queue.append(val)
            self._empty.notify()

    def get(self):
        with self._empty:
            while len(self._queue) == 0:
                self._empty.wait()

            ret = self._queue.pop(0)
            self._full.notify()
            return ret

## Итоги

* Потоки выполняются в рамках процесса
* Потоки разделяют ресурсы и память процесса
* Потоки более легковесны по сранению с процессами
* Для обмена данными между потоками существуют готовые механизмы - Queue
* Многопоточные программы могут использовать объекты блокировки
* Потоки выполняются с GIL