# Кибериммунный подход к разработке. Учебный пример "Светофор"

## Об авторе 

Этот блокнот разработан для вас Сергеем Соболевым, sergey.p.sobolev@kaspersky.com

Больше информации о кибериммунном подходе можно найти на странице https://github.com/sergey-sobolev/cyberimmune-systems/wiki/%D0%9A%D0%B8%D0%B1%D0%B5%D1%80%D0%B8%D0%BC%D0%BC%D1%83%D0%BD%D0%B8%D1%82%D0%B5%D1%82

Подписывайтесь на телеграм-канал @learning_cyberimmunity (https://t.me/learning_cyberimmunity)

Обучающие видео на тему кибериммунного подхода вы можете найти на youtube канале https://www.youtube.com/@learning_cyberimmunity/

## О примере 

Светофор - это, на первый взгляд, очень простая система, но она оказывает критическое влияние на безопасность дорожного движения. 
Применим идеи конструктивной безопасности к архитектуре и реализации прототипа светофора. За отправную точку возьмём [код базового примера](https://colab.research.google.com/github/sergey-sobolev/cyberimmune-systems-basic-demo-notebook01/blob/main/cyberimmunity-basics.ipynb) и немного его переработаем.

Для переработки будем использовать описание архитектуры решения, которое обсуждалось на занятии.

## Простая реализация выбранной политики архитектуры

Будем использовать политику архитектуры, показанную на рис. 1.

![Рис. 1. Политика архитектуры](images/tl-archpol-0.02.png)

Рис. 1. Политика архитектуры светофора

1. Создадим функциональные компоненты (сущности 1-4) и монитор безопасности, который будет контролировать их взаимодействие, в том числе реализовывать контроль конфигураций светофора (сущность №5 на архитектурной диаграмме)
2. Определим политики безопасности
3. Сымитируем запрос на изменение режима для проверки работы всех элементов

- В качестве интерфейса взаимодействия используем очереди сообщений, у каждой сущности есть своя «персональная» очередь, ассоциированная с ней
- Компоненты 1-4 отправляют сообщения только в очередь monitor сущности SecurityMonitor
- SecurityMonitor проверяет сообщения на соответствие политикам безопасности, в случае положительного решения перенаправляет сообщение в очередь соответствующей сущности

В коде назовём сущности следующим образом
1. Связь - CitySystemConnector
2. Система управления светофора - ControlSystem
3. Управление светодиодами - LightsGPIO
4. Система диагностики - SelfDiagnosticsSystem

Логику контроля режимов светофора (компонент №5 на рис. 1) реализуем в виде политик безопасности в мониторе безопасности

![Рис. 2. Политика архитектуры с именами классов](images/tl-archpol-code.png)

Рис. 2. Политика архитектуры с именами классов

Очередь событий для монитора безопасности: все запросы от сущностей друг к другу должны отправляться только в неё

In [1]:
from multiprocessing import Queue
monitor_events_queue = Queue()

Зафиксируем формат сообщений

In [2]:
from dataclasses import dataclass


@dataclass
class Event:
    source: str       # отправитель
    destination: str  # получатель
    operation: str    # чего хочет (запрашиваемое действие)
    parameters: str   # с какими параметрами

### Монитор безопасности

Ниже в методе _check_policies можно увидеть пример политики безопасности:

```python
if event.source == "ControlSystem" \
        and event.destination == "LightsGPIO" \
        and event.operation == "set_mode" \ 
        and self._check_mode(event.operation):
    authorized = True
```            

в этом примере проверяется отправитель сообщения, получатель, запрашиваемая операция и даже параметры операции. Это максимально жёсткий вариант, очевидно, в зависимости от ситуации количество проверок можно уменьшить.

А пока это место для экспериментов, как можно из монитора безопасности заблокировать взаимодействие между сущностями.

In [31]:
from multiprocessing import Queue, Process
from multiprocessing.queues import Empty
import json

# формат управляющих команд для монитора
@dataclass
class ControlEvent:
    operation: str

# список разрешенных сочетаний сигналов светофора
# любые сочетания, отсутствующие в этом списке, запрещены
traffic_lights_allowed_configurations = [
    {"direction_1": "red", "direction_2": "green"},
    {"direction_1": "red", "direction_2": "red"},    
    {"direction_1": "red", "direction_2": "yellow"},    
    {"direction_1": "yellow", "direction_2": "yellow"},    
    {"direction_1": "off", "direction_2": "off"},
    {"direction_1": "green", "direction_2": "red"},    
    {"direction_1": "green", "direction_2": "yellow"},
]


# Класс, реализующий поведение монитора безопасности
class Monitor(Process):

    def __init__(self, events_q: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        self._events_q = events_q  # очередь событий для монитора (входящие сообщения)
        self._control_q = Queue()  # очередь управляющих команд (например, для остановки монитора)
        self._entity_queues = {}   # словарь очередей известных монитору сущностей
        self._force_quit = False   # флаг завершения работы монитора

    # регистрация очереди новой сущности
    def add_entity_queue(self, entity_id: str, queue: Queue):
        print(f"[монитор] регистрируем сущность {entity_id}")
        self._entity_queues[entity_id] = queue

    def _check_mode(self, mode_str: str) -> bool:
        mode_ok = False
        try:
            # извлечём структуру из строки, в случае ошибки запретим изменение режима
            mode = json.loads(mode_str)
            # проверим входит ли запрашиваемый режим в список разрешённых
            print(f"[монитор] проверяем конфигурацию {mode}")
            if mode in traffic_lights_allowed_configurations:
                # такой режим найден, можно активировать
                mode_ok = True
        except:
            mode_ok = False
        return mode_ok

    # проверка политик безопасности        
    def _check_policies(self, event):
        print(f'[монитор] обрабатываем событие {event}')

        # default deny: всё, что не разрешено, запрещено по умолчанию!
        authorized = False

        # проверка на входе, что это экземпляр класса Event, 
        # т.е. имеет ожидаемый формат
        if not isinstance(event, Event):
            return False

        # 
        #  политики безопасности
        #

        # пример политики безопасности
        if event.source == "ControlSystem" \
                and event.destination == "LightsGPIO" \
                and event.operation == "set_mode" \
                and self._check_mode(event.parameters):
            authorized = True

        if authorized is False:
            print("[монитор] событие не разрешено политиками безопасности")
        return authorized

    # выполнение разрешённого запроса
    # метод должен вызываться только после проверки политик безопасности
    def _proceed(self, event):
        print(f'[монитор] отправляем запрос {event}')
        try:
            # найдём очередь получателя события
            dst_q: Queue = self._entity_queues[event.destination]
            # и положим запрос в эту очередь
            dst_q.put(event)
        except  Exception as e:
            # например, запрос пришёл от или для неизвестной сущности
            print(f"[монитор] ошибка выполнения запроса {e}")

    # основной код работы монитора безопасности    
    def run(self):
        print('[монитор] старт')

        # в цикле проверяет наличие новых событий, 
        # выход из цикла по флагу _force_quit
        while self._force_quit is False:
            event = None
            try:
                # ожидание сделано неблокирующим, 
                # чтобы можно было завершить работу монитора, 
                # не дожидаясь нового сообщения
                event = self._events_q.get_nowait()
                # сюда попадаем только в случае получение события,
                # теперь нужно проверить политики безопасности
                authorized = self._check_policies(event)
                if authorized:
                    # если политиками запрос авторизован - выполняем
                    self._proceed(event)
            except Empty:
                # сюда попадаем, если новых сообщений ещё нет,
                # в таком случае немного подождём
                sleep(0.5)
            except Exception as e:
                # что-то пошло не так, выведем сообщение об ошибке
                print(f"[монитор] ошибка обработки {e}, {event}")
            self._check_control_q()
        print('[монитор] завершение работы')

    # запрос на остановку работы монитора безопасности для завершения работы
    # может вызываться вне процесса монитора
    def stop(self):
        # поскольку монитор работает в отдельном процессе,
        # запрос помещается в очередь, которая проверяется из процесса монитора
        request = ControlEvent(operation='stop')
        self._control_q.put(request)

    # проверка наличия новых управляющих команд
    def _check_control_q(self):
        try:
            request: ControlEvent = self._control_q.get_nowait()
            print(f"[монитор] проверяем запрос {request}")
            if isinstance(request, ControlEvent) and request.operation == 'stop':
                # поступил запрос на остановку монитора, поднимаем "красный флаг"
                self._force_quit = True
        except Empty:
            # никаких команд не поступило, ну и ладно
            pass

### Сущность ControlSystem

Эта сущность отправляет сообщение для другой сущности (LightsGPIO)

In [32]:
from multiprocessing import Queue, Process
import json


class ControlSystem(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()

    # выдаёт собственную очередь для взаимодействия
    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):        
        print(f'[{self.__class__.__name__}] старт')
        print(f'[{self.__class__.__name__}] отправляем тестовый запрос')

        mode = {"direction_1": "red", "direction_2": "green"}

        # запрос для сущности WorkerB - "скажи hello"
        event = Event(source=self.__class__.__name__,
                      destination='LightsGPIO',
                      operation='set_mode',
                      parameters=json.dumps(mode)
                      )
        
        self.monitor_queue.put(event)
        print(f'[{self.__class__.__name__}] завершение работы')

### Сущность LightsGPIO

Эта сущность ждёт входящее сообщение в течение заданного периода времени, если получает - обрабатывает и завершает работу или выходит по таймауту.

In [33]:
from multiprocessing import Queue, Process
from time import sleep


class LightsGPIO(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()

    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):        
        print(f'[{self.__class__.__name__}] старт')
        attempts = 10
        while attempts > 0:
            try:
                event: Event = self._own_queue.get_nowait()
                if event.operation == "set_mode":
                    print(f"[{self.__class__.__name__}] {event.source} запрашивает изменение режима {event.parameters}")
                    print(f"[{self.__class__.__name__}] новый режим: {event.parameters}!")
                    break
            except Empty:
                sleep(0.2)
                attempts -= 1
        print(f'[{self.__class__.__name__}] завершение работы')

### Инициализируем монитор и сущности

In [34]:
monitor = Monitor(monitor_events_queue)
control_system = ControlSystem(monitor_events_queue)
lights_gpio = LightsGPIO(monitor_events_queue)

регистрируем очереди сущностей в мониторе

In [35]:
monitor.add_entity_queue(control_system.__class__.__name__, control_system.entity_queue())
monitor.add_entity_queue(lights_gpio.__class__.__name__, lights_gpio.entity_queue())

[монитор] регистрируем сущность ControlSystem
[монитор] регистрируем сущность LightsGPIO


### Запускаем всё

Ожидаемая последовательность событий

![Диаграмма последовательности вызовов](https://www.plantuml.com/plantuml/png/dPBVIiCm6CNlynIvdtk1NSZ02n4KXJr1w886-cUqcR0xgoBpIdoJLgqhtRg-mfStygJPM3js9OLyoPTpVia97ITQn7eU-6o6gZmr4w7c5r6euyYVB18j0ouIxhb6JpIHtZnMUd4JXKf7iPK5RjgJNQlx1vrStbtTMeNVhXZR0VdmV6yQ34QSQjhI5sNcWrE5QMrUgQHlysAUq7oZqcxyq1hbW6KxG8S5KWEBPHMe5MLeOBa6uHd4YWbVSs1JQg1OKktEF3ATSTI2Vk7Os6b6AupGerctn3uM5WWfn_OAxGRGr4OogTtktjCzmt3utyZ2q-fHQBb_JrSEv177oHigRB1E2ChOL1vxfPz8ZWjdBhv9ELn5Bw_vfFf4L2fFlZsEtkBBpNjxWH8mkyHWbbZbC1TCXbCsne1Vxmy0)

In [36]:
monitor.start()
control_system.start()
lights_gpio.start()
sleep(2)

[монитор] старт
[ControlSystem] старт
[ControlSystem] отправляем тестовый запрос
[LightsGPIO] старт
[ControlSystem] завершение работы
[LightsGPIO] завершение работы
[монитор] проверяем запрос ControlEvent(operation='stop')
[монитор] завершение работы


### Теперь останавливаем

In [37]:
monitor.stop()
control_system.join()
lights_gpio.join()
monitor.join()

## Заключение

В этом блокноте продемонстрирован базовый функционал контролируемого изменения режима работы светофора. 

В примере не реализованы некоторые сущности и большая часть логики работы светофора, которую можно предположить по архитектурной диаграмме. Попробуйте сделать это самостоятельно!

## Упражнения

Уровень "Новичок"

- в коде ControlSystem измените режим на недопустимый (два зелёных) и выполните все ячейки. Убедитесь, что монитор безопасности заблокировал сообщение, как нарушающее политику безопасности
- измените политики безопасности так, чтобы был возможен режим "моргающий жёлтый" (yellow_blinking), переводящий перекрёсток в режим нерегулируемого

Уровень "Средней сложности"

- добавьте политики безопасности для доп. секций со стрелками (поворот налево или направо)
- измените код сущностей, чтобы они не завершали работу после одного сообщения, а работали произвольное время (см. реализацию монитора безопасности)
- реализуйте сущность само-диагностики (SelfDiagnostics) и отправку сообщений от LightsGPIO (необходимо доработать политики безопасности!)

Уровень "Продвинутый"

- измените код сущности ControlSystem, реализуйте смену режимов по таймеру (заданную длительность зелёного по каждому направлению)
- реализуйте сущность CitySystemConnector, которая будет имитировать получение изменения режима, реализуйте взаимодействие CitySystemConnector и ControlSystem (понадобится доработать политики безопасности). Например, изменение длительности зелёного по направлениям или отключение светофора (перевод перекрёстка в режим нерегулируемого)
- реализуйте передачу в компонент CitySystemConnector информации об исправности светофора (статус самодиагностики; текущий режим работы). В компоненте реализуйте вывод в виде сообщений о состоянии системы

## Уровень "Новичок"

In [137]:
from multiprocessing import Queue
monitor_events_queue = Queue()

In [138]:
from dataclasses import dataclass


@dataclass
class Event:
    source: str       # отправитель
    destination: str  # получатель
    operation: str    # чего хочет (запрашиваемое действие)
    parameters: str   # с какими параметрами

In [139]:
from multiprocessing import Queue, Process
from multiprocessing.queues import Empty
import json

# формат управляющих команд для монитора
@dataclass
class ControlEvent:
    operation: str

# список разрешенных сочетаний сигналов светофора
# любые сочетания, отсутствующие в этом списке, запрещены
traffic_lights_allowed_configurations = [
    {"direction_1": "red", "direction_2": "green", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "green", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "green", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "green", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "on", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "on", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "on", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "on", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "on", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "on", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "on", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "on", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "on"},    
    {"direction_1": "red", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "onff", "arrow_2_left": "off"},    
    {"direction_1": "red", "direction_2": "yellow", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "yellow", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "yellow", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "red", "direction_2": "yellow", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    #{"direction_1": "yellow", "direction_2": "yellow"},    # последний раз я такой светофор встречал в Тихвине в 2014 году. современные ждут некоторое время между красным одного светофора и красно-желтым другого.
    {"direction_1": "off", "direction_2": "off", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "green", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "green", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "green", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "green", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "green", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},    
    {"direction_1": "yellow", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "yellow", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "on", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "yellow", "direction_2": "red", "arrow_1_right": "on", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    {"direction_1": "yellow", "direction_2": "red", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
    #{"direction_1": "green", "direction_2": "yellow"}, # в такой конфигурации прекрасный киберимунный светофор будет собирать на себе огромное количество людей, пытающихся проскакивать на жёлтый.
    {"direction_1": "blinking_yellow", "direction_2": "blinking_yellow", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"},
]


# Класс, реализующий поведение монитора безопасности
class Monitor(Process):

    def __init__(self, events_q: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        self._events_q = events_q  # очередь событий для монитора (входящие сообщения)
        self._control_q = Queue()  # очередь управляющих команд (например, для остановки монитора)
        self._entity_queues = {}   # словарь очередей известных монитору сущностей
        self._force_quit = False   # флаг завершения работы монитора

    # регистрация очереди новой сущности
    def add_entity_queue(self, entity_id: str, queue: Queue):
        print(f"[монитор] регистрируем сущность {entity_id}")
        self._entity_queues[entity_id] = queue

    def _check_mode(self, mode_str: str) -> bool:
        mode_ok = False
        try:
            # извлечём структуру из строки, в случае ошибки запретим изменение режима
            mode = json.loads(mode_str)
            # проверим входит ли запрашиваемый режим в список разрешённых
            print(f"[монитор] проверяем конфигурацию {mode}")
            if mode in traffic_lights_allowed_configurations:
                # такой режим найден, можно активировать
                mode_ok = True
        except:
            mode_ok = False
        return mode_ok

    # проверка политик безопасности        
    def _check_policies(self, event):
        print(f'[монитор] обрабатываем событие {event}')

        # default deny: всё, что не разрешено, запрещено по умолчанию!
        authorized = False

        # проверка на входе, что это экземпляр класса Event, 
        # т.е. имеет ожидаемый формат
        if not isinstance(event, Event):
            return False

        # 
        #  политики безопасности
        #

        # пример политики безопасности
        if event.source == "ControlSystem" \
                and event.destination == "LightsGPIO" \
                and event.operation == "set_mode" \
                and self._check_mode(event.parameters):
            authorized = True

        if authorized is False:
            print("[монитор] событие не разрешено политиками безопасности")
        return authorized

    # выполнение разрешённого запроса
    # метод должен вызываться только после проверки политик безопасности
    def _proceed(self, event):
        print(f'[монитор] отправляем запрос {event}')
        try:
            # найдём очередь получателя события
            dst_q: Queue = self._entity_queues[event.destination]
            # и положим запрос в эту очередь
            dst_q.put(event)
        except  Exception as e:
            # например, запрос пришёл от или для неизвестной сущности
            print(f"[монитор] ошибка выполнения запроса {e}")

    # основной код работы монитора безопасности    
    def run(self):
        print('[монитор] старт')

        # в цикле проверяет наличие новых событий, 
        # выход из цикла по флагу _force_quit
        while self._force_quit is False:
            event = None
            try:
                # ожидание сделано неблокирующим, 
                # чтобы можно было завершить работу монитора, 
                # не дожидаясь нового сообщения
                event = self._events_q.get_nowait()
                # сюда попадаем только в случае получение события,
                # теперь нужно проверить политики безопасности
                authorized = self._check_policies(event)
                if authorized:
                    # если политиками запрос авторизован - выполняем
                    self._proceed(event)
            except Empty:
                # сюда попадаем, если новых сообщений ещё нет,
                # в таком случае немного подождём
                sleep(0.5)
            except Exception as e:
                # что-то пошло не так, выведем сообщение об ошибке
                print(f"[монитор] ошибка обработки {e}, {event}")
            self._check_control_q()
        print('[монитор] завершение работы')

    # запрос на остановку работы монитора безопасности для завершения работы
    # может вызываться вне процесса монитора
    def stop(self):
        # поскольку монитор работает в отдельном процессе,
        # запрос помещается в очередь, которая проверяется из процесса монитора
        request = ControlEvent(operation='stop')
        self._control_q.put(request)

    # проверка наличия новых управляющих команд
    def _check_control_q(self):
        try:
            request: ControlEvent = self._control_q.get_nowait()
            print(f"[монитор] проверяем запрос {request}")
            if isinstance(request, ControlEvent) and request.operation == 'stop':
                # поступил запрос на остановку монитора, поднимаем "красный флаг"
                self._force_quit = True
        except Empty:
            # никаких команд не поступило, ну и ладно
            pass

In [140]:
from multiprocessing import Queue, Process
from time import sleep


class LightsGPIO(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()
        self._force_quit = False

    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):        
        print(f'[{self.__class__.__name__}] старт')
        while True:
            try:
                event: Event = self._own_queue.get_nowait()
                if event.operation == "set_mode":
                    print(f"[{self.__class__.__name__}] {event.source} запрашивает изменение режима {event.parameters}")
                    print(f"[{self.__class__.__name__}] новый режим: {event.parameters}!")
                if event.operation == "stop":
                    break
            except Empty:
                sleep(0.2)
        #print(f'[{self.__class__.__name__}] завершение работы')
    def stop(self):
        print(f'[{self.__class__.__name__}] завершение работы')
        request = ControlEvent(operation='stop')
        self._own_queue.put(request)

In [141]:
from multiprocessing import Queue, Process
import json


class ControlSystem(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()

    # выдаёт собственную очередь для взаимодействия
    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):        
        print(f'[{self.__class__.__name__}] старт')
        print(f'[{self.__class__.__name__}] отправляем тестовый запрос')

        mode = {"direction_1": "green", "direction_2": "green", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"}

        # запрос для сущности WorkerB - "скажи hello"
        event = Event(source=self.__class__.__name__,
                      destination='LightsGPIO',
                      operation='set_mode',
                      parameters=json.dumps(mode)
                      )
        
        self.monitor_queue.put(event)
        #sleep(2)
        print(f'[{self.__class__.__name__}] перевод в мигающий жёлтый')
        mode = {"direction_1": "blinking_yellow", "direction_2": "blinking_yellow", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"}

        event2 = Event(source=self.__class__.__name__,
              destination='LightsGPIO',
              operation='set_mode',
              parameters=json.dumps(mode)
              )
        self.monitor_queue.put(event2)
        
        print(f'[{self.__class__.__name__}] завершение работы')

In [142]:
monitor_novice = Monitor(monitor_events_queue)
control_system_novice = ControlSystem(monitor_events_queue)
lights_gpio_novice = LightsGPIO(monitor_events_queue)

In [143]:
monitor_novice.add_entity_queue(control_system.__class__.__name__, control_system.entity_queue())
monitor_novice.add_entity_queue(lights_gpio.__class__.__name__, lights_gpio.entity_queue())

[монитор] регистрируем сущность ControlSystem
[монитор] регистрируем сущность LightsGPIO


In [144]:
monitor_novice.start()
control_system_novice.start()
lights_gpio_novice.start()
sleep(2)

[монитор] старт
[ControlSystem] старт
[ControlSystem] отправляем тестовый запрос[LightsGPIO] старт

[ControlSystem] перевод в мигающий жёлтый
[ControlSystem] завершение работы
[монитор] обрабатываем событие Event(source='ControlSystem', destination='LightsGPIO', operation='set_mode', parameters='{"direction_1": "green", "direction_2": "green", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"}')
[монитор] проверяем конфигурацию {'direction_1': 'green', 'direction_2': 'green', 'arrow_1_right': 'off', 'arrow_2_right': 'off', 'arrow_1_left': 'off', 'arrow_2_left': 'off'}
[монитор] событие не разрешено политиками безопасности
[монитор] обрабатываем событие Event(source='ControlSystem', destination='LightsGPIO', operation='set_mode', parameters='{"direction_1": "blinking_yellow", "direction_2": "blinking_yellow", "arrow_1_right": "off", "arrow_2_right": "off", "arrow_1_left": "off", "arrow_2_left": "off"}')
[монитор] проверяем конфигурацию {'direct

In [145]:
monitor_novice.stop()
lights_gpio_novice.stop()
control_system_novice.join()
lights_gpio_novice.join()
monitor_novice.join()

[LightsGPIO] завершение работы
