In [33]:
f'{4:x}'

'4'

# Многозадачность

Python предоставляет основные инструменты для работы в многозадачных системах: потоки и процессы.

## Потоки (Thread)

> **Поток** - это легковесный процесс в том же адресном пространстве, что и основной процесс.

Из определения выше следует, что все потоки могут свободно получать доступ к общей информации. Из этого проистикает самая большая опасноcть - рассинхронизированность данных, которая может приводить к состоянию гонки (race condition).

Также важно помнить, что создавать больше потоков, чем у вас ядер - не очень эффективно, так как в этом случае ОС будет тратить ресурсы на переключение контекста между потоками.

In [15]:
import threading
from time import sleep

# Не самый удобный способ создать поток
class MyThread(threading.Thread):
    def __init__(self, msg, sleep):
        super().__init__()
        self.msg = msg
        self.sleep = sleep
        
    def run(self):
        print(f"START[{self.msg}]")
        sleep(self.sleep)
        print(f"END  [{self.msg}]")

t1 = MyThread("T1", 2)
t2 = MyThread("T2", 5)
t1.start()
t2.start()

# do magic

t1.join()
t2.join()

START[T1]
START[T2]
END  [T1]
END  [T2]


In [7]:
import threading
from time import sleep

def Task(msg, t):
    print(f"START[{msg}]")
    sleep(t)
    print(f"END  [{msg}]")

# Так чуток удобнее
t1 = threading.Thread(target=Task, args=("T1", 5))
t2 = threading.Thread(target=Task, args=("T2", 2))
t1.start()
t2.start()

t1.join()
t2.join()

START[T1]
START[T2]
END  [T2]
END  [T1]


> `Thread.join([timeout])` - позволяет дождаться выполнения кода внутри потока

## Синхронизация

Самым простым объектом синхронизации является мьютекс, которые в Python имеет название `Lock` (`RLock`). Мьютекс работает просто, при попытке любым потоком его захватить, то если данный мьютекс уже захвачен, то поток будет ждать, пока он не освободится. Нужно очень осторожно организовывать код доступа к общим данным, чтобы из-за взаимной блокировки разных мьютексов не возник deadlock.

In [26]:
from threading import Thread
from time import sleep

counter = 0

def increase(value):
    global counter

    local_counter = counter
    local_counter += value

    sleep(0.1)

    counter = local_counter
    print(f'counter={counter}')


# Создаем потоки
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

# Стартуем
t1.start()
t2.start()


# Ждем
t1.join()
t2.join()

# Вывод будет случайным
print(f'{counter=}')

counter=20
counter=10
counter=10


In [27]:
from threading import Thread, Lock
from time import sleep

counter = 0
lock = Lock()

def increase(by):
    # альтернатива, можно вручную вызвать lock.acquire() 
    # и lock.release(), но это опаснее
    with lock:
        global counter
    
        local_counter = counter
        local_counter += by
    
        sleep(0.1)
    
        counter = local_counter
        print(f'counter={counter}')


# Создаем потоки
t1 = Thread(target=increase, args=(10,))
t2 = Thread(target=increase, args=(20,))

# Стартуем
t1.start()
t2.start()


# Ждем
t1.join()
t2.join()

# Вывод будет однозначным
print(f'{counter=}')

counter=10
counter=30
counter=30


# `concurrent.futures`

Вариант использования потоков из предыдущего раздела подходит либо для разовым операций или для организации одной фоновой задачи. Если есть множество задач, которые нужно максимально эффективно выполнить, то удобнее использовать модуль [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html).

Оригинал: [Зачем, когда и как использовать multithreading и multiprocessing в Python](https://habr.com/ru/companies/otus/articles/501056/)

> Задача:получить данные по всем URL, заданным в списке

In [19]:
import urllib.request
from concurrent.futures import ThreadPoolExecutor

urls = [
  'http://www.python.org',
  'https://docs.python.org/3/',
  'https://docs.python.org/3/whatsnew/3.12.html',
  'https://docs.python.org/3/tutorial/index.html',
  'https://docs.python.org/3/library/index.html',
  'https://docs.python.org/3/reference/index.html',
  'https://docs.python.org/3/using/index.html',
  'https://docs.python.org/3/howto/index.html',
  'https://docs.python.org/3/installing/index.html',
  'https://docs.python.org/3/distributing/index.html',
  'https://docs.python.org/3/extending/index.html',
  'https://docs.python.org/3/c-api/index.html',
  'https://docs.python.org/3/faq/index.html'
]

def get_data(url):
    with urllib.request.urlopen(url) as src:
        return src.read()

In [21]:
%%time

results = []
for url in urls:
    page = get_data(url)
    results.append(page)

CPU times: total: 203 ms
Wall time: 4.41 s


In [29]:
%%time

# А теперь используем пул потоков
with ThreadPoolExecutor(1) as executor:
    # можно добавлять по одной через submit (возвращает Future, вместо генератора), 
    # но map в данном случае удобнее
    results = executor.map(get_data, urls)
    print(next(results)[:20])

b'<!doctype html>\n<!--'
CPU times: total: 234 ms
Wall time: 4.29 s


In [30]:
%%time

# А теперь используем несколько потоков
with ThreadPoolExecutor(4) as executor:
    # можно добавлять по одной через submit (возвращает Future, вместо генератора), 
    # но map в данном случае удобнее
    results = executor.map(get_data, urls)

CPU times: total: 203 ms
Wall time: 1.27 s


In [31]:
%%time

# А теперь используем несколько потоков
with ThreadPoolExecutor(8) as executor:
    # можно добавлять по одной через submit (возвращает Future, вместо генератора), 
    # но map в данном случае удобнее
    results = executor.map(get_data, urls)

CPU times: total: 281 ms
Wall time: 667 ms


In [32]:
%%time

# А теперь используем несколько потоков
with ThreadPoolExecutor(16) as executor:
    # можно добавлять по одной через submit (возвращает Future, вместо генератора), 
    # но map в данном случае удобнее
    results = executor.map(get_data, urls)

CPU times: total: 250 ms
Wall time: 674 ms


In [33]:
%%time

# А теперь используем несколько потоков
with ThreadPoolExecutor(32) as executor:
    # можно добавлять по одной через submit (возвращает Future, вместо генератора), 
    # но map в данном случае удобнее
    results = executor.map(get_data, urls)

CPU times: total: 266 ms
Wall time: 735 ms


# Ложка дегтя

Попробуем посчитать число $\pi$ интегрируя $y = \sqrt{1 - x^2}$ от -1 до +1

In [2]:
import math

def linspace(a, b, N):
    assert N > 1
    result = [a,]
    dx = (b - a) / (N - 1)    
    for n in range(1, N - 1):
        result.append(a + dx * n)
    result.append(b)
    return result

def integrate(f, *, a, b, N):
    x = linspace(a, b, N)

    value = 0
    for left, right in zip(x[:-1], x[1:]):
        value += (F(left) + F(right)) * 0.5 * (right - left)
    return value

def F(x):
    return math.sqrt(1 - x**2)

In [54]:
%%time

# простая реализация
integrate(F, a=-1, b=1, N=5000000) * 2

CPU times: total: 5.66 s
Wall time: 5.71 s


3.141592653292644

In [3]:
N = 5000000
steps = 10
dN = N // steps
nodes = linspace(-1, 1, steps + 1)

args = []
for a, b in zip(nodes[:-1], nodes[1:]):
    args.append(dict(a=a, b=b, N=dN))    
args

[{'a': -1, 'b': -0.8, 'N': 500000},
 {'a': -0.8, 'b': -0.6, 'N': 500000},
 {'a': -0.6, 'b': -0.3999999999999999, 'N': 500000},
 {'a': -0.3999999999999999, 'b': -0.19999999999999996, 'N': 500000},
 {'a': -0.19999999999999996, 'b': 0.0, 'N': 500000},
 {'a': 0.0, 'b': 0.20000000000000018, 'N': 500000},
 {'a': 0.20000000000000018, 'b': 0.40000000000000013, 'N': 500000},
 {'a': 0.40000000000000013, 'b': 0.6000000000000001, 'N': 500000},
 {'a': 0.6000000000000001, 'b': 0.8, 'N': 500000},
 {'a': 0.8, 'b': 1, 'N': 500000}]

In [66]:
%%time 

value = 0
with ThreadPoolExecutor(1) as executor:
    value = sum(executor.map(lambda args: integrate(F, **args), args)) * 2    
value

CPU times: total: 4.84 s
Wall time: 4.86 s


3.1415926532923013

In [67]:
%%time 

value = 0
with ThreadPoolExecutor(4) as executor:
    value = sum(executor.map(lambda args: integrate(F, **args), args)) * 2    
value

CPU times: total: 4.83 s
Wall time: 4.87 s


3.1415926532923013

In [71]:
%%time 

value = 0
with ThreadPoolExecutor(8) as executor:
    value = sum(executor.map(lambda args: integrate(F, **args), args)) * 2    
value

CPU times: total: 4.89 s
Wall time: 4.99 s


3.1415926532923013

Не важно, сколько потоков поставим - время выполнения почти не меняется. Проблема в GIL (global interpreter lock). **Грубо говоря в один момент времени может выполнятся только один поток, не важно сколько при этом их было создано.**

Но почему тогда для чтения данных с сайта мы получили прирост скорости?

## `multiprocessing`

Вместо создания потоков можно создавать процессы. Это немного дороже с точки зрения времени создания и оперативной памяти, но безопасней с точки зрения доступа к данным. Процесс не может (простым способом) получить доступ к памяти другого процесса.

Работа с процессами практически идентична. Вместо `Thread` мы имеем `Proccess`. Единственная сложность - организация обмена данными между процессами немного сложнее.

Есть точно также пул процессов.

Также обязательно нужно использовать конструкцию для запуска основного кода, так как текущий модуль будет использоваться всеми процессами
```Python
if __name__ == '__main__':
    ...
```

Пример в `mp.py`.