# Кибериммунный подход к разработке

## Об авторе 

Этот блокнот разработан для вас Сергеем Соболевым, sergey.p.sobolev@kaspersky.com

Больше информации о кибериммунном подходе можно найти на странице https://github.com/sergey-sobolev/cyberimmune-systems/wiki/%D0%9A%D0%B8%D0%B1%D0%B5%D1%80%D0%B8%D0%BC%D0%BC%D1%83%D0%BD%D0%B8%D1%82%D0%B5%D1%82

Подписывайтесь на телеграм-канал @learning_cyberimmunity (https://t.me/learning_cyberimmunity)

Обучающие видео на тему кибериммунного подхода вы можете найти на youtube канале https://www.youtube.com/@learning_cyberimmunity/

## Ключевые технологии

- MILS (Multiple Independent Levels of Security/Safety)
- FLASK (FLux Advanced Security Kernel)

## Про MILS 

К сожалению, качественное разделение сущностей и контроль ресурсов в Jupyter блокноте мы не покажем, только лёгкий намёк, об этом в самом конце.

## Демонстрация FLASK подхода

1. Создадим две сущности и монитор, который будет контролировать их взаимодействие
2. Определим политики безопасности
3. Отправим простой запрос для проверки работы всех элементов

- Каждая сущность WorkerA, WorkerB, Monitor (монитор безопасности) в качестве интерфейса использует очереди сообщений, у каждой сущности есть своя «персональная» очередь, ассоциированная с ней
- WorkerA и WorkerB отправляют сообщения только в очередь Monitor
- Monitor проверяет сообщения на соответствие политикам безопасности, в случае положительного решения перенаправляет сообщение в очередь соответствующей сущности

![диаграмма](https://github.com/sergey-sobolev/cyberimmune-systems-basic-demo-notebook01/blob/main/images/flask-diagram.png?raw=true)

Очередь событий для монитора безопасности: все запросы от сущностей друг к другу должны отправляться только в неё

In [1]:
from multiprocessing import Queue
monitor_events_queue = Queue()

Настроим механизм журналирования, т.к. для асинхронных сообщений иначе вывод может оказаться плохо читаемым

In [2]:
# import sys
# import logging
# from multiprocessing import get_logger

# # зададим формата времени
# datefmt = '%Y-%m-%d %H:%M:%S'
# # зададим формат сообщения
# strfmt = '[%(asctime)s] [%(name)s] [%(levelname)s] > %(message)s'
# #logging.basicConfig(stream=sys.stdout, format=strfmt, datefmt=datefmt, level=logging.DEBUG)
# #LOGGER = logging.getLogger("demo")
# # настроим логгер модуля multiprocessing

Зафиксируем формат сообщений

In [3]:
from dataclasses import dataclass


@dataclass
class Event:
    source: str       # отправитель
    destination: str  # получатель
    operation: str    # чего хочет (запрашиваемое действие)
    parameters: str   # с какими параметрами

### Монитор безопасности

Ниже в методе _check_policies можно увидеть пример политики безопасности:

```python
if event.source == "WorkerA" \
        and event.destination == "WorkerB" \
        and event.operation == "say" \
        and event.parameters == "hello":
    authorized = True
```            

в этом примере проверяется отправитель сообщения, получатель, запрашиваемая операция и даже параметры операции. Это максимально жёсткий вариант, очевидно, количество проверок можно уменьшить.

А пока это место для экспериментов, как можно из монитора безопасности заблокировать взаимодействие между сущностями.

In [4]:
from multiprocessing import Queue, Process
from multiprocessing.queues import Empty


# формат управляющих команд для монитора
@dataclass
class ControlEvent:
    operation: str


# Класс, реализующий поведение монитора безопасности
class Monitor(Process):

    def __init__(self, events_q: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        self._events_q = events_q  # очередь событий для монитора (входящие сообщения)
        self._control_q = Queue()  # очередь управляющих команд (например, для остановки монитора)
        self._entity_queues = {}   # словарь очередей известных монитору сущностей
        self._force_quit = False   # флаг завершения работы монитора

    # регистрация очереди новой сущности
    def add_entity_queue(self, entity_id: str, queue: Queue):
        print(f"[монитор] регистрируем сущность {entity_id}")
        self._entity_queues[entity_id] = queue

    # проверка политик безопасности        
    def _check_policies(self, event):
        print(f'[монитор] обрабатываем событие {event}')

        # default deny: всё, что не разрешено, запрещено по умолчанию!
        authorized = False

        # проверка на входе, что это экземпляр класса Event, 
        # т.е. имеет ожидаемый формат
        if not isinstance(event, Event):
            return False

        # 
        #  политики безопасности
        #

        # пример политики безопасности
        if event.source == "WorkerA" \
                and event.destination == "WorkerB" \
                and event.operation == "say" \
                and event.parameters == "hello":
            authorized = True

        if authorized is False:
            print(f"[монитор] событие не разрешено политиками безопасности")
        return authorized

    # выполнение разрешённого запроса
    # метод должен вызываться только после проверки политик безопасности
    def _proceed(self, event):
        print(f'[монитор] отправляем запрос {event}')
        try:
            # найдём очередь получателя события
            dst_q: Queue = self._entity_queues[event.destination]
            # и положим запрос в эту очередь
            dst_q.put(event)
        except  Exception as e:
            # например, запрос пришёл от или для неизвестной сущности
            print(f"[монитор] ошибка выполнение запроса {e}")

    # основной код работы монитора безопасности    
    def run(self):
        print(f'[монитор] старт')

        # в цикле проверяет наличие новых событий, 
        # выход из цикла по флагу _force_quit
        while self._force_quit is False:
            event = None
            try:
                # ожидание сделано неблокирующим, 
                # чтобы можно было завершить работу монитора, 
                # не дожидаясь нового сообщения
                event = self._events_q.get_nowait()
                # сюда попадаем только в случае получение события,
                # теперь нужно проверить политики безопасности
                authorized = self._check_policies(event)
                if authorized:
                    # если политиками запрос авторизован - выполняем
                    self._proceed(event)
            except Empty:
                # сюда попадаем, если новых сообщений ещё нет,
                # в таком случае немного подождём
                sleep(0.5)
            except Exception as e:
                # что-то пошло не так, выведем сообщение об ошибке
                print(f"[монитор] ошибка обработки {e}, {event}")
            self._check_control_q()
        print(f'[монитор] завершение работы')

    # запрос на остановку работы монитора безопасности для завершения работы
    # может вызываться вне процесса монитора
    def stop(self):
        # поскольку монитор работает в отдельном процессе,
        # запрос помещается в очередь, которая проверяется из процесса монитора
        request = ControlEvent(operation='stop')
        self._control_q.put(request)

    # проверка наличия новых управляющих команд
    def _check_control_q(self):
        try:
            request: ControlEvent = self._control_q.get_nowait()
            print(f"[монитор] проверяем запрос {request}")
            if isinstance(request, ControlEvent) and request.operation == 'stop':
                # поступил запрос на остановку монитора, поднимаем "красный флаг"
                self._force_quit = True
        except Empty:
            # никаких команд не поступило, ну и ладно
            pass

### Сущность А

Эта сущность отправляет одно сообщение для другой сущности (WorkerB) и завершает работу.

In [5]:
from multiprocessing import Queue, Process, log_to_stderr


class WorkerA(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()

    # выдаёт собственную очередь для взаимодействия
    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):        
        print(f'[{self.__class__.__name__}] старт')
        print(f'[{self.__class__.__name__}] отправляем тестовый запрос')

        # запрос для сущности WorkerB - "скажи hello"
        event = Event(source=self.__class__.__name__,
                      destination='WorkerB',
                      operation='say',
                      parameters='hello'
                      )
        
        self.monitor_queue.put(event)
        print(f'[{self.__class__.__name__}] завершение работы')

### Сущность B

Эта сущность ждёт входящее сообщение в течение заданного периода времени, если получает - обрабатывает и завершает работу или выходит по таймауту.

In [6]:
from multiprocessing import Queue, Process
from time import sleep


class WorkerB(Process):

    def __init__(self, monitor_queue: Queue):
        # вызываем конструктор базового класса
        super().__init__()
        # мы знаем только очередь монитора безопасности для взаимодействия с другими сущностями
        # прямая отправка сообщений в другую сущность запрещена в концепции FLASK
        self.monitor_queue = monitor_queue
        # создаём собственную очередь, в которую монитор сможет положить сообщения для этой сущности
        self._own_queue = Queue()

    def entity_queue(self):
        return self._own_queue

    # основной код сущности
    def run(self):        
        print(f'[{self.__class__.__name__}] старт')
        attempts = 5
        while attempts > 0:
            try:
                event: Event = self._own_queue.get_nowait()
                if event.operation == "say":
                    print(f"[{self.__class__.__name__}] {event.source} захотел, чтобы мы сказали {event.parameters}")
                    print(f"[{self.__class__.__name__}] мы согласны: {event.parameters}!")
                    break
            except Empty:
                sleep(0.2)
                attempts -= 1
        print(f'[{self.__class__.__name__}] завершение работы')

### Инициализируем монитор и сущности

In [7]:
monitor = Monitor(monitor_events_queue)
worker_a = WorkerA(monitor_events_queue)
worker_b = WorkerB(monitor_events_queue)

регистрируем очереди сущностей в мониторе

In [8]:
monitor.add_entity_queue(worker_a.__class__.__name__, worker_a.entity_queue())
monitor.add_entity_queue(worker_b.__class__.__name__, worker_b.entity_queue())

[монитор] регистрируем сущность WorkerA
[монитор] регистрируем сущность WorkerB


### Запускаем всё

Ожидаемая последовательность событий

![Диаграмма последовательности вызовов](https://www.plantuml.com/plantuml/png/SoWkIImgAStDuU8g038oapCB4lDA5CBpYx9JYnnHn7ppyp9Byekmg7FYue9gKD2rWwdJrhPIkDXuOT-5ZHUxBpPTs75XgyA5BHTsN-o7Qu8gHD9ZfN1X_yK6e9vD5xO2wLqNDW05WDpT5tQ0ZTY5f0TcZAukfZtxBZR1lP2ON99PbbYI2hSMIgCP0J80QEM62nj1ka5MevXwiP-BYyiXDIy565G0)

In [9]:
monitor.start()
worker_a.start()
worker_b.start()
sleep(2)

[монитор] старт
[WorkerA] старт
[WorkerA] отправляем тестовый запрос
[WorkerA] завершение работы
[WorkerB] старт
[монитор] обрабатываем событие Event(source='WorkerA', destination='WorkerB', operation='say', parameters='hello')
[монитор] отправляем запрос Event(source='WorkerA', destination='WorkerB', operation='say', parameters='hello')
[WorkerB] WorkerA захотел, чтобы мы сказали hello
[WorkerB] мы согласны: hello!
[WorkerB] завершение работы


### Теперь останавливаем

In [10]:
monitor.stop()
worker_a.join()
worker_b.join()
monitor.join()

## Заключение

В этом блокноте продемонстрирован подход FLASK: тотальный контроль взаимодействия сущностей с помощью монитора безопасности и простейших политик.

Пример не так прост, как может показаться: используется многопроцессная реализация - каждая сущность запускается в отдельном процессе, а передача данных осуществляется через специализированные очереди. После запуска сущностей доступ к их памяти становится защищён операционной системой. Это можно с натяжкой рассматривать как имитацию MILS функциональности.

## Упражнения

Уровень "Новичок"

- в коде WorkerA измените текст передаваемого сообщения и выполните все ячейки. Убедитесь, что монитор безопасности заблокировал сообщение, как нарушающее политику безопасности
- добавьте текстовое сообщение в мониторе после успешной проверки политик безопасности
- измените политики безопасности так, чтобы любой текст был допустим

Уровень "Шутки закончились"

- измените код сущностей так, чтобы передача сообщений между WorkerA и WorkerB была двунаправленной: после получения сообщения от WorkerA WorkerB посылает ответное сообщение отправителю (например, say "done"), WorkerA должен получить сообщение и выполнить команду. С помощью plantuml.com нарисуйте диаграмму последовательности для этого сценария и вставьте в блокнот.

Уровень "Полубог"

_Возможно, для задач этого уровня сложности имеет смысл перенести разработку кода в VS Code или PyCharm._

- добавьте третью сущность (WorkerC), которая будет периодически (например, раз в минуту) запрашивать внешнюю информацию - например, курс валют с сайта ЦБ РФ (https://www.cbr-xml-daily.ru/latest.js) и отправлять WorkerA, а WorkerA в случае превышения заданного порога будет уведомлять WorkerB о событии. Очевидно, для этого сценария нужно доработать политики безопасности.