Лучшие практики применения многопоточности (общая форма)

In [1]:
from concurrent.futures import ThreadPoolExecutor as Executor

import os
from time import sleep
from threading import Thread

In [2]:
%script False
def worker(data):
    """<process the data>"""
    
with Executor(max_workers=10) as exe:
    future = exe.submit(worker, data)

UsageError: Line magic function `%script` not found (But cell magic `%%script` exists, did you mean that instead?).


### Пример использования: роботы и столовые приборы

In [3]:
import threading
from queue import Queue

In [4]:
class ThreadBot(threading.Thread): #1
    def __init__(self):
        super().__init__(target=self.manage_table) #2
        self.cutlery = Cutlery(knives=0, forks=0) #3
        self.tasks = Queue() #4
        
    def manage_table(self):
        while True: #5
            task = self.tasks.get()
            if task == 'prepare table':
                kitchen.give(to=self.cutlery, knives=4, forks=4) #6
            elif task =='clear table':
                self.cutlery.give(to=kitchen, knives=4, forks=4)
            elif task == 'shutdown':
                return

1. ThreadBot - подкласс Thread
2. manage_table() - целевая функция потока (будет определена позже)
3. Бот будет ждать столики и будет отвечать за некоторые столовые приборы. Каждый бот отслеживает столовые приборы, которые он взял на кухне. (Класс Cutlery будет определен позже.)
4. Перед ботом также будут поставлены задачи. Они будут добавлены в эту очередь задач, и затем бот будет выполнять их во время основного цикла обработки.
5. Основная процедура этого бота - бесконечный цикл. Если вам нужно выключить бота, вы должны дать им задачу выключения.
6. Для этого бота определены всего три задачи. Первая - подготовить стол, - это то, что должен сделать бот, чтобы подготовить новый стол к обслуживанию. Для нашего теста единственное требование - взять с кухни наборы столовых приборов и поставить их на стол. Очистить стол используется, когда стол должен быть очищен: бот должен вернуть использованные столовые приборы обратно на кухню. shutdown просто отключает бота.

#### Определение объекта Cutlery

In [18]:
from attr import attrs, attrib

@attrs #1
class Cutlery:
    knives = attrib(default=0) #2
    forks = attrib(default=0)
    
    def give(self, to: 'Cutlery' ,knives=0, forks=0): #3
        self.change(-knives, -forks)
        to.change(knives, forks)
        
    def change(self, knives, forks): #4
        self.knives += knives
        self.forks += forks

In [40]:
kitchen = Cutlery(knives=100, forks=100) #5
bots = [ThreadBot() for i in range(10)] #6

In [44]:
def test_kitchen(num_tables):
    
    
    for bot in bots:
        for i in range(num_tables): #7
            bot.tasks.put('prepare table')
            bot.tasks.put('clear table')
        bot.tasks.put('shutdown') #8
    
    print('Kitchen inventory before service:', kitchen)

    for bot in bots:
        bot.start()

    for bot in bots:
        bot.join()

    print('Kitchen inventory after service:', kitchen)

1. attrs, библиотека Python с открытым исходным кодом, не имеющая ничего общего с потоками или asyncio, - действительно замечательная библиотека для упрощения создания классов. Здесь декоратор @attrs гарантирует, что этот класс Cutlery обычный шаблонный код (например, __init __ ()) устанавливается автоматически.
2. Функция attrib () предоставляет простой способ создания атрибутов, включая значения по умолчанию, которые вы обычно могли бы обрабатывать как аргументы ключевого слова в __init __ () метод.
3. Этот метод используется для переноса ножей и вилок с одного предмета столовых приборов на другой. Обычно он используется ботами для получения столовых приборов с кухни для новых столов и для возврата столовых приборов на кухню после того, как стол убран.
4. Это очень простая служебная функция для изменения данных инвентаризации в экземпляре объекта.
5. Мы определили кухню. Обычно каждый из ботов получает столовые приборы в этом месте. Также требуется, чтобы они возвращали столовые приборы в этот инвентарь, когда стол убран.
6. Этот скрипт выполняется при тестировании. Для нашего теста мы будем использовать 10 ThreadBots.
7. Мы получаем количество столов в качестве параметра , а затем даем каждому боту это количество задач для подготовки и очистки столов в ресторане.
8. Задача выключения заставит ботов останавливаться (так что bot.join () немного ниже вернется). Остальная часть скрипта выводит диагностические сообщения и запускает ботов.

Стратегия тестирования кода в основном включает в себя запуск группы ThreadBots над последовательностью обслуживания столов. Каждый ThreadBot должен делать следующее:
- Приготовьте «стол на четверых», что означает получение на кухне четырех комплектов ножей и вилок.
- Очистка стола, что означает возвращение набора из четырех ножей и вилок со стола обратно на кухню.
- 

Если вы запускаете группу ThreadBots на нескольких столах определенное количество раз, вы ожидаете, что после того, как вся работа будет сделана, все ножи и вилки вернутся на кухню и будут учтены.

Вы мудро решаете протестировать это с сотней таблиц, которые должны быть подготовлены и очищены каждым ThreadBot, и все они будут работать одновременно, потому что вы хотите убедиться, что они могут работать вместе и ничего не пойдет не так. Это результат этого
контрольная работа:

In [45]:
kitchen = Cutlery(knives=100, forks=100) #5
bots = [ThreadBot() for i in range(10)] #6


NUM_TABLES = 100
test_kitchen(NUM_TABLES)

Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=100, forks=100)


Все ножи и вилки снова оказываются на кухне! Итак, поздравляете себя с написать хороший код и развернуть ботов. К сожалению, на практике то и дело Вы обнаруживаете, что не получаете все столовые приборы, когда ресторан закрывается. Вы замечаете, что проблема усугубляется, когда вы добавляете больше ботов и / или место становится более загруженным. Разочарованный, вы снова запускаете тесты, ничего не меняя, кроме размера теста (10 000 таблиц!):

In [50]:
kitchen = Cutlery(knives=100, forks=100) #5
bots = [ThreadBot() for i in range(10)] #6


NUM_TABLES = 10000
test_kitchen(NUM_TABLES)

Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=100, forks=104)


Подведем итоги ситуации:
- Ваш код ThreadBot очень прост и удобен для чтения. Логика в порядке.
- У вас есть рабочий тест (со 100 таблицами), который воспроизводимо успешно проходит.
- У вас есть более длинный тест (с 10 000 таблиц), который воспроизводимо терпит неудачу.
- Более длинный тест терпит неудачу по разным невоспроизводимым причинам.

Это несколько типичных признаков ошибки состояния гонки. Опытные читатели уже
видели причину, так что давайте рассмотрим это сейчас. Все сводится к этому методу
внутри нашего класса Cutlery:

In [51]:
def change(self, knives, forks):
    self.knives += knives
    self.forks += forks

Встроенное суммирование, + =, реализовано внутри (внутри кода C для самого интерпретатора Python) в виде нескольких отдельных шагов:
1. Считайте текущее значение self.knives во временное место.
2. Добавьте новое значение, ножи, к значению во временном местоположении.
3. Скопируйте новую сумму из временного местоположения обратно в исходное местоположение.

Проблема с вытесняющей многозадачностью заключается в том, что любой поток, занятый этими шагами, может быть прерван в любое время, а другому потоку может быть предоставлена возможность выполнить те же шаги.

В этом случае предположим, что ThreadBot A выполняет шаг 1, а затем планировщик ОС приостанавливает A и переключается на ThreadBot B. B также считывает текущее значение self.knives; затем выполнение возвращается к A. A увеличивает свою общую сумму и записывает ее обратно, но затем B продолжает работу с того места, где она была приостановлена (после шага 1), и она увеличивает и записывает свою новую сумму, тем самым стирая изменение, сделанное A!

Хотя это может показаться сложным, этот пример состояния гонки - это самый простой из возможных случаев. Мы смогли проверить весь код, и у нас даже есть тесты, которые могут воспроизвести проблему по запросу. В реальном мире, в больших проектах, попробуйте представить, насколько это может стать сложнее!

Эту проблему можно решить, установив блокировку на изменение общего
state (представьте, что мы добавили threading.Lock в класс Cutlery):

In [52]:
def change(self, knives, forks):
    with self.lock:
        self.knives += knives
        self.forks += forks

Но для этого необходимо знать все места, где состояние будет разделяться между несколькими потоками. Этот подход жизнеспособен, когда вы контролируете весь исходный код, но становится очень трудным, когда используется много сторонних библиотек - что, вероятно, имеет место в Python благодаря замечательной экосистеме с открытым исходным кодом.