#  Вебинар 5. Параллельные вычисления. Многопоточность и многопроцессность. Асинхронность

## Проверка связи

**Поставьте в чат:**<br>
\+ если меня видно и слышно<br>
– если нет

**Если у вас нет звука:**

* убедитесь, что на вашем устройстве и в колонках включён звук

* обновите страницу вебинара или закройте страницу и переподключитесь

* откройте вебинар в другом браузере

* перезагрузите ваше устройство и войдите снова

## О спикере

**Глеб Пехов**
- Backend-разработчик на Python с 5-летним опытом работы
- Преподаватель в Нетологии

## Правила участия

1. Продолжительность вебинара — 80 минут. Через 40 минут сделаем перерыв на 5 минут
2. Запустите Jupyter Notebook / Google Colab / IDE для выполнения практических заданий вебинара. Во время демонстрации работы повторяйте за спикером: так вы лучше поймёте материал
3. Вопросы и уточнения:
  - создайте копию этого блокнота, чтобы фиксировать вопросы и важную информацию во время занятия
  - вы можете писать вопросы в чате во время вебинара или озвучивать их в блоке «Ваши вопросы»
4. Запись вебинара будет доступна в личном кабинете

## Цели занятия

- Познакомиться с принципом параллельных вычислений
- Узнать, что такое потоки и процессы
- Рассмотреть принципы многопоточности и многопроцессности
- Узнать, как использовать async/await
- Научиться решать несложные задачи, используя принцип параллельных вычислений

## План занятия

[1. Терминология конкурентности](#1.-Терминология-конкурентности)<br>
[2. Реализация индикатора с потоками](#2.-Реализация-индикатора-с-потоками)<br>
[3. Реализация индикатора с процессами](№3.-Реализация-индикатора-с-процессами)<br>
[4. Реализация индикатора с асинхронностью](№4.-Реализация-индикатора-с-асинхронностью)<br>
[5. Сравнение супервизоров](№5.-Сравнение-супервизоров)<br>
[6. Примеры кода с async/await](№6.-Примеры-кода-с-async/await)<br>

## Ваши вопросы

## 1. Терминология конкурентности

**Конкурентность** — способность обрабатывать несколько задач с чередованием или, если это возможно, параллельно, так что каждая задача в итоге завершается успешно или с ошибкой. Одноядерный процессор допускает конкурентность, если он работает под управлением планировщика ОС, который чередует выполнение ожидающих задач. Иногда используется название «многозадачность».

**Параллелизм** — способность выполнять несколько вычислений одновременно. Для этого необходим многоядерный процессор, несколько процессоров, графический процессор (GPU) или кластер из нескольких компьютеров.

**Единица выполнения** — общий термин для объектов, выполняющих код конкурентно, каждый из которых имеет независимые от других состояния и стек вызовов. Python поддерживает три вида единиц выполнения: процессы, потоки и сопрограммы.

**Процесс** — экземпляр компьютерной программы во время её выполнения, которому выделены память и квант процессорного времени. Современные ОС для настольных компьютеров свободно управляют сотнями конкурентных процессов, при этом каждый процесс изолирован в своём адресном пространстве.

Процессы взаимодействуют через каналы, сокеты или отображённые на память файлы — все они могут передавать только голые байты. Чтобы передать объект Python из одного процесса в другой, его нужно сериализовать в виде последовательности байтов. Это дорого, и не все объекты допускают сериализацию. Процесс может порождать подпроцессы или дочерние процессы. Они изолированы и друг от друга, и от родительского процесса.

Процессы допускают вытесняющую многозадачность: планировщик ОС периодически вытесняет работающий процесс, т. е. приостанавливает его, чтобы дать возможность поработать остальным. Это означает, что в теории зависший процесс не может застопорить всю систему.

**Поток** — единица выполнения внутри одного процесса.

Сразу после запуска процесс содержит один поток — главный. Вызывая системные API, процесс может создавать дополнительные потоки, которые будут работать конкурентно.

Потоки внутри одного процесса разделяют общее пространство памяти, в которой находятся активные объекты Python. Это позволяет потокам совместно использовать данные, но может приводить к повреждению данных, когда сразу несколько потоков пытаются обновить один и тот же объект. Как и процессы, потоки допускают вытесняющую многозадачность под управлением планировщика ОС. Поток потребляет меньше ресурсов, чем процесс, для выполнения одной и той же работы.

**Сопрограмма** — функция, которая может приостановить своё выполнение и возобновить его позже.

В Python классические сопрограммы строятся на основе генераторных функций, а платформенные определяются с помощью ключевых слов async def.

Обычно сопрограммы в Python исполняются в одном потоке под управлением цикла событий, который работает в том же потоке. Каркасы асинхронного программирования asyncio, curio или trio предоставляют цикл событий и поддерживающие библиотеки для реализации неблокирующего ввода-вывода на основе сопрограмм.

Сопрограммы поддерживают кооперативную многозадачность: каждая из них должна явно уступать процессор с помощью ключевого слова yield или await, чтобы другие части программы могли работать конкурентно, но не параллельно. Это означает, что любой блокирующий код внутри сопрограммы блокирует выполнение цикла событий и остальных сопрограмм, в отличие от вытесняющей многозадачности, которую поддерживают процессы и потоки. С другой стороны, сопрограммы потребляют меньше ресурсов по сравнению с процессами и потоками, выполняющими ту же работу.

**Блокировка** — объект, который единицы выполнения могут использовать для синхронизации своих действий, чтобы избежать повреждения данных.

Во время обновления разделяемой структуры данных исполняемый код должен удерживать ассоциированную блокировку. Это служит для остальных частей программы сигналом, что нужно подождать, пока блокировка освободится, и только потом обращаться к той же структуре данных.

Простейший вид блокировки называется **мьютексом** (mutual exclusion, взаимное исключение). Реализация блокировки зависит от модели конкурентности.

**Состязание** — спор за ограниченный ресурс. Оно возникает, когда несколько единиц выполнения пытаются обратиться к разделяемому ресурсу, например к блокировке или хранилищу. Бывает и состязание за процессор, когда
счётные процессы или потоки должны ждать, пока планировщик ОС выделит им долю процессорного времени.

Теперь воспользуемся изученной терминологией, чтобы понять, как конкурентность поддерживается в Python.

## Ваши вопросы

## 2. Реализация индикатора с потоками

In [None]:
import itertools
import time
from threading import Thread, Event

def spin(message: str, done_event: Event) -> None:
    for character in itertools.cycle(r"\|/-"):
        status = f"\r{character} {message}"
        print(status, end="", flush=True)
        if done_event.wait(0.1):
            break
        blanks = ' ' * (len(status) - 1)
        print(f'\r{blanks}\r', end="")

def slow_function() -> int:
    time.sleep(3)
    return 42

def supervisor() -> int:
    done_event = Event()
    spinner_thread = Thread(target=spin, args=("thinking... ", done_event))
    spinner_thread.start()
    result = slow_function()
    done_event.set()
    spinner_thread.join()
    return result

def main() -> None:
    result = supervisor()
    print(f"Answer: {result}")

if __name__ == '__main__':
    main()

- thinking... Answer: 42


1. Аргумент done, экземпляр класса threading.Event — простой объект для синхронизации потоков
2. Функция slow_function() вызывается из главного потока. Представьте, что это вызов медленного API по сети. Вызов sleep блокирует главный поток, но GIL при этом освобождается, поэтому поток индикатора продолжает работать

## Ваши вопросы

## 3. Реализация индикатора с процессами

In [1]:
import itertools
import time
from multiprocessing import Process, Event, Queue, synchronize

def spin(message: str, done_event: synchronize.Event) -> None:
    for character in itertools.cycle(r"\|/-"):
        status = f"\r{character} {message}"
        print(status, end="", flush=True)
        if done_event.wait(0.1):
            break
        blanks = ' ' * (len(status) - 1)
        print(f'\r{blanks}\r', end="")

def slow_function(queue: Queue) -> None:
    time.sleep(3)
    queue.put(42)

def supervisor() -> int:
    done_event = Event()
    queue = Queue()

    spinner = Process(target=spin, args=("thinking... ", done_event))
    spinner.start()

    slow_function(queue)

    done_event.set()
    spinner.join()

    return queue.get()

def main() -> None:
    result = supervisor()
    print(f"Answer: {result}")

if __name__ == '__main__':
    main()

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'spin' on <module '__main__' (built-in)>


Answer: 42


## Ваши вопросы

## 4. Реализация индикатора с асинхронностью

## Немного синтаксиса

In [None]:
import asyncio

asyncio.run(coro()) # Вызывается из регулярной функции для управления объектом сопрограммы, который обычно является точкой входа в весь асинхронный код программы, как supervisor в этом примере. Этот вызов блокирует выполнение

In [None]:
import asyncio

asyncio.create_task(coro()) # Вызывается из сопрограммы, чтобы запланировать выполнение другой сопрограммы. Этот вызов не приостанавливает текущую сопрограмму. Он возвращает экземпляр Task — объект, который обёртывает объект сопрограммы и предоставляет методы для управления ею и для опроса её состояния

In [None]:
await coro() # Вызывается из сопрограммы, чтобы передать управление объекту сопрограммы, возвращённому coro(). Этот вызов приостанавливает текущую сопрограмму до возврата из coro. Значением выражения await является значение, возвращённое coro

## Реализация

In [None]:

import asyncio
import itertools

async def spin(msg: str) -> None:
    for char in itertools.cycle(r"\|/-"):
        status = f"{char} {msg}"
        print(status, end="\r")
        try:
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    blanks = " " * len(status)
    print(f"{blanks}\r", end="")

async def slow_function() -> int:
    await asyncio.sleep(3)
    return 42


def main() -> None:
    result = asyncio.run(supervisor())
    print(f"Answer: {result}")

async def supervisor() -> int:
    spinner = asyncio.create_task(spin('thinking...'))
    print(f"spinner object: {spinner}")
    result = await slow_function()
    spinner.cancel()
    return result


if __name__ == '__main__':
    main()


## Сломаем код ради понимания процесса

In [None]:

import asyncio
import itertools
import time

async def spin(msg: str) -> None:
    for char in itertools.cycle(r"\|/-"):
        status = f"{char} {msg}"
        print(status, end="\r")
        try:
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    blanks = " " * len(status)
    print(f"{blanks}\r", end="")

async def slow_function() -> int:
    time.sleep(3)
    return 42


def main() -> None:
    result = asyncio.run(supervisor())
    print(f"Answer: {result}")

async def supervisor() -> int:
    spinner = asyncio.create_task(spin('thinking...'))
    print(f"spinner object: {spinner}")
    result = await slow_function()
    spinner.cancel()
    return result


if __name__ == '__main__':
    main()


1. Создаётся задача spinner, чтобы в конечном итоге активировать выполнение spin
2. На экране показано, что Task находится в состоянии pending
3. Выражение await передаёт управление сопрограмме slow
4. time.sleep(3) блокирует выполнение на 3 секунды. В  программе ничего не  может произойти, потому что главный поток блокирован, а он единственный. Операционная система продолжит заниматься другими делами. Спустя 3 секунды sleep завершается, выполнение возобновляется и slow возвращает управление
5. Сразу после возврата из slow задача spinner отменяется. Поток управления
так и не дошёл до тела сопрограммы spin

## Ваши вопросы

## 5. Сравнение супервизоров

In [None]:
def supervisor() -> int:
    done_event = Event()
    spinner_thread = Thread(target=spin, args=("thinking... ", done_event))
    spinner_thread.start()
    result = slow_function()
    done_event.set()
    spinner_thread.join()
    return result

In [None]:
async def supervisor() -> int:
    spinner = asyncio.create_task(spin('thinking...'))
    print(f"spinner object: {spinner}")
    result = await slow_function()
    spinner.cancel()
    return result

- Класс asyncio.Task приблизительно эквивалентен threading.Thread
- Task управляет объектом сопрограммы, а Thread обращается к вызываемому объекту
- Сопрограмма уступает управление явно с помощью ключевого слова await
- Мы не создаём объекты Task самостоятельно, а получаем их, передавая сопрограмму функции asyncio.create_task(…)
- Когда asyncio.create_task(…) возвращает объект Task, его выполнение уже запланировано. А экземпляру Thread нужно явно сказать, что пора выполняться, вызвав его метод start
- В многопоточной версии supervisor slow является простой функцией и напрямую вызывается из главного потока. В асинхронной же версии slow — сопрограмма, активируемая await
- Не существует API для завершения потока извне. Вместо этого нужно послать потоку сигнал, например установить объект Event done. Для задач есть метод экземпляра Task.cancel(), который возбуждает исключение CancelledError в том выражении await, в котором в текущий момент приостановлено выполнение тела сопрограммы
- Сопрограмму supervisor нужно запускать с помощью asyncio.run в функции main

##  Истинное влияние GIL

In [None]:
import math

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    max_divisor = math.floor(math.sqrt(n))
    for i in range(3, 1 + max_divisor, 2):
        if n % i == 0:
            return False
    return True

if __name__ == '__main__':
    print(is_prime(10_000_000_000_000_037))

False


### Представим, что эта проверка на простоту работает примерно 3 секунды на среднем компьютере.
### Важный вопрос: можно ли заменить ею вызов time.sleep(3) | asyncio.sleep(3)?

## Ответы на вопросы

### Ответ для multiprocessing

Индикатор управляется дочерним процессом, поэтому будет крутиться и тогда, когда родительский процесс проверяет число на простоту.

### Ответ для threading
Индикатор управляется дополнительным потоком, поэтому будет крутиться
и тогда, когда главный поток проверяет число на простоту.

Я пришёл к этому ответу не сразу: я ожидал, что индикатор прекратит крутиться, потому что переоценил воздействие GIL.

В этом примере индикатор продолжает крутиться, потому что Python приостанавливает работающий поток раз в 5 мс (по умолчанию), делая GIL доступной другим ожидающим потокам. Поэтому главный поток, исполняющий
is_prime, прерывается каждые 5 мс, так что дополнительный поток может проснуться и выполнить одну итерацию цикла for, в конце которой
он вызовет метод wait события done и освободит GIL. Затем главный поток захватит GIL, и вычисление is_prime продолжится на протяжении следующих 5 мс.
Это не оказывает видимого влияния на время работы в этом примере, поскольку функция spin быстро выполняет одну итерацию и освобождает
GIL в ожидании события done, поэтому интенсивность состязания за GIL невелика. Главный поток, исполняющий is_prime, владеет GIL бо́льшую часть времени.

В этом простом примере мы разобрались со счётной задачей, потому что
потоков всего два: один полностью загружает процессор, а второй просыпается всего 10 раз в секунду, чтобы обновить индикатор.
Но если потоков два или больше и все они сильно потребляют процессорное
время, то программа будет работать медленнее, чем последовательный код.

### Ответ для asyncio

Если вызвать is_prime(5_000_111_000_222_021) в сопрограмме slow в примере
spinner_async.py, то индикатор вообще не появится на экране. Эффект будет такой же, как в примере, где мы заменили await asyncio.sleep(3) на time.sleep(3): никакого вращения. Поток управления переходит от supervisor к slow и затем к is_prime. Когда is_prime возвращается, то же самое делает и slow, и supervisor возобновляет работу и отменяет задачу spinner, не дав ей выполниться даже один раз. Выглядит это так, будто программа зависла на 3 с, а затем выдала ответ.

## Ваши вопросы

## 6. Примеры кода с async/await

In [None]:
import asyncio

async def fetch_data(delay: int, name: str):
    print(f"Начинаю загрузку {name}...")
    await asyncio.sleep(delay)  # Имитация задержки при загрузке
    print(f"{name} загружен!")

async def main():
    tasks = [
        fetch_data(2, "Данные 1"),
        fetch_data(3, "Данные 2"),
        fetch_data(1, "Данные 3"),
    ]

    await asyncio.gather(*tasks)  # Ожидание завершения всех задач

# Запуск основной корутины
if __name__ == '__main__':
    asyncio.run(main())

In [None]:
import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        'https://www.example.com',
        'https://www.python.org',
        'https://www.github.com'
    ]

    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks)

    for i, result in enumerate(results):
        print(f"Content from {urls[i]}: {result[:0]}...")

if __name__ == '__main__':
    asyncio.run(main())


In [None]:
import aiofiles
import asyncio

async def read_file(file_name):
    async with aiofiles.open(file_name, mode='r') as file:
        content = await file.read()
        print(f"Содержимое {file_name}: {content[:50]}...")  # Печатаем первые 50 символов

async def main():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [read_file(file) for file in files]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())

## Ваши вопросы

## Итоги занятия

- Познакомились с принципом параллельных вычислений
- Узнали, что такое потоки и процессы
- Рассмотрели принципы многопоточности и многопроцессности
- Узнали, как использовать async/await
- Научились решать несложные задачи, используя принцип параллельных вычислений

## Домашнее задание

###  Цели задания:
- научиться использовать принципы многопоточности и многопроцессности в написании кода
- научиться работать с асинхронностью

###  Задание:
1. Напишите программу, которая создаёт два потока для вычисления квадратов и кубов чисел от 1 до 10

2. Напишите программу, которая создаёт несколько потоков для выполнения функции, выводящей числа от 1 до 10 с задержкой в 1 секунду

3. Напишите программу, которая асинхронно обрабатывает список чисел, вычисляя их квадрат. Каждая операция обработки должна имитировать задержку в 1 секунду

4. Напишите программу, которая использует многопроцессность для вычисления факториала чисел от 1 до 10. Каждый процесс должен вычислять факториал одного числа

## Анонс следующего занятия

Вебинар по теме 6 «Классы и объекты»