Взаимоисключение (mutual exclusion, сокращённо  —  mutex)  — «флажок», переходящий к потоку, который в данный момент имеет право работать с общими ресурсами. Исключает доступ остальных потоков к занятому участку памяти. Мьютексов в приложении может быть несколько, и они могут разделяться между процессами. Есть подвох: mutex заставляет приложение каждый раз обращаться к ядру операционной системы, что накладно.

Семафор  —  позволяет вам ограничить число потоков, имеющих доступ к ресурсу в конкретный момент. Так вы снизите нагрузку на процессор при выполнении кода, где есть узкие места. Проблема в том, что оптимальное число потоков зависит от машины пользователя.

Событие  —  вы определяете условие, при наступлении которого управление передаётся нужному потоку. Данными о событиях потоки обмениваются, чтобы развивать и логически продолжать действия друг друга. Один получил данные, другой проверил их корректность, третий  —  сохранил на жёсткий диск. События различаются по способу отмены сигнала о них. Если нужно уведомить о событии несколько потоков, для остановки сигнала придётся вручную ставить функцию отмены. Если же целевой поток только один, можно создать событие с автоматическим сбросом. Оно само остановит сигнал, после того как он дойдёт до потока. Для гибкого управления потоками события можно выстраивать в очередь.

Критическая секция  — более сложный механизм, который объединяет в себе счётчик цикла и семафор. Счётчик позволяет отложить запуск семафора на нужное время. Преимущество в том, что ядро задействуется лишь в случае, если секция занята и нужно включать семафор. В остальное время поток выполняется в пользовательском режиме. Увы, секцию можно использовать только внутри одного процесса.

In [2]:
from datetime import datetime
import threading
def factorial(number): 
    fact = 1
    for n in range(1, number+1): 
        fact *= n 
    return fact 
number = 100000 
thread = threading.Thread(target=factorial, args=(number,)) 
startTime = datetime.now() 
thread.start() 
thread.join()

endTime = datetime.now() 
print("Время выполнения: ", endTime - startTime)

Время выполнения:  0:00:03.775813


глобальный шлюз (Global Interpreter Lock, он же GIL), который ограничивает многопоточность на уровне интерпретатора

In [3]:
from datetime import datetime
import threading
def factorial(number): 
    fact = 1
    for n in range(1, number+1): 
        fact *= n 
    return fact 
number = 100000 
thread = threading.Thread(target=factorial, args=(number,))
thread2 = threading.Thread(target=factorial, args=(number,))
startTime = datetime.now() 
thread.start() 
thread2.start() 
thread.join()
thread2.join()

endTime = datetime.now() 
print("Время выполнения: ", endTime - startTime)

Время выполнения:  0:00:07.525986


Numpy, Scipy, Numba

Numba — динамически, «на лету» компилирует Python-код, превращая его в машинный код для исполнения на CPU и GPU. Такая технология компиляции называется JIT — “Just in time”. Она помогает оптимизировать производительность программ за счет ускорения работы циклов и компиляции функций при первом запуске.

In [5]:
from numba import jit
@jit
def arr_sum(x,y): 
    result_arr = nupmy.empty_like ( x)
    for i in range (len (x)) : 
        result_arr [i ] = x[i ] + y[i ] 
    return result_arr

https://habr.com/ru/post/317328/

In [6]:
from numba import cuda
@cuda.jit
def call_for_kernel(io_arr):
    # Идентификатор потока в одномерном блоке
    thread_x = cuda.threadIdx.x
    # Идентификатор блока в одномерной сетке
    thread_y = cuda.blockIdx.x
    # Число потоков на блок (т.е. ширина блока)
    block_width = cuda.blockDim.x
    # Находим положение в массиве
    t_position = thread_x + thread_y * block_width
    if  t_position < io_arr.size:  # Убеждаемся, что не вышли за границы массива
        io_arr[ t_position] *= 2 # Считаем

Стоит ли преодолевать связанные c GIL сложности и тратить время на реализацию многопоточности? Вот примеры ситуаций, когда многопоточность несёт с собой больше плюсов, чем минусов.

-- Для длительных и несвязанных друг с другом операций ввода-вывода. Например, нужно обрабатывать ворох разрозненных запросов с большой задержкой на ожидание. В режиме «живой очереди» это долго  — лучше распараллелить задачу.

-- Вычисления занимают более миллисекунды и вы хотите сэкономить время за счёт их параллельного выполнения. Если операции укладываются в 1 мс, многопоточность не оправдает себя из-за высоких накладных расходов.


-- Число потоков не превышает количество ядер. В противном случае параллельной работы всех потоков не получается и мы больше теряем, чем выигрываем.

Взаимоисключение (mutual exception, кратко — mutex) — простейшая блокировка, которая на время работы потока с ресурсом закрывает последний от других обращений. Реализуют это с помощью класса Lock.

In [7]:
import threading
mutex = threading.Lock()

In [9]:
resource = 0

def thread_safe_function():
    global resource
    for i in range(1000000):
        mutex.acquire()
        # Делаем что-то с переменной resource
        mutex.release()

In [14]:
import threading

protected_resource = 0
unprotected_resource = 0

NUM = 5000000
mutex = threading.Lock()

# Потокобезопасный инкремент
def safe_plus():
    global protected_resource
    for i in range(NUM):
        # Ставим блокировку
        mutex.acquire()
        protected_resource += 1
        mutex.release()

# Потокобезопасный декремент
def safe_minus():
    global protected_resource
    for i in range(NUM):
        mutex.acquire()
        protected_resource -= 1
        mutex.release()

# То же, но без блокировки
def risky_plus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource += 1

def risky_minus():
    global unprotected_resource
    for i in range(NUM):
        unprotected_resource -= 1

In [16]:
thread1 = threading.Thread(target = safe_plus)
thread2 = threading.Thread(target = safe_minus)
thread3 = threading.Thread(target = risky_plus)
thread4 = threading.Thread(target = risky_minus)
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread1.join()
thread2.join()
thread3.join()
thread4.join()
print ("Результат при работе с блокировкой %s" % protected_resource)
print ("Результат без блокировки %s" % unprotected_resource)

Результат при работе с блокировкой 0
Результат без блокировки 1434995


In [17]:
try:
    mutex.acquire()
    # Ваш код...

except SomethingGoesWrong:
    # Обрабатываем исключения

finally:
    # Ещё код
    mutex.release()

IndentationError: expected an indented block (168032309.py, line 8)

In [18]:
from threading import Thread, BoundedSemaphore
from time import sleep, time

ticket_office = BoundedSemaphore(value=3)

def ticket_buyer(number):
    start_service = time()
    with ticket_office:       
        sleep(1)
        print(f"client {number}, service time: {time() - start_service}")

buyer = [Thread(target=ticket_buyer, args=(i,)) for i in range(5)]
for b in buyer:
    b.start()

client 0, service time: 1.000699758529663client 1, service time: 1.0012290477752686

client 2, service time: 1.0027799606323242
client 4, service time: 2.0063419342041016client 3, service time: 2.0071208477020264



In [21]:
e = threading.Event()

def event_manager():
    # Ждём, когда кто-нибудь захватит флаг
    e.wait()
    ...
    # Ставим флаг
    e.set()

    # Работаем с ресурсом
        ...
   # Снимаем флаг и ждём нового
    e.clear()

IndentationError: unexpected indent (4132040075.py, line 11)

In [28]:
from threading import Thread, Event
from time import sleep, time


event = Event()


def worker(name: str):   
    event.wait()
    print(f"Worker: {name} - ", time() - curr_time)


# Clear event
event.clear()

# Create and start workers
workers = [Thread(target=worker, args=(f"wrk {i}",)) for i in range(5)]
for w in workers:
    w.start()
curr_time = time()
print("Main thread - ", time() - curr_time)

sleep(1)
event.set()

Main thread -  7.724761962890625e-05
Worker: wrk 0 - Worker: wrk 3 - Worker: wrk 4 -  1.0016810894012451
Worker: wrk 1 - Worker: wrk 2 -  1.0024452209472656
 1.0032241344451904
 1.0022778511047363
 1.0030951499938965


In [34]:
import threading, random

counter = 0
re_mutex = threading.RLock()

def step_one():
    global counter
#     re_mutex.acquire()
    counter = random.randint(1,100)
    print("Random number %s" % counter)
#     re_mutex.release()

def step_two():
    global counter
#     re_mutex.acquire()
    counter *= 2
    print("Doubled = %s" % counter)
#     re_mutex.release()

def walkthrough():
    #re_mutex.acquire()
    try:
        step_one()
        step_two()
    finally:
        re_mutex.release()

t = threading.Thread(target = walkthrough)
t2 = threading.Thread(target = walkthrough)

t.start()
t2.start()
t.join()
t2.join()

Exception in thread Thread-67:
Traceback (most recent call last):
  File "/Users/mgordenko/opt/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
Exception in thread Thread-68:
Traceback (most recent call last):
  File "/Users/mgordenko/opt/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
        self.run()
  File "/Users/mgordenko/opt/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/2x/wnh_0jw147xg4qlczcty3m9w0000gn/T/ipykernel_10646/3368899187.py", line 26, in walkthrough
self.run()
  File "/Users/mgordenko/opt/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/2x/wnh_0jw147xg4qlczcty3m9w0000gn/T/ipykernel_10646/3368899187.py", line 26, in walkthrough
RuntimeError: cannot release un-acquired lock
RuntimeError: cannot release un-acquired lock


Random number 19Random number 37
Doubled = 74

Doubled = 148


In [None]:
from threading import Condition, Thread
from queue import Queue
from time import sleep


cv = Condition()
q = Queue()


# Consumer function for order processing
def order_processor(name):
    while True:
        with cv:
           # Wait while queue is empty
           while q.empty():
                cv.wait()
           try:
               # Get data (order) from queue
               order = q.get_nowait()
               print(f"{name}: {order}")

               # If get "stop" message then stop thread
               if order == "stop":                   
                   break

           except:
               pass

           sleep(0.1)


# Run order processors
Thread(target=order_processor, args=("thread 1",)).start()
Thread(target=order_processor, args=("thread 2",)).start()
Thread(target=order_processor, args=("thread 3",)).start()

# Put data into queue
for i in range(10):
   q.put(f"order {i}")

# Put stop-commands for consumers
for _ in range(3):
   q.put("stop")

# Notify all consumers
with cv:
   cv.notify_all()

In [36]:
# Создаём рекурсивную блокировку
mutex = threading.RLock()

# Создаём переменную состояния и связываем с блокировкой
cond = threading.Condition(mutex)

# Поток-потребитель ждёт свободного ресурса и захватывает его
def consumer():
    while True:
            cond.acquire()
            while not resourse_free():
                cond.wait()
            get_free_resource()
            cond.release()

# Поток-производитель разблокирует ресурс и уведомляет об этом потребителя
def producer():
    while True:
            cond.acquire()
            unblock_resource()
            # Сигналим потоку: "Налетай на новые данные!"
            cond.notify()
            cond.release()

In [40]:
def safe_plus():
    global protected_resource
    for i in range(NUM):
        with mutex:
            protected_resource += 1

In [42]:
from threading import Barrier, Thread
from time import sleep, time


br = Barrier(3)
store = []


def f1(x):
   print("Calc part1")
   store.append(x**2)
   sleep(0.5)
   br.wait()


def f2(x):
   print("Calc part2")
   store.append(x*2)
   sleep(1)
   br.wait()


Thread(target=f1, args=(3,)).start()
Thread(target=f2, args=(7,)).start()

br.wait()

print("Result: ", sum(store))

Calc part1
Calc part2
Result:  23
