### Что такое процесс и поток

#### Процесс
**Процесс** — это экзeмпляр программы, выполняющейся в операционной системе. Он включает в себя:
- Исполняемый код программы.
- Выделенную память (оперативную память), в которой хранятся данные программы.
- Системные ресурсы, такие как файлы, сетевые соединения и устройства ввода-вывода.

Каждый процесс в операционной системе изолирован от других процессов. У него есть собственное адресное пространство, что делает процессы независимыми и предотвращает прямой доступ одного процесса к данным другого.

##### Пример процесса:
Если вы запускаете текстовый редактор, веб-браузер и медиаплеер одновременно, операционная система создаёт отдельные процессы для каждой из этих программ.

##### Код на Python: запуск процесса с использованием модуля `os`


In [2]:
import os


def show_process_info():
    print(f"Process ID: {os.getpid()}")  # Получаем ID текущего процесса
    print(f"Parent Process ID: {os.getppid()}")  # Получаем ID родительского процесса


show_process_info()


Process ID: 17868
Parent Process ID: 17700


#### Поток
**Поток (thread)** — это наименьшая единица исполнения кода внутри процесса. Потоки внутри одного процесса могут:
- Выполняться параллельно.
- Совместно использовать память и ресурсы процесса.
  
Так как потоки в одном процессе делят память, их создание и управление требуют меньше ресурсов, чем у процессов.

##### Пример потока:
В текстовом редакторе поток может быть ответственен за автоматическое сохранение, другой — за проверку орфографии, а третий — за отображение пользовательского интерфейса.

##### Код на Python: запуск потока с использованием модуля `threading`


In [3]:
import threading


def print_thread_info():
    print(f"Thread Name: {threading.current_thread().name}")


thread = threading.Thread(target=print_thread_info, name="MyThread")
thread.start()
thread.join()


Thread Name: MyThread


In [4]:
%%html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Operating System Structure</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            background-color: #f4f4f9;
            margin: 0;
            padding: 20px;
        }

        .os-container {
            border: 3px solid #333;
            padding: 20px;
            border-radius: 10px;
            background-color: #f0f0ff;
            max-width: 800px;
            margin: 0 auto;
        }

        .title {
            font-weight: bold;
            font-size: 24px;
            text-align: center;
            margin-bottom: 20px;
        }

        .resource {
            margin: 10px 0;
            padding: 10px;
            border: 1px solid #666;
            border-radius: 5px;
            background-color: #e0e7ff;
            text-align: center;
        }

        .process-container {
            display: flex;
            gap: 20px;
            margin-top: 20px;
        }

        .process {
            border: 2px solid #555;
            border-radius: 8px;
            padding: 15px;
            background-color: #fff;
            flex: 1;
        }

        .process-title {
            font-weight: bold;
            margin-bottom: 10px;
            font-size: 18px;
            text-align: center;
        }

        .thread {
            border: 1px dashed #999;
            border-radius: 5px;
            padding: 10px;
            margin: 10px 0;
            background-color: #f9f9f9;
        }

        .thread-title {
            font-weight: bold;
            margin-bottom: 5px;
        }

        .action {
            font-size: 14px;
            margin: 5px 0;
            padding: 5px;
            background-color: #d4f4dd;
            border: 1px solid #888;
            border-radius: 3px;
        }

        .shared-resources {
            margin-top: 15px;
            padding: 10px;
            border: 2px dotted #666;
            background-color: #ffefd5;
            text-align: center;
            border-radius: 8px;
        }
    </style>
</head>
<body>
    <div class="os-container">
        <div class="title">Operating System</div>
        
        <div class="shared-resources">
            <div class="resource">Shared Resource: Disk</div>
            <div class="resource">Shared Resource: Network</div>
            <div class="resource">Shared Resource: Memory</div>
        </div>
        
        <div class="process-container">
            <!-- Process 1 -->
            <div class="process">
                <div class="process-title">Process 1</div>
                <div class="thread">
                    <div class="thread-title">Thread 1.1</div>
                    <div class="action">Reading data from disk</div>
                </div>
                <div class="thread">
                    <div class="thread-title">Thread 1.2</div>
                    <div class="action">Processing data in memory</div>
                </div>
            </div>

            <!-- Process 2 -->
            <div class="process">
                <div class="process-title">Process 2</div>
                <div class="thread">
                    <div class="thread-title">Thread 2.1</div>
                    <div class="action">Sending data over network</div>
                </div>
                <div class="thread">
                    <div class="thread-title">Thread 2.2</div>
                    <div class="action">Logging results to disk</div>
                </div>
            </div>
        </div>
    </div>
</body>
</html>


### Как это выглядит на уровне выполнения программ
На уровне операционной системы:
1. **Процессы** изолированы друг от друга. Операционная система выделяет каждому процессу своё адресное пространство, что предотвращает их вмешательство друг в друга.
2. **Потоки** в пределах одного процесса совместно используют память и другие ресурсы процесса. Это облегчает обмен данными, но требует синхронизации для предотвращения ошибок.

#### Пример: многозадачность
Представьте, что вы пишете программу, которая одновременно скачивает несколько файлов из интернета. Реализация:
- С использованием **процессов**: каждому скачиванию соответствует отдельный процесс. Это эффективно для использования всех ядер процессора, но требует больше памяти.
- С использованием **потоков**: скачивания выполняются в одном процессе, но отдельными потоками. Это меньше нагружает систему, но может быть ограничено GIL в Python.

---

### Сравнение потоков и процессов на практике

#### Ключевые отличия:
1. **Изоляция:**
   - **Процессы:** изолированы, не имеют доступа к памяти друг друга. Для обмена данными нужно использовать межпроцессное взаимодействие (например, `Queue` или `Pipe`).
   - **Потоки:** имеют общий доступ к памяти процесса, что облегчает обмен данными, но требует синхронизации.

2. **Создание и управление:**
   - **Процессы:** создаются дольше и используют больше памяти, так как каждый процесс имеет своё адресное пространство.
   - **Потоки:** создаются быстрее и используют меньше памяти, так как разделяют ресурсы процесса.

3. **GIL в Python:**
   - В Python потоки ограничены глобальной блокировкой интерпретатора (GIL), из-за чего только один поток может выполнять Python-код одновременно, даже на многоядерных процессорах.
   - Процессы обходят ограничение GIL, так как каждый процесс имеет собственный интерпретатор.

#### Пример: параллельное вычисление суммы квадратов
##### Реализация с потоками


In [None]:
import threading


def calculate_squares(numbers):
    for n in numbers:
        print(f"Квадрат {n}: {n * n}")


nums = [1, 2, 3, 4, 5]
thread1 = threading.Thread(target=calculate_squares, args=(nums[:3],))
thread2 = threading.Thread(target=calculate_squares, args=(nums[3:],))

thread1.start()
thread2.start()

thread1.join()
thread2.join()
print("Все вычисления завершены.")


Квадрат 4: 16
Квадрат 5: 25
Квадрат 1: 1
Квадрат 2: 4
Квадрат 3: 9
Все вычисления завершены.


**Вывод:**
Потоки работают в пределах одного процесса, но из-за GIL Python-код выполняется поочерёдно.

##### Реализация с процессами


In [7]:
from multiprocessing import Process


def calculate_squares(numbers):
    for n in numbers:
        print(f"Квадрат {n}: {n * n}")


nums = [1, 2, 3, 4, 5, 6]
process1 = Process(target=calculate_squares, args=(nums[:3],))
process2 = Process(target=calculate_squares, args=(nums[3:],))

process1.start()
process2.start()

process1.join()
process2.join()
print("Все вычисления завершены.")


Квадрат 1: 1
Квадрат 4: 16Квадрат 2: 4

Квадрат 5: 25
Квадрат 3: 9Квадрат 6: 36

Все вычисления завершены.


**Вывод:**
Процессы выполняются параллельно, используя разные ядра процессора, что даёт прирост производительности.

---



В Python, потоки (threads) из модуля `threading` не выполняются **полностью параллельно** для вычислительных задач из-за **GIL (Global Interpreter Lock)**, но они **могут выполняться параллельно для операций ввода-вывода (I/O)**. Давайте разберёмся в деталях:

---

##### **1. Почему потоки в Python выполняются поочерёдно для вычислений?**
Python использует **GIL (Global Interpreter Lock)** — это механизм, который позволяет только одному потоку исполнять Python-байт-код в любой момент времени. Причины существования GIL:
- Упрощение работы с памятью для интерпретатора CPython.
- Обеспечение потокобезопасности при управлении объектами.

**Результат:**  
- Если поток выполняет чисто вычислительные задачи (например, численные вычисления или обработку данных), потоки будут выполняться поочерёдно. Это из-за GIL, который блокирует одновременное выполнение байт-кода. 
- Потоки уступают друг другу управление только в определённые моменты, например, при ожидании.

---

##### **2. Когда потоки могут работать параллельно?**
Потоки из модуля `threading` могут быть полезны для задач, связанных с **вводом-выводом (I/O)**, таких как:
- Чтение/запись файлов.
- Работа с сетевыми соединениями (например, загрузка данных с интернета).
- Взаимодействие с базами данных.
- Обработка ввода/вывода от пользователя.

**Почему?**  
В таких задачах поток часто находится в состоянии ожидания данных (например, загрузка файла с сети). Во время ожидания GIL временно освобождается, позволяя другим потокам работать. Таким образом, задачи I/O могут выполняться параллельно.

---

##### **3. Что такое I/O операции и в чём их специфика?**
I/O (Input/Output) операции связаны с взаимодействием программы с внешними устройствами или системами.  
Примеры:
- **Input:** чтение из файла, получение данных из сети, обработка пользовательского ввода.
- **Output:** запись в файл, отправка данных в сеть, вывод на экран.

**Специфика I/O операций:**
1. **Зависимость от внешних систем:** Скорость I/O операций определяется скоростью внешнего устройства, а не процессора (например, скорость чтения с диска или сети).
2. **Блокирующий характер:** Когда программа ожидает завершения I/O операции, она "блокируется" (ничего не делает) до получения результата.
3. **Ожидание доступности ресурсов:** Например, программа ждёт, пока данные не будут переданы через сетевое соединение.

**Пример:**
- При загрузке большого файла с сервера потоки, ожидающие завершения загрузки, освобождают GIL, позволяя другим потокам продолжить выполнение.

---

##### **4. Когда использовать `threading` в Python?**
Использование `threading` оправдано, если:
- Вы работаете с задачами I/O, где основная часть времени уходит на ожидание.
- Нужно обрабатывать несколько I/O операций параллельно (например, скачивание файлов).

Для вычислительных задач лучше использовать:
- **Модуль `multiprocessing`:** Выполняет задачи в отдельных процессах, которые не зависят от GIL.
- **Сторонние библиотеки:** Например, NumPy или TensorFlow, которые реализуют вычисления вне GIL.

---

#### Задача: загрузка данных с нескольких сайтов
Допустим, нужно загрузить содержимое нескольких веб-страниц.

##### Реализация с потоками


In [8]:
import threading
import requests


def fetch_url(url):
    response = requests.get(url)
    print(f"URL: {url}, длина содержимого: {len(response.text)}")


urls = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.openai.com"
]

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()


URL: https://www.python.org, длина содержимого: 51129
URL: https://www.openai.com, длина содержимого: 12871
URL: https://www.example.com, длина содержимого: 1256


**Подходит для задач с большим количеством ввода-вывода (I/O).**

##### Реализация с процессами


In [9]:
from multiprocessing import Process
import requests


def fetch_url(url):
    response = requests.get(url)
    print(f"URL: {url}, длина содержимого: {len(response.text)}")


urls = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.openai.com"
]

processes = [Process(target=fetch_url, args=(url,)) for url in urls]

for process in processes:
    process.start()

for process in processes:
    process.join()


URL: https://www.python.org, длина содержимого: 51129
URL: https://www.openai.com, длина содержимого: 12871
URL: https://www.example.com, длина содержимого: 1256


**Эффективно для загрузки больших объёмов данных, особенно если работа выполняется на многопроцессорной системе.**

---

### Механизмы синхронизации для потоков и процессов

Когда несколько потоков или процессов работают с общими данными, важно обеспечить их синхронизацию, чтобы избежать ошибок, таких как одновременная запись в одну переменную. Python предоставляет несколько инструментов для управления синхронизацией.

---

### Синхронизация потоков: `threading.Lock`

**Lock** (блокировка) — это простой механизм для управления доступом к общим ресурсам. Он работает как «ключ»: пока один поток удерживает блокировку, другие потоки не могут получить доступ к заблокированному ресурсу.

#### Пример: использование `threading.Lock`


In [10]:
import threading

balance = 0  # Общий ресурс
lock = threading.Lock()

def deposit(amount):
    global balance
    with lock:  # Блокировка доступа к общему ресурсу
        local_balance = balance
        local_balance += amount
        balance = local_balance
        print(f"Баланс после депозита {amount}: {balance}")

threads = [threading.Thread(target=deposit, args=(100,)) for _ in range(5)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print(f"Итоговый баланс: {balance}")


Баланс после депозита 100: 100
Баланс после депозита 100: 200
Баланс после депозита 100: 300
Баланс после депозита 100: 400
Баланс после депозита 100: 500
Итоговый баланс: 500


**Объяснение:**
- Без блокировки доступ к `balance` мог бы вызвать ошибки (например, два потока одновременно читают и записывают в переменную).
- С `lock` потоки получают доступ к переменной поочерёдно.



In [11]:
import threading
import random
import time

class Bank:
    def __init__(self):
        self.balance = 0
        self.lock = threading.Lock()

    def deposit(self):
        for _ in range(100):
            amount = random.randint(50, 500)
            with self.lock:  # Гарантируем эксклюзивный доступ
                self.balance += amount
                print(f"Пополнение: {amount}. Баланс: {self.balance}")
            time.sleep(0.001)  # Имитируем задержку

    def take(self):
        for _ in range(100):
            amount = random.randint(50, 500)
            print(f"Запрос на {amount}")
            with self.lock:  # Гарантируем эксклюзивный доступ
                if amount <= self.balance:
                    self.balance -= amount
                    print(f"Снятие: {amount}. Баланс: {self.balance}")
                else:
                    print("Запрос отклонён, недостаточно средств")
            time.sleep(0.001)  # Имитируем задержку

# Создаем объект класса Bank
bk = Bank()

# Создаем потоки для методов deposit и take
th1 = threading.Thread(target=Bank.deposit, args=(bk,))
th2 = threading.Thread(target=Bank.take, args=(bk,))

# Запускаем потоки
th1.start()
th2.start()

# Ждем завершения потоков
th1.join()
th2.join()

# Итоговый баланс
print(f"Итоговый баланс: {bk.balance}")

Пополнение: 315. Баланс: 315
Пополнение: 146. Баланс: 461
Пополнение: 350. Баланс: 811
Запрос на 253
Снятие: 253. Баланс: 558
Пополнение: 221. Баланс: 779
Запрос на 116
Снятие: 116. Баланс: 663
Пополнение: 151. Баланс: 814
Запрос на 333
Снятие: 333. Баланс: 481
Запрос на 486
Запрос отклонён, недостаточно средств
Пополнение: 197. Баланс: 678
Пополнение: 153. Баланс: 831
Запрос на 116
Снятие: 116. Баланс: 715
Пополнение: 230. Баланс: 945
Запрос на 189
Снятие: 189. Баланс: 756
Запрос на 379
Снятие: 379. Баланс: 377
Пополнение: 79. Баланс: 456
Пополнение: 276. Баланс: 732
Запрос на 241
Снятие: 241. Баланс: 491
Запрос на 355
Снятие: 355. Баланс: 136
Пополнение: 227. Баланс: 363
Запрос на 334
Снятие: 334. Баланс: 29
Пополнение: 306. Баланс: 335
Пополнение: 378. Баланс: 713
Пополнение: 51. Баланс: 764
Запрос на 51
Снятие: 51. Баланс: 713
Запрос на 218
Снятие: 218. Баланс: 495
Пополнение: 455. Баланс: 950
Запрос на 283
Снятие: 283. Баланс: 667
Запрос на 441
Снятие: 441. Баланс: 226
Пополнение:

---

### Синхронизация процессов: `multiprocessing.Lock`

Для процессов также доступна блокировка. Она работает аналогично, но синхронизирует доступ между процессами.

#### Пример: использование `multiprocessing.Lock`


In [12]:

from multiprocessing import Process, Lock

balance = 0
lock = Lock()

def deposit(amount):
    global balance
    with lock:
        local_balance = balance
        local_balance += amount
        balance = local_balance
        print(f"Баланс после депозита {amount}: {balance}")

processes = [Process(target=deposit, args=(100,)) for _ in range(5)]

for process in processes:
    process.start()

for process in processes:
    process.join()

print(f"Итоговый баланс: {balance}")


Баланс после депозита 100: 100
Баланс после депозита 100: 100
Баланс после депозита 100: 100
Баланс после депозита 100: 100
Баланс после депозита 100: 100
Итоговый баланс: 0


**Объяснение:**
- Процессы используют отдельное адресное пространство, поэтому глобальные переменные не передаются между ними напрямую. Код выше продемонстрирует ограничения, так как `balance` остаётся равным `0`.
- Чтобы корректно синхронизировать данные между процессами, используют **общую память** или **очереди**.

---



### Очереди: безопасный обмен данными между потоками и процессами

### Очереди в Python: что это такое?

Очереди (`Queue`) — это структуры данных, которые работают по принципу "первым вошел — первым вышел" (FIFO). Они часто используются для безопасной передачи данных между потоками или процессами.

В Python есть две основные реализации очередей:
1. **`queue.Queue`** (для многопоточности, из модуля `queue`).
2. **`multiprocessing.Queue`** (для мультипроцессинга, из модуля `multiprocessing`).

---

### Различия между `queue.Queue` и `multiprocessing.Queue`

1. **`queue.Queue` (потоковая очередь):**
   - Предназначена для обмена данными между потоками в одном процессе.
   - Потокобезопасна: реализует блокировки (`Lock`), чтобы избежать одновременного изменения данных несколькими потоками.
   - Нельзя использовать для передачи данных между процессами, так как потоки разделяют память, а процессы — нет.

2. **`multiprocessing.Queue` (процессная очередь):**
   - Предназначена для обмена данными между процессами.
   - Реализована через механизмы IPC (межпроцессное взаимодействие) и использует каналы (pipes) и блокировки.
   - Учитывает изоляцию памяти процессов, копируя данные между ними.

---


#### Очередь для потоков: `queue.Queue`
**Очередь** предоставляет безопасный способ обмена данными между потоками, автоматически управляя блокировкой доступа.

##### Пример: использование `queue.Queue`


In [13]:
import threading
import queue

q = queue.Queue()

def producer():
    for i in range(5):
        q.put(i)  # Добавление элемента в очередь
        print(f"Производитель добавил: {i}")

def consumer():
    while not q.empty():
        item = q.get()  # Получение элемента из очереди
        print(f"Потребитель обработал: {item}")

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
producer_thread.join()  # Ждём завершения производителя

consumer_thread.start()
consumer_thread.join()  # Ждём завершения потребителя


Производитель добавил: 0
Производитель добавил: 1
Производитель добавил: 2
Производитель добавил: 3
Производитель добавил: 4
Потребитель обработал: 0
Потребитель обработал: 1
Потребитель обработал: 2
Потребитель обработал: 3
Потребитель обработал: 4


---

#### Очередь для процессов: `multiprocessing.Queue`
`multiprocessing.Queue` позволяет обмениваться данными между процессами.

##### Пример: использование `multiprocessing.Queue`


In [14]:
from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)  # Добавление элемента в очередь
        print(f"Производитель добавил: {i}")

def consumer(q):
    while not q.empty():
        item = q.get()  # Получение элемента из очереди
        print(f"Потребитель обработал: {item}")

queue = Queue()

producer_process = Process(target=producer, args=(queue,))
consumer_process = Process(target=consumer, args=(queue,))

producer_process.start()
producer_process.join()  # Ждём завершения производителя

consumer_process.start()
consumer_process.join()  # Ждём завершения потребителя


Производитель добавил: 0
Производитель добавил: 1
Производитель добавил: 2
Производитель добавил: 3
Производитель добавил: 4
Потребитель обработал: 0
Потребитель обработал: 1
Потребитель обработал: 2
Потребитель обработал: 3
Потребитель обработал: 4


**Особенности:**
- Очереди безопасны и управляют синхронизацией автоматически.
- Подходят для передачи данных между потоками и процессами.

---



### Управление пулом потоков и процессов

Когда необходимо выполнять множество задач, создание большого количества потоков или процессов может быть ресурсоёмким. Для эффективного управления используется **пул потоков** или **пул процессов**. Python предоставляет инструменты для работы с пулами в модулях `concurrent.futures` и `multiprocessing`.

---

### Пул потоков: `concurrent.futures.ThreadPoolExecutor`

**ThreadPoolExecutor** создаёт ограниченное количество потоков и использует их повторно для выполнения задач. Это экономит ресурсы и снижает накладные расходы.

#### Пример: использование пула потоков


In [15]:
from concurrent.futures import ThreadPoolExecutor
import time

def worker(task_id):
    print(f"Поток начал работу над задачей {task_id}")
    time.sleep(2)
    print(f"Поток завершил работу над задачей {task_id}")

tasks = range(5)

# Создаём пул из 3 потоков
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(worker, tasks)  # Передаём задачи в пул


Поток начал работу над задачей 0
Поток начал работу над задачей 1
Поток начал работу над задачей 2
Поток завершил работу над задачей 0
Поток начал работу над задачей 3
Поток завершил работу над задачей 1
Поток начал работу над задачей 4
Поток завершил работу над задачей 2
Поток завершил работу над задачей 3
Поток завершил работу над задачей 4


**Объяснение:**
- Мы ограничиваем количество одновременно выполняющихся потоков до трёх.
- Метод `executor.map` автоматически распределяет задачи между потоками.

---

### Пул процессов: `concurrent.futures.ProcessPoolExecutor` (более верхнеуровневый и рекомендуемый)

**ProcessPoolExecutor** создаёт ограниченное количество процессов и распределяет задачи между ними. Это эффективно для вычислительно затратных задач, так как процессы используют все доступные ядра процессора.

#### Пример: использование пула процессов


In [16]:
from concurrent.futures import ProcessPoolExecutor
import time

def worker(task_id):
    print(f"Процесс {task_id} начал работу")
    time.sleep(2)
    print(f"Процесс {task_id} завершил работу")

tasks = range(5)

# Создаём пул из 3 процессов
with ProcessPoolExecutor(max_workers=3) as executor:
    executor.map(worker, tasks)  # Передаём задачи в пул


Процесс 0 начал работуПроцесс 2 начал работу

Процесс 1 начал работу
Процесс 0 завершил работуПроцесс 2 завершил работуПроцесс 1 завершил работу

Процесс 3 начал работу
Процесс 4 начал работу

Процесс 3 завершил работу
Процесс 4 завершил работу


**Объяснение:**
- Каждый процесс работает независимо, что позволяет эффективно использовать все ядра процессора.
- Подходит для вычислений, где Python сталкивается с ограничением GIL.

---

### Пулы из модуля `multiprocessing`

Модуль `multiprocessing` предоставляет класс `Pool`, который можно использовать для управления процессами.

#### Пример: пул процессов с `multiprocessing.Pool`


In [17]:
from multiprocessing import Pool
import time

def worker(task_id):
    print(f"Процесс {task_id} начал работу")
    time.sleep(2)
    print(f"Процесс {task_id} завершил работу")
    return task_id * 2  # Возвращаем результат

tasks = range(5)

# Создаём пул из 3 процессов
with Pool(processes=3) as pool:
    results = pool.map(worker, tasks)  # Выполняем задачи
    print(f"Результаты: {results}")


Процесс 0 начал работуПроцесс 2 начал работуПроцесс 1 начал работу


Процесс 2 завершил работу
Процесс 0 завершил работуПроцесс 3 начал работу
Процесс 4 начал работу
Процесс 1 завершил работу

Процесс 3 завершил работуПроцесс 4 завершил работу

Результаты: [0, 2, 4, 6, 8]


**Особенности:**
- Метод `pool.map` принимает список задач и возвращает результаты их выполнения.
- Каждый процесс работает независимо.

---

### Сравнение потоков и процессов в пулах

| Характеристика           | ThreadPoolExecutor                     | ProcessPoolExecutor                    |
|--------------------------|-----------------------------------------|----------------------------------------|
| Подходит для задач       | I/O-задачи (сеть, файлы)               | Вычислительные задачи                  |
| Использование памяти     | Низкое (разделяют память процесса)     | Высокое (каждый процесс изолирован)    |
| Скорость создания задач  | Высокая                                | Низкая (создание процессов дороже)     |
| Преимущества             | Быстрая и лёгкая многозадачность       | Параллелизм на уровне процессора       |

---

### Пример: скачивание файлов с использованием пула



#### С помощью потоков


In [18]:
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    response = requests.get(url)
    print(f"URL: {url}, длина содержимого: {len(response.text)}")

urls = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.openai.com",
    "https://www.wikipedia.org",
    "https://www.github.com"
]

# Пул из 3 потоков
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(fetch_url, urls)


URL: https://www.openai.com, длина содержимого: 12871
URL: https://www.python.org, длина содержимого: 51129
URL: https://www.wikipedia.org, длина содержимого: 100847
URL: https://www.example.com, длина содержимого: 1256
URL: https://www.github.com, длина содержимого: 267371


#### С помощью процессов


In [19]:
from concurrent.futures import ProcessPoolExecutor
import requests

def fetch_url(url):
    response = requests.get(url)
    print(f"URL: {url}, длина содержимого: {len(response.text)}")

urls = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.openai.com",
    "https://www.wikipedia.org",
    "https://www.github.com"
]

# Пул из 3 процессов
with ProcessPoolExecutor(max_workers=3) as executor:
    executor.map(fetch_url, urls)


URL: https://www.openai.com, длина содержимого: 12871
URL: https://www.python.org, длина содержимого: 51129
URL: https://www.wikipedia.org, длина содержимого: 100847
URL: https://www.example.com, длина содержимого: 1256
URL: https://www.github.com, длина содержимого: 267397


### Очереди с `ThreadPoolExecutor`

Очередь используется для передачи данных между задачами в пулах потоков. Это полезно, если результаты работы одной задачи должны быть обработаны другой задачей.

#### Пример: обработка данных с использованием `queue.Queue` и пула потоков


In [20]:
from concurrent.futures import ThreadPoolExecutor
import queue

data_queue = queue.Queue()
results_queue = queue.Queue()

# Функция производитель
def producer(data):
    print(f"Производитель: добавляет {data} в очередь")
    data_queue.put(data)

# Функция потребитель
def consumer():
    while not data_queue.empty():
        item = data_queue.get()
        result = item * 2
        print(f"Потребитель: обработал {item} -> {result}")
        results_queue.put(result)

# Задачи для обработки
tasks = [1, 2, 3, 4, 5]

# Пул из 2 потоков
with ThreadPoolExecutor(max_workers=2) as executor:
    # Производим данные
    executor.map(producer, tasks)
    # Обрабатываем данные
    executor.submit(consumer).result()  # Ожидаем завершения обработки

# Вывод результатов
print("Результаты:", list(results_queue.queue))


Производитель: добавляет 1 в очередь
Производитель: добавляет 2 в очередь
Производитель: добавляет 3 в очередь
Производитель: добавляет 4 в очередь
Производитель: добавляет 5 в очередь
Потребитель: обработал 1 -> 2
Потребитель: обработал 2 -> 4
Потребитель: обработал 3 -> 6
Потребитель: обработал 4 -> 8
Потребитель: обработал 5 -> 10
Результаты: [2, 4, 6, 8, 10]


---

### Пример с несколькими производителями и потребителями

In [21]:
from concurrent.futures import ThreadPoolExecutor
import queue
import time

data_queue = queue.Queue()

def producer(name, count):
    for i in range(count):
        item = f"{name}-{i}"
        print(f"Производитель {name} добавил: {item}")
        data_queue.put(item)
        time.sleep(0.5)

def consumer():
    while not data_queue.empty():
        item = data_queue.get()
        print(f"Потребитель обработал: {item}")
        time.sleep(1)

# Пул из 4 потоков
with ThreadPoolExecutor(max_workers=4) as executor:
    # Два производителя
    executor.submit(producer, "P1", 3)
    executor.submit(producer, "P2", 3)

    # Два потребителя
    executor.submit(consumer)
    executor.submit(consumer)


Производитель P1 добавил: P1-0


Производитель P2 добавил: P2-0
Потребитель обработал: P1-0
Потребитель обработал: P2-0
Производитель P1 добавил: P1-1
Производитель P2 добавил: P2-1
Производитель P1 добавил: P1-2
Потребитель обработал: P1-1
Потребитель обработал: P2-1
Производитель P2 добавил: P2-2
Потребитель обработал: P1-2
Потребитель обработал: P2-2


---

### Очереди с `ProcessPoolExecutor`

Для процессов используется `multiprocessing.Queue`, которая обеспечивает безопасный обмен данными между процессами.
Однако использовать подобную структуру с `ProcessPoolExecutor` нельзя :(


#### С очередями в процессах используем стандартный `Process`


In [22]:
from multiprocessing import Process, Queue
import time

def producer(queue, name, count):
    for i in range(count):
        item = f"{name}-{i}"
        print(f"Производитель {name} добавил: {item}")
        queue.put(item)
        time.sleep(0.5)

def consumer(queue):
    while not queue.empty():
        item = queue.get()
        print(f"Потребитель обработал: {item}")
        time.sleep(1)

if __name__ == "__main__":
    data_queue = Queue()

    # Производители
    producer1 = Process(target=producer, args=(data_queue, "P1", 3))
    producer2 = Process(target=producer, args=(data_queue, "P2", 3))

    # Потребители
    consumer1 = Process(target=consumer, args=(data_queue,))
    consumer2 = Process(target=consumer, args=(data_queue,))

    producer1.start()
    producer2.start()

    producer1.join()
    producer2.join()

    consumer1.start()
    consumer2.start()

    consumer1.join()
    consumer2.join()


Производитель P1 добавил: P1-0
Производитель P2 добавил: P2-0
Производитель P1 добавил: P1-1Производитель P2 добавил: P2-1

Производитель P1 добавил: P1-2
Производитель P2 добавил: P2-2
Потребитель обработал: P1-0
Потребитель обработал: P2-0
Потребитель обработал: P1-1
Потребитель обработал: P2-1
Потребитель обработал: P1-2
Потребитель обработал: P2-2


### Построение пайплайна производитель-потребитель

Пайплайн "производитель-потребитель" — это классическая архитектура, где производители генерируют данные, а потребители обрабатывают их. В пайплайне могут быть несколько этапов, каждый из которых выполняет свою часть работы. В Python можно реализовать такие пайплайны с помощью потоков или процессов, используя очереди для передачи данных.

---

### Пример пайплайна с потоками

#### Этапы:
1. **Производитель** генерирует числа.
2. **Промежуточный процесс** обрабатывает числа, умножая их на 2.
3. **Потребитель** сохраняет результаты.



In [23]:
import threading
import queue
import time

# Очереди для передачи данных между этапами
stage1_queue = queue.Queue()
stage2_queue = queue.Queue()

# Производитель
def producer():
    for i in range(10):
        print(f"Производитель создал: {i}")
        stage1_queue.put(i)
        time.sleep(0.5)
    stage1_queue.put(None)  # Сигнал завершения

# Промежуточный обработчик
def processor():
    while True:
        item = stage1_queue.get()
        if item is None:  # Завершаем, если пришёл сигнал завершения
            stage2_queue.put(None)
            break
        processed = item * 2
        print(f"Обработчик преобразовал: {item} -> {processed}")
        stage2_queue.put(processed)

# Потребитель
def consumer():
    while True:
        item = stage2_queue.get()
        if item is None:
            break
        print(f"Потребитель получил: {item}")

# Создаём и запускаем поimport threadingтоки
producer_thread = threading.Thread(target=producer)
processor_thread = threading.Thread(target=processor)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
processor_thread.start()
consumer_thread.start()

producer_thread.join()
processor_thread.join()
consumer_thread.join()


Производитель создал: 0
Обработчик преобразовал: 0 -> 0
Потребитель получил: 0
Производитель создал: 1
Обработчик преобразовал: 1 -> 2
Потребитель получил: 2
Производитель создал: 2
Обработчик преобразовал: 2 -> 4
Потребитель получил: 4
Производитель создал: 3
Обработчик преобразовал: 3 -> 6
Потребитель получил: 6
Производитель создал: 4
Обработчик преобразовал: 4 -> 8
Потребитель получил: 8
Производитель создал: 5
Обработчик преобразовал: 5 -> 10
Потребитель получил: 10
Производитель создал: 6
Обработчик преобразовал: 6 -> 12
Потребитель получил: 12
Производитель создал: 7
Обработчик преобразовал: 7 -> 14
Потребитель получил: 14
Производитель создал: 8
Обработчик преобразовал: 8 -> 16
Потребитель получил: 16
Производитель создал: 9
Обработчик преобразовал: 9 -> 18
Потребитель получил: 18


#### А теперь то же самое через `ThreadPoolExecutor`

In [24]:
from concurrent.futures import ThreadPoolExecutor
import queue
import time

# Очереди для передачи данных между этапами
stage1_queue = queue.Queue()
stage2_queue = queue.Queue()

# Производитель
def producer():
    for i in range(10):
        print(f"Производитель создал: {i}")
        stage1_queue.put(i)
        time.sleep(0.5)
    stage1_queue.put(None)  # Сигнал завершения

# Промежуточный обработчик
def processor():
    while True:
        item = stage1_queue.get()
        if item is None:  # Завершаем, если пришёл сигнал завершения
            stage2_queue.put(None)
            break
        processed = item * 2
        print(f"Обработчик преобразовал: {item} -> {processed}")
        stage2_queue.put(processed)

# Потребитель
def consumer():
    while True:
        item = stage2_queue.get()
        if item is None:
            break
        print(f"Потребитель получил: {item}")

# Используем ThreadPoolExecutor для управления потоками
with ThreadPoolExecutor(max_workers=3) as executor:
    # Запускаем задачи в пуле потоков
    executor.submit(producer)  # Производитель
    executor.submit(processor)  # Промежуточный обработчик
    executor.submit(consumer)  # Потребитель
    

Производитель создал: 0
Обработчик преобразовал: 0 -> 0
Потребитель получил: 0
Производитель создал: 1
Обработчик преобразовал: 1 -> 2
Потребитель получил: 2
Производитель создал: 2
Обработчик преобразовал: 2 -> 4
Потребитель получил: 4
Производитель создал: 3
Обработчик преобразовал: 3 -> 6
Потребитель получил: 6
Производитель создал: 4
Обработчик преобразовал: 4 -> 8
Потребитель получил: 8
Производитель создал: 5
Обработчик преобразовал: 5 -> 10
Потребитель получил: 10
Производитель создал: 6
Обработчик преобразовал: 6 -> 12
Потребитель получил: 12
Производитель создал: 7
Обработчик преобразовал: 7 -> 14
Потребитель получил: 14
Производитель создал: 8
Обработчик преобразовал: 8 -> 16
Потребитель получил: 16
Производитель создал: 9
Обработчик преобразовал: 9 -> 18
Потребитель получил: 18


---

### Пример пайплайна с процессами

#### Этапы:
1. **Производитель** генерирует числа.
2. **Промежуточный процесс** вычисляет квадрат числа.
3. **Потребитель** сохраняет результаты в список.



In [25]:
from multiprocessing import Process, Queue
import time

def producer(queue):
    for i in range(10):
        print(f"Производитель создал: {i}")
        queue.put(i)
        time.sleep(0.5)
    queue.put(None)  # Сигнал завершения

def processor(input_queue, output_queue):
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        processed = item ** 2
        print(f"Обработчик преобразовал: {item} -> {processed}")
        output_queue.put(processed)

def consumer(queue):
    results = []
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Потребитель получил: {item}")
        results.append(item)
    print(f"Итоговый результат: {results}")

if __name__ == "__main__":
    stage1_queue = Queue()
    stage2_queue = Queue()

    producer_process = Process(target=producer, args=(stage1_queue,))
    processor_process = Process(target=processor, args=(stage1_queue, stage2_queue))
    consumer_process = Process(target=consumer, args=(stage2_queue,))

    producer_process.start()
    processor_process.start()
    consumer_process.start()

    producer_process.join()
    processor_process.join()
    consumer_process.join()


Производитель создал: 0
Обработчик преобразовал: 0 -> 0
Потребитель получил: 0
Производитель создал: 1
Обработчик преобразовал: 1 -> 1
Потребитель получил: 1
Производитель создал: 2
Обработчик преобразовал: 2 -> 4
Потребитель получил: 4
Производитель создал: 3
Обработчик преобразовал: 3 -> 9
Потребитель получил: 9
Производитель создал: 4
Обработчик преобразовал: 4 -> 16
Потребитель получил: 16
Производитель создал: 5
Обработчик преобразовал: 5 -> 25
Потребитель получил: 25
Производитель создал: 6
Обработчик преобразовал: 6 -> 36
Потребитель получил: 36
Производитель создал: 7
Обработчик преобразовал: 7 -> 49
Потребитель получил: 49
Производитель создал: 8
Обработчик преобразовал: 8 -> 64
Потребитель получил: 64
Производитель создал: 9
Обработчик преобразовал: 9 -> 81
Потребитель получил: 81
Итоговый результат: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


---

### Многоступенчатый пайплайн

#### Задача: 
Реализовать три этапа:
1. **Производитель** генерирует числа.
2. **Промежуточный обработчик 1** умножает числа на 2.
3. **Промежуточный обработчик 2** добавляет 3 к числам.
4. **Потребитель** собирает и выводит результаты.



In [26]:
from multiprocessing import Process, Queue
import time

def producer(queue):
    for i in range(10):
        print(f"Производитель создал: {i}")
        queue.put(i)
        time.sleep(0.3)
    queue.put(None)

def processor1(input_queue, output_queue):
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        processed = item * 2
        print(f"Обработчик 1 преобразовал: {item} -> {processed}")
        output_queue.put(processed)

def processor2(input_queue, output_queue):
    while True:
        item = input_queue.get()
        if item is None:
            output_queue.put(None)
            break
        processed = item + 3
        print(f"Обработчик 2 преобразовал: {item} -> {processed}")
        output_queue.put(processed)

def consumer(queue):
    results = []
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Потребитель получил: {item}")
        results.append(item)
    print(f"Итоговый результат: {results}")


if __name__ == "__main__":
    stage1_queue = Queue()
    stage2_queue = Queue()
    stage3_queue = Queue()

    producer_process = Process(target=producer, args=(stage1_queue,))
    processor1_process = Process(target=processor1, args=(stage1_queue, stage2_queue))
    processor2_process = Process(target=processor2, args=(stage2_queue, stage3_queue))
    consumer_process = Process(target=consumer, args=(stage3_queue,))

    producer_process.start()
    processor1_process.start()
    processor2_process.start()
    consumer_process.start()

    producer_process.join()
    processor1_process.join()
    processor2_process.join()
    consumer_process.join()


Производитель создал: 0
Обработчик 1 преобразовал: 0 -> 0
Обработчик 2 преобразовал: 0 -> 3
Потребитель получил: 3
Производитель создал: 1
Обработчик 1 преобразовал: 1 -> 2
Обработчик 2 преобразовал: 2 -> 5
Потребитель получил: 5
Производитель создал: 2
Обработчик 1 преобразовал: 2 -> 4
Обработчик 2 преобразовал: 4 -> 7
Потребитель получил: 7
Производитель создал: 3
Обработчик 1 преобразовал: 3 -> 6
Обработчик 2 преобразовал: 6 -> 9
Потребитель получил: 9
Производитель создал: 4
Обработчик 1 преобразовал: 4 -> 8
Обработчик 2 преобразовал: 8 -> 11
Потребитель получил: 11
Производитель создал: 5
Обработчик 1 преобразовал: 5 -> 10
Обработчик 2 преобразовал: 10 -> 13
Потребитель получил: 13
Производитель создал: 6
Обработчик 1 преобразовал: 6 -> 12
Обработчик 2 преобразовал: 12 -> 15
Потребитель получил: 15
Производитель создал: 7
Обработчик 1 преобразовал: 7 -> 14
Обработчик 2 преобразовал: 14 -> 17
Потребитель получил: 17
Производитель создал: 8
Обработчик 1 преобразовал: 8 -> 16
Обработ

---

### Особенности пайплайнов
1. **Очереди** обеспечивают безопасную передачу данных между этапами.
2. Каждому этапу можно назначить отдельный поток или процесс.
3. Сигналы завершения (например, `None`) необходимы для корректного завершения работы пайплайна.
