# Initialisation

In [None]:
import numpy as np
from ultralytics import YOLO
import torch
import pandas as pd
from typing import Dict

device = torch.device("cpu")
print("")
if torch.cuda.is_available():
    print("Training on GPU")
    torch.autocast(device_type="cuda", dtype=torch.bfloat16).__enter__()

    if torch.cuda.get_device_properties(0).major >= 8:
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
else:
    print("Training on CPU")
print("")
import os
from sam2.build_sam import build_sam2
from sam2.sam2_image_predictor import SAM2ImagePredictor

## Timer

In [2]:
# import time
# from functools import wraps

# # Путь к лог-файлу
# log_file_path = ".\\execution_time_log.txt"

# # Декоратор для измерения времени выполнения и логирования
# def log_execution_time(func):
#     @wraps(func)
#     def wrapper(*args, **kwargs):
#         # Получаем имя функции
#         function_name = func.__name__

#         # Начинаем замер времени выполнения
#         start_time = time.time()

#         # Выполняем функцию
#         result = func(*args, **kwargs)

#         # Рассчитываем время выполнения
#         execution_time = time.time() - start_time

#         # Записываем в лог
#         with open(log_file_path, "a") as log_file:
#             log_file.write(f"Function: {function_name}, Execution Time: {execution_time:.6f} seconds\n")

#         return result

#     return wrapper

## Data

In [3]:
import src.utils.Configurator as configurator

In [None]:
config = configurator.main()

In [5]:
pth_sgn = config["pth_sgn"]
PATH_TO_INPUT = config["PATH_TO_INPUT"]
PATH_TO_RESULTS = config["PATH_TO_RESULTS"]
PATH_TO_TRUEMASKS = config["PATH_TO_TRUEMASKS"]
PATH_TO_EXPMASKS = config["PATH_TO_EXPMASKS"]

In [None]:
config["gaze_data_file"]

In [7]:
gaze_data_path = {}
for key, value in config["gaze_data_file"].items():
    gaze_data_path[key] = f"{PATH_TO_INPUT}{value}"
input_path = f"{PATH_TO_INPUT}{config["input_video"]}"
output_path = f"{PATH_TO_RESULTS}{config["output_video"]}"
output_dataframe = (
    f"{PATH_TO_RESULTS}{config["output_dataframe"]}"
)
output_dataframe_all = (
    f"{PATH_TO_RESULTS}{config["output_dataframe_all"]}"
)

In [8]:
Yolo_model_wheight = config["Yolo_model_wheight"]
sam2_checkpoint = config["sam2_checkpoint"]
sam_model_weight = config["sam_model_weight"]

In [9]:
segmentation_classes = config["segmentation_classes"]

In [10]:
def read_dataframes_for_segmentation(gaze_data_path: Dict[str, str]):
    """
    Читает данные о взгляде из CSV файлов и нормализует индекс мирового пространства.

    Params:
        gaze_data_path (Dict[str, str]): Словарь, где ключи - метки данных, а значения - пути к CSV файлам.

    Returns:
        df (Dict[str, pd.DataFrame]): Словарь, где ключи - метки данных, а значения - соответствующие DataFrame.
    """
    df = {}  # Инициализация словаря для хранения DataFrame

    # Проходим по всем меткам данных и загружаем соответствующие CSV файлы
    for df_label in gaze_data_path.keys():
        # Читаем CSV файл в DataFrame
        df[df_label] = pd.read_csv(gaze_data_path[df_label])

        # Нормализуем индекс мирового пространства
        df[df_label] = df[df_label].assign(
            world_index=df[df_label]["world_index"] - min(df[df_label]["world_index"])
        )

    return df  # Возвращаем словарь с DataFrame

In [11]:
gaze_data = read_dataframes_for_segmentation(gaze_data_path)

## Инициализация предикторов

In [None]:
if config["Yolo_model_wheight"] != "" and config["sam2_checkpoint"] != "" and config["sam_model_weight"] != "":
    # Инициализация ЙОЛЫ
    yolo_model = YOLO(Yolo_model_wheight)
    yolo_model.to(device)
    print("initialized")
    ### Вписать в класс загрузку модели SAM2
    model_cfg = os.path.basename("sam2_hiera_l.yaml")

    sam_model = build_sam2(model_cfg, sam2_checkpoint, device="cuda")
    sam_model = SAM2ImagePredictor(sam_model)
    # Загружаем веса с настроенным энкодером/декодером
    sam_model.model.load_state_dict(torch.load(sam_model_weight))
    def predict_on_image(image):
        """
        Выполняет предсказание на изображении с использованием модели и возвращает результаты.

        Params:
            yolo_model (str): модель YOLO pretrained
            sam_model (str): модель SAM2 c весами энкодера/декодера
            image (np.ndarray): Изображение (кадр), на котором будет выполнено предсказание.


        Returns:
            boxes (Boxes): Объект с ограничивающими прямоугольниками (bbox) для обнаруженных объектов.
            masks (Masks): Объект с сегментационными масками для обнаруженных объектов.
            cls (int): Класс объекта
            probs (np.ndarray): Вероятности классов для каждого обнаруженного объекта.
        """
        results = yolo_model.predict(source=image, conf=0.5, stream=True)

        # Инициализация переменных для хранения результатов
        boxes, masks, class_ids, probs = [], [], [], []

        # Обрабатываем результаты предсказания YOLO
        for r in results:
            yolo_boxes = r.boxes
            yolo_mask = r.masks.cpu().numpy() if r.masks is not None else None
            yolo_probs = r.probs.cpu().numpy() if r.probs is not None else None
            yolo_class_ids = r.boxes.cls.cpu().numpy()

            for i, box in enumerate(yolo_boxes):
                # # Преобразование бокса для SAM2
                xyxy = box.xyxy[0].cpu().numpy()
                input_box = np.array(
                    [
                        np.round(xyxy[0]),
                        np.round(xyxy[1]),
                        np.round(xyxy[2]),
                        np.round(xyxy[3]),
                    ]
                )

                if len(input_box) > 0:
                    # Предсказание масок с помощью SAM2
                    sam_model.set_image(image)
                    sam_masks, _, _ = sam_model.predict(
                        box=input_box, multimask_output=False
                    )

                    # Выбираем первую маску, так как multimask_output=False
                    masks.append(sam_masks[0])

                else:
                    masks.append(yolo_mask)

                # Добавляем данные боксов и классов
                # boxes.append(yolo_boxes)
                class_ids.append(int(yolo_class_ids[i]))  # Добавляем класс
                if yolo_probs is not None:
                    probs.append(yolo_probs[i])

        return {"boxes": yolo_boxes, "masks": masks, "classes": class_ids, "probs": probs}
elif config["Yolo_model_wheight"] != "":
    # Инициализация ЙОЛЫ
    yolo_model = YOLO(Yolo_model_wheight)
    yolo_model.to(device)
    print("initialized")
    def predict_on_image(image: np.ndarray):
        """
        Выполняет предсказание на изображении с использованием модели YOLO (детекции или сегментации)

        Params:
        model_path (str):
        Путь до модели сегментации YOLO 8/9/11 или детекции YOLO 8/9/10/11

        image (np.ndarray):
        Массив изображения (кадра), на котором модель будет выполнять предсказания

        Returns:

            boxes (np.ndarray): возвращает боксы в формате xyxy

            masks (np.ndarray): возвращает маски

            probs (np.ndarray): возвращает параметр conf

            cls (np.ndarray): возвращает класс
        """

        # Инициализация переменных для хранения результатов

        results = yolo_model.predict(
            source=image, conf=0.5,
            stream=True)

        boxes, masks, probs = None, None, None
        for r in results:
            boxes = r.boxes
            masks = r.masks.data.cpu().numpy() if r.masks is not None else None
            probs = r.probs if r.probs is not None else None
            class_ids = list(int(x) for x in r.boxes.cls)


        return {"boxes":boxes, "masks":masks, "classes":class_ids, "probs":probs}
else:
    raise Exception("no model wheight has been detected")

## Useful methods

In [13]:
def mask_prep(masks, class_ids, class_names, w, h):
    """
    Подготавливает маски для наложения на изображение, изменяя их размер и формат.

    Params:
        masks: Объект масок, содержащий сегментационные маски.
        boxes: Объекты ограничивающих прямоугольников для каждой маски.
        class_names: Список названий классов для идентификации объектов.

    Returns:
        augmented_masks (dict): Словарь, где ключи - названия классов, а значения - подготовленные маски.
    """
    # Инициализация словаря для хранения подготовленных масок
    augmented_masks = {}

    # Проверяем наличие масок
    if masks is not None:
        # Проходим по маскам и соответствующим ограничивающим прямоугольникам
        for mask, id in zip(masks, class_ids):

            # Преобразование маски в формат CV_8UC1
            if mask.max() <= 1:
                mask = (mask * 255).astype(np.uint8)
            elif mask.max() > 1 and mask.max() <= 255:
                mask = mask.astype(np.uint8)
            else:
                raise ValueError(
                    "Значения маски должны быть в диапазоне 0-1 или 0-255."
                )

            # Изменяем размер маски до оригинальной формы изображения
            augmented_masks[class_names[id]] = cv2.resize(mask, (h, w))
    else:
        raise ValueError("Маски не найдены")

    return augmented_masks


In [14]:
from scipy.spatial import distance
import cv2


def find_mask_raduis(mask):
    """
    Находит минимальный и максимальный радиус от центра масс маски до её границы.

    Params:
        mask: Бинарная маска (np.ndarray), значения маски должны быть 0 или 255.

    Returns:
        center_mass: Координаты центра масс (x, y).
        min_radius: Минимальный радиус (float).
        max_radius: Максимальный радиус (float).
    """
    # Преобразование маски в 8-битное изображение, если она не бинарная
    if mask.dtype != np.uint8:
        mask = (mask * 255).astype(np.uint8)

    # Находим момент маски
    moments = cv2.moments(mask)
    if moments["m00"] == 0:
        raise ValueError("Площадь маски равна нулю, невозможно вычислить центр масс.")

    # Вычисление координат центра масс
    center_mass = (
        int(moments["m10"] / moments["m00"]),
        int(moments["m01"] / moments["m00"]),
    )

    # Получаем контуры маски
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if not contours:
        raise ValueError("Не найдены контуры маски.")

    # Извлекаем точки границы маски
    contour_points = np.vstack(contours)

    # Вычисляем расстояния от центра масс до всех точек границы
    distances = [distance.euclidean(center_mass, point[0]) for point in contour_points]

    # Минимальное и максимальное расстояние
    min_radius = min(distances)
    max_radius = max(distances)

    return center_mass, min_radius, max_radius

In [15]:
def find_min_distance_to_contour(mask, point):
    """
    Находит минимальное расстояние от заданной точки до ближайшего контура маски.

    Params:
        mask: Бинарная маска (np.ndarray), значения маски должны быть 0 или 255.
        point: Координаты точки, для которой ищется расстояние (x, y).

    Returns:
        min_distance: Минимальное расстояние до ближайшего контура (float).
    """
    # Преобразование маски в формат CV_8UC1, если это не так
    if mask.max() <= 1:
        mask = (mask * 255).astype(np.uint8)
    elif mask.max() > 1 and mask.max() <= 255:
        mask = mask.astype(np.uint8)
    else:
        raise ValueError("Значения маски должны быть в диапазоне 0-1 или 0-255.")

    # Находим контуры маски
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if len(contours) == 0:
        raise ValueError("Контуры не найдены в маске.")

    # Инициализация минимального расстояния бесконечностью
    min_distance = float("inf")

    # Проходим по всем контурам и находим минимальное расстояние до каждого
    for contour in contours:
        distance = cv2.pointPolygonTest(contour, point, True)
        if distance < min_distance:
            min_distance = abs(distance)  # Используем модуль расстояния

    return min_distance

In [16]:
def is_point_touching_mask(mask, point, radius=5):
    """
    Проверяет, пересекает ли круг вокруг точки маску.
    Если хотя бы один пиксель внутри круга пересекает маску, возвращается True.

    Params:
        mask: Бинарная маска (np.ndarray), значения маски должны быть 0 или 255.
        point: Координаты точки (x, y).
        radius: Радиус круга вокруг точки (по умолчанию 5).

    Returns:
        True, если хотя бы один пиксель внутри радиуса круга пересекает маску, иначе False.
    """
    x, y = point

    # Преобразование маски в формат CV_8UC1, если это не так
    if mask.max() <= 1:
        mask = (mask * 255).astype(np.uint8)
    elif mask.max() > 1 and mask.max() <= 255:
        mask = mask.astype(np.uint8)
    else:
        raise ValueError("Значения маски должны быть в диапазоне 0-1 или 0-255.")

    # Размеры изображения
    h, w = mask.shape

    # Создаем пустую маску того же размера, что и исходная
    circle_mask = np.zeros_like(mask)

    # Рисуем круг на новой маске в точке (x, y) с заданным радиусом
    cv2.circle(circle_mask, (x, y), radius, 255, -1)

    # Проверяем пересечение круговой маски с исходной маской
    intersection = cv2.bitwise_and(mask, circle_mask)

    # Если хотя бы один пиксель пересечения непустой, возвращаем True
    return np.any(intersection > 0), np.sum(intersection)

In [17]:
def calculate_gaze_points(w, h, masks, gaze_df, frame_number, radius): #exprimental
    """
    Вычисляет и возвращает информацию о точках взгляда для текущего кадра,
    проверяя их нахождение и касание с заданными масками объектов.

    Params:
        w (int): Ширина кадра.
        h (int): Высота кадра.
        masks (dict): Словарь масок объектов, где ключи - идентификаторы объектов,
                      а значения - маски контуров.
        gaze_df (pd.DataFrame): DataFrame с данными о взгляде, содержащий информацию
                                о положении взгляда и метаданных для каждого кадра.
        frame_number (int): Номер текущего кадра.
        radius (int): Радиус для проверки касания точки взгляда с контуром маски.

    Returns:
        points_data (dict): Словарь, содержащий данные по точкам взгляда, где ключи -
                            метки времени взгляда, а значения - информация о нахождении
                            и касании точки взгляда с масками объектов.
    """

    # Отбираем данные взгляда для текущего кадра по номеру кадра
    gaze_points = gaze_df[gaze_df["world_index"] == frame_number]
    points_data = {}  # Инициализируем словарь для хранения данных по точкам взгляда

    # Проходим по всем точкам взгляда в текущем кадре
    for _, gaze_point in gaze_points.iterrows():
        # Проверяем, что координаты точки взгляда не NaN
        if not np.isnan(gaze_point["norm_pos_x"]) and not np.isnan(
            gaze_point["norm_pos_y"]
        ):
            # Вычисляем координаты точки на кадре на основе нормализованных координат
            center_x = int(gaze_point["norm_pos_x"] * h)
            center_y = int((1 - gaze_point["norm_pos_y"]) * w)
            point = (center_x, center_y)  # Определяем точку на кадре

            # Создаем пустой датафрейм для хранения данных по текущей точке взгляда
            point_data = pd.DataFrame(
                columns=["mask", "is inside", "intersection", "normal", "radius_of_gaze"]
            )

            # Проверяем взаимодействие точки взгляда с каждой маской
            for mask_i in masks.keys():
                # Создаем словарь для текущей маски
                is_inside, _ = is_point_touching_mask(masks[mask_i], point, radius=1)
                is_touching, intersection = is_point_touching_mask(
                    masks[mask_i], point, radius=radius
                )
                normal = (
                    0
                    if is_inside
                    else find_min_distance_to_contour(masks[mask_i], point)
                )

                # Проверяем, что все данные валидны (нет NaN или None)
                if (
                    not pd.isna(is_inside)
                    and not pd.isna(is_touching)
                    and not pd.isna(normal)
                ):
                    row_data = {
                        "mask": [mask_i],
                        "is inside": [is_inside],
                        "intersection": [intersection],
                        "normal": [normal],
                        "radius_of_gaze": radius
                    }
                    # Добавляем данные для текущей маски как новую строку в DataFrame
                    row_df = pd.DataFrame(row_data)
                    point_data = pd.concat([point_data, row_df], ignore_index=True)

                    # Добавляем данные по текущей точке взгляда в словарь
                    points_data[gaze_point["gaze_timestamp"]] = point_data

    return points_data

In [18]:
def probability_function(masks, constant):
    halo_rads = {}
    for mask_i in masks.keys():
        # Calculate the area of the mask
        mask_area = np.sum(masks[mask_i])
        center_mass, min_radius, max_radius = find_mask_raduis(masks[mask_i])
        # Calculate the halo distance based on the mask area and the constant
        halo_rads[mask_i] = min_radius * (constant**2) / mask_area

    def calculate_probability(distance, mask_i):
        if distance >= halo_rads[mask_i]:
            return 0
        else:
            return 1 - distance / halo_rads[mask_i]

    return calculate_probability, halo_rads

In [19]:
import json
import pandas as pd

def make_verdict(points_data, probability_function, frame_number):
    """
    Формирует DataFrame с результатами анализа точек взгляда.

    Parameters:
        points_data (dict): Данные точек взгляда.
        probability_function (function): Функция для вычисления вероятности на основе расстояния.
        frame_number (int): Номер текущего кадра.

    Returns:
        pd.DataFrame: DataFrame с результатами анализа.
    """
    verdict_data = []

    # Проходим по каждому типу обработки
    for processing_type, gaze_data in points_data.items():
        # Проходим по каждой записи точки взгляда
        for gaze_timestamp, point_df in gaze_data.items():
            # Инициализируем данные для ближайшей и лучшей масок
            nearest_data = {
                "mask": "None",
                "prob": 0,
                "is inside": False,
                "intersection": 0,
                "normal": float("inf"),
            }
            best_data = {
                "mask": "None",
                "prob": 0,
                "is inside": False,
                "intersection": 0,
                "normal": float("inf"),
            }
            other_mask_data = {}
            radius_of_gaze = point_df.iloc[0]["radius_of_gaze"] if not point_df.empty else 0

            if not point_df.empty:
                # Проходим по каждой маске и вычисляем вероятность
                for _, row in point_df.iterrows():
                    prob = probability_function(row["normal"], row["mask"])

                    # Сравниваем расстояние до ближайшей маски
                    if row["normal"] < nearest_data["normal"]:
                        nearest_data = {
                            "mask": row["mask"],
                            "prob": prob,
                            "is inside": row["is inside"],
                            "normal": row["normal"],
                            "intersection": row["intersection"]
                        }

                    # Обновляем маску с максимальной вероятностью
                    if prob > best_data["prob"]:
                        best_data = {
                            "mask": row["mask"],
                            "prob": prob,
                            "is inside": row["is inside"],
                            "normal": row["normal"],
                            "intersection": row["intersection"]
                        }

                    # Сохраняем информацию о всех масках
                    other_mask_data[row["mask"]] = {
                        "is inside": row["is inside"],
                        "probability": prob,
                        "normal": row["normal"],
                        "intersection": row["intersection"]
                    }

                # Удаляем информацию о лучшей и ближайшей масках из other_mask_data
                other_mask_data.pop(best_data["mask"], None)
                if best_data["mask"] != nearest_data["mask"]:
                    other_mask_data.pop(nearest_data["mask"], None)

            # Формируем строку для DataFrame
            row_data = {
                "gaze_timestamp": gaze_timestamp,
                "frame_number": frame_number,
                "processing_type": processing_type,
                "best_mask": best_data["mask"],
                "is_best_inside": best_data["is inside"],
                "best_intersection": best_data["intersection"],
                "best_probability": best_data["prob"],
                "best_normal": best_data["normal"],
                "nearest_mask": nearest_data["mask"],
                "nearest_intersection": nearest_data["intersection"],
                "nearest_probability": nearest_data["prob"],
                "nearest_normal": nearest_data["normal"],
                "radius_of_gaze": radius_of_gaze,
                "other_mask_data": json.dumps(other_mask_data),
            }

            # Добавляем строку в список
            verdict_data.append(row_data)

    # Преобразуем список словарей в DataFrame
    verdict_df = pd.DataFrame(verdict_data)

    return verdict_df


In [20]:
def safe_plug(
    gaze_data: Dict[str, pd.DataFrame],
):
    columns_to_check = [
        "gaze_timestamp",
        "world_index",
        "confidence",
        "norm_pos_x",
        "norm_pos_y",
    ]
    if not bool(gaze_data):
        raise Exception()
    for gaze_df in gaze_data.values():
        if not isinstance(gaze_df, pd.DataFrame()):
            raise Exception(
                "safe_plug in segmentation_run_through",
                f"Ошибка: gaze_df должен быть pd.DataFrame(), получено {type(gaze_df)}",
            )

        if not all(x in columns_to_check for x in [*gaze_df]):
            raise Exception(
                "safe_plug in segmentation_run_through",
                f"Ошибка: gaze_df должен содержать определенные столбцы, отсутствоют {set(columns_to_check).difference({*gaze_df})}",
            )
        

In [21]:
import cv2
import numpy as np
import os


def save_experimental_mask_for_frame(p, w, h, frame_counter, masks_folder):
    """
    Функция для получения мультиклассовой маски для определенного фрейма

    Param:
    frame_counter: номер фрейма
    masks_folder: путь к истинным маскам

    Return:
    mask - истинная маска определенного фрейма
    """

    # Инициализация пустой мультиклассовой маски
    multi_class_mask = np.zeros((w, h), dtype=np.uint8)  # Используем (h, w)

    # Заполнение мультиклассовой маски из словаря
    for cl, mask in zip(p["classes"], p["masks"]):
        if mask is not None:  # Проверка на наличие маски
            original_shape = mask.shape

            # Приведение маски к нужному размеру, если это необходимо
            if original_shape != (w, h):
                # Если размеры не совпадают, используем cv2.resize
                mask = cv2.resize(mask, (h, w), interpolation=cv2.INTER_NEAREST)

            # Наложение класса на соответствующую область маски
            multi_class_mask[mask > 0] = cl + 1  # Убедитесь, что cl начинается с 0

    # Формируем имя файла маски
    mask_filename = f"{frame_counter:05d}.png"
    mask_path = os.path.join(masks_folder, mask_filename)

    # Записываем изображение
    cv2.imwrite(mask_path, multi_class_mask)

## Main methods

In [22]:
from src.utils.VideoLogger import VideoLogger
from src.utils.VideoAggregation import VideoAggregation

def segmentation_run_through(
    video: VideoAggregation,
    gaze_data: Dict[str, pd.DataFrame],
    class_names: list,
    Logger: VideoLogger,
    predictor=predict_on_image,
    path_to_masks=None,
):
    """
    Основная функция для сегментации видео, обработки и визуализации данных.
    Процесс включает в себя следующие шаги:
    чтение -> предсказание -> отрисовка маски и боксов -> отрисовка точек ->
    запись кадра в выходное видео.

    Params:
        video (VideoAggregation): Объект VideoAggregation с инициализированными
                                  исходными и выходными видеофайлами.
        gaze_data (Dict[str, pd.DataFrame]): Словарь с данными взгляда, где ключи -
                                             метки данных, а значения - соответствующие
                                             DataFrame с координатами и метаданными.
        class_names (list): Список классов сегментации.
        Logger (VideoLogger): Объект для ведения логов.
        predictor (function): Функция для предсказания сегментации.
        path_to_masks (str): Путь для сохранения масок (по умолчанию None).

    Returns:
        pd.DataFrame, pd.DataFrame: Два DataFrame с результатами обработки.
    """
    try:
        safe_plug(gaze_data)
    except Exception as e:
        Logger.log(text_log_data=("error", *e.args, "До цикла обработки"))

    df = pd.DataFrame()
    constant = 2000

    while True:
        try:
            frame = video.read()  # Чтение текущего кадра

            # Проверяем, достигнут ли конец видео
            if video.end or video.enough():
                break

            if not video.already():
                continue

            # Предсказание сегментации
            try:
                prediction = predictor(frame)
            except Exception as e:
                print(e)
                Logger.log(text_log_data=("error", *e.args, f"Кадр {video.frame_counter}"))
                continue

            # Сохранение экспериментальной маски
            if path_to_masks:
                try:
                    save_experimental_mask_for_frame(prediction, *frame.shape[:2], video.frame_counter, path_to_masks)
                except Exception as e:
                    print(e)
                    Logger.log(text_log_data=("error", *e.args, f"Кадр {video.frame_counter}"))

            # Подготовка масок и расчёт вероятностей
            try:
                prediction["masks"] = mask_prep(prediction["masks"], prediction["classes"], class_names, *frame.shape[:2])
                pf, rads = probability_function(prediction["masks"], constant)
            except Exception as e:
                print(e)
                Logger.log(text_log_data=("error", *e.args, f"Кадр {video.frame_counter}"))

            print(rads)
            # Визуализация результатов
            try:
                Logger.log(frame=frame, masks=prediction["masks"], boxes=prediction["boxes"], segment_rads = rads)
            except Exception as e:
                print(e)
                Logger.log(text_log_data=("error", *e.args, f"Кадр {video.frame_counter}"))

            # Обработка данных взгляда
            points_data = {}
            for label, gaze_df in gaze_data.items():
                try:
                    Logger.log(frame=frame, gaze_data=gaze_df, label=label, frame_number=video.frame_counter)
                    points_data[label] = calculate_gaze_points(*frame.shape[:2], prediction["masks"], gaze_df, video.frame_counter, 5)
                except Exception as e:
                    print(e)
                    Logger.log(text_log_data=("error", *e.args, f"Кадр {video.frame_counter}, Датафрейм {label}"))

                # Добавление данных о взглядах и сегментации
                df = pd.concat([df, pd.DataFrame(make_verdict(points_data, pf, video.frame_counter))], ignore_index=True)

            # Запись кадра в видео
            video.write(frame)

        except Exception as e:
            print(e)
            Logger.log(text_log_data=("error", *e.args, video.frame_counter))
            video.write(frame)
        except KeyboardInterrupt:
            Logger.log(text_log_data=("error", "Прерывание", video.frame_counter))
            video.release()
            break

    # Завершение работы
    video.release()
    print("Сегментация завершена.")
    return df


# Launch

In [None]:
import warnings

# Отключение предупреждений Pandas
warnings.filterwarnings("ignore", category=FutureWarning)
# Пример вызова функции

Logger = VideoLogger(
    "MGF",
    seg_vis={"classes": segmentation_classes, "alpha": 0.4},
    gaze_vis={"classes": gaze_data.keys()},
)
video = VideoAggregation(
    input_path, output_path, Logger=Logger, frame_floor=4797, frame_cap=4799
)
df = segmentation_run_through(
    video, gaze_data, segmentation_classes, Logger=Logger, path_to_masks=PATH_TO_EXPMASKS
)
df.to_csv(output_dataframe, index=False)

In [36]:
import src.utils.MetricsCalculator as calc

In [37]:
# Пути к папкам
folder1 = PATH_TO_TRUEMASKS
folder2 = PATH_TO_EXPMASKS

# Найдем пересекающиеся имена файлов
common_filenames = calc.find_common_filenames(folder1, folder2)

# Загрузка изображений из каждой папки по общим именам файлов
images1 = calc.load_images_by_filenames(folder1, common_filenames)
images2 = calc.load_images_by_filenames(folder2, common_filenames)

In [38]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import jaccard_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report

def calculate_metrics(y_trues, y_preds,  classes, background_class=0):
    # Инициализируем списки для хранения метрик
    precisions = []
    recalls = []
    f1_scores = []
    ious = []
        # Проходим по каждой паре масок
    for y_true, y_pred in zip(y_trues, y_preds):
        # Проверяем, что размеры масок совпадают
        if y_true.shape != y_pred.shape:
            print(f"Размеры масок не совпадают! Истинная: {y_true.shape}, Предсказанная: {y_pred.shape}")
            continue  # Пропускаем эту пару
    # Проходим по каждой паре масок
    for y_true, y_pred in zip(y_trues, y_preds):
        # Приводим маски к формату 1D
        y_true_flat = y_true.flatten()
        y_pred_flat = y_pred.flatten()


        # Рассчитываем precision, recall, F1-score с учетом всех классов, кроме фона
        precision = precision_score(y_true_flat, y_pred_flat, average='weighted', labels=np.unique(y_true_flat[y_true_flat != background_class]), zero_division=0)
        recall = recall_score(y_true_flat, y_pred_flat, average='weighted', labels=np.unique(y_true_flat[y_true_flat != background_class]), zero_division=0)
        f1 = f1_score(y_true_flat, y_pred_flat, average='weighted', labels=np.unique(y_true_flat[y_true_flat != background_class]), zero_division=0)
        iou = jaccard_score(y_true_flat, y_pred_flat, average='macro')
        
        # Рассчитываем IoU (Intersection over Union) с учетом фона
        # intersection = np.logical_and(y_true_flat, y_pred_flat).sum()
        # union = np.logical_or(y_true_flat, y_pred_flat).sum()
        # iou = intersection / union if union != 0 else 0

        # Сохраняем результаты
        precisions.append(precision)
        recalls.append(recall)
        f1_scores.append(f1)
        ious.append(iou)

    # Выводим средние метрики
    avg_precision = np.mean(precisions) if precisions else 0
    avg_recall = np.mean(recalls) if recalls else 0
    avg_f1 = np.mean(f1_scores) if f1_scores else 0
    avg_iou = np.mean(ious) if ious else 0

    print("Метрики по Weighted")
    print('\n')
    print("Precision:", avg_precision)
    print("Recall:", avg_recall)
    print("F1-Score:", avg_f1)
    print('\n')
    print("IoU:", avg_iou)
    print('\n')

    # Для визуализации матрицы путаницы
    # Соединяем все истинные и предсказанные маски
    y_true_all = np.concatenate([y.flatten() for y in y_trues])
    y_pred_all = np.concatenate([y.flatten() for y in y_preds])

    # Выводим отчет по классам, исключая фон
    print("Classification Report (excluding background):")
    print(classification_report(y_true_all, y_pred_all, target_names=classes,labels=np.unique(y_true_all[y_true_all != background_class])))

    # Рассчитываем матрицу путаницы
    conf_matrix = confusion_matrix(y_true_all, y_pred_all, normalize='true', labels=np.unique(y_pred_all[y_pred_all != background_class]))

    # Визуализируем матрицу путаницы
    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, cmap='Blues', 
                xticklabels=classes, #np.unique(y_pred_all[y_pred_all != background_class]), 
                yticklabels=classes) #np.unique(y_true_all[y_true_all != background_class]))
    plt.xlabel('Предсказанные метки')
    plt.ylabel('Истинные метки')
    plt.title('Матрица путаницы (исключая фон)')
    plt.show()

    return avg_precision, avg_recall, avg_f1, avg_iou

In [39]:
calc.calculate_metrics = calculate_metrics

In [None]:
avg_precision, avg_recall, avg_f1, avg_iou = calc.calculate_metrics(images1, images2, segmentation_classes)