In [20]:
import cv2
import numpy as np
import os

import torch
import torchvision.transforms as transforms
from torchvision.models import resnet18
from collections import defaultdict, Counter

model = torch.load("my_resnet3.pth")
model.eval()  

# Подготовка трансформации изображения
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])
])

# Цвет и шрифт для отображения
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.5
font_color = (0, 255, 0)
thickness = 2
image_classes = {0: "Line free", 1: "Line train"}
class_colors = {0: (0, 255, 0), 1: (0, 0, 255)} 

# Области интереса bbox по две на каддый путь - путь1 лев, прав, путь 2 лев, прав
bboxes = [
    (110, 170, 190, 110),
    (310, 170, 190, 110),
    (150, 65, 150, 60),
    (310, 68, 150, 60),
]

# Буфер для усреднения предсказанного класса
bbox_history = defaultdict(list)
def update_bbox_history(bbox_id, predicted_class, max_history=35):
    # Добавляем предсказание в историю
    bbox_history[bbox_id].append(predicted_class)
    # Удаляем старые значения, если длина превышает max_history
    if len(bbox_history[bbox_id]) > max_history:
        bbox_history[bbox_id].pop(0)

def get_average_class(bbox_id):

    if bbox_id not in bbox_history or len(bbox_history[bbox_id]) == 0:
        return None
    return Counter(bbox_history[bbox_id]).most_common(1)[0][0]


line_state1 = {"left": None, "right": None, "dir": None}
line_state2 = {"left": None, "right": None, "dir": None}

# Функция для обновления состояния bbox и определения поезда на пути и его направления
def update_line_state(line_state, frame_number, left_detected, right_detected):

    # Обнуление
    if (not left_detected or not right_detected ) and (line_state["dir"] is None ):
        line_state = {"left": None, "right": None, "dir": None}
        #print("res1")

    # Если левый bbox впервые сработал
    if left_detected and line_state["left"] is None:
        line_state["left"] = frame_number

    # Если правый bbox впервые сработал
    if right_detected and line_state["right"] is None:
        line_state["right"] = frame_number

    # Проверка наличия поезда
    if left_detected and right_detected:
        # Направление поезда
        if line_state["left"] < line_state["right"]:
            direction = "L to R"
            line_state["dir"] = direction
        else:
            direction = "R to L"    
            line_state["dir"] = direction        
        msg = f"Train {direction}"
        color = (0, 0, 255)

    # Если поезд прошел полностью
    elif (not left_detected) and (not right_detected) and (not (line_state["dir"] is None)):
        msg = f"Free"
        color = (0, 255, 0)
        line_state = {"left": None, "right": None, "dir": None}
        #print("res2")

    # Прошел только один край
    elif not (line_state["dir"] is None) :

        msg = f"Train {line_state["dir"]}"
        color = (0, 0, 255)

    # Поезда нет, в ожидании
    else:
        msg = f"Free"
        color = (0, 255, 0)

    return msg, color, line_state


# Открытие видео
video_path = "near.mp4"
video_output_path = "output_video.mp4"  # Файл для записи результата

cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()

# Получаем параметры видео
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))  # Частота кадров

# Инициализация записи видео
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Кодек для MP4
out = cv2.VideoWriter(video_output_path, fourcc, fps, (frame_width, frame_height))

# Задаем параметры дисторсии вручную примерно
DIM = frame.shape[:2][::-1]  # Размер изображения (width, height)
K = np.array([[800, 0, DIM[0] / 2],  # Фокусные расстояния и центр камеры
              [0, 800, DIM[1] / 2],
              [0, 0, 1]])
D = np.array([-0.3, 0.1, 0, 0])*3.5  # Коэффициенты дисторсии (k1, k2, p1, p2)

# Генерируем матрицу для исправления искажений
map1, map2 = cv2.initUndistortRectifyMap(K, D, None, K, DIM, cv2.CV_32FC1)


nframe = 0
while cap.isOpened():
    ret, frame = cap.read()    

    if not ret:
        break

    nframe += 1
    #frame  = cv2.remap(frame, map1, map2, interpolation=cv2.INTER_LINEAR) # можно не убирать искажения
    cv2.putText(frame, f"{nframe}", (10, 20), font, font_scale, (0, 255, 0), 1)
    
    # Собираем bbox кадры в батч
    batch = []
    for bbox in bboxes:
        x, y, w, h = bbox
        cropped = frame[y:y+h, x:x+w]  # Обрезаем изображение
        transformed = transform(cropped)  # Применяем трансформации
        batch.append(transformed)
    batch = torch.stack(batch) 

    outputs = model(batch)  # Предсказания
    predictions = torch.argmax(outputs, dim=1)  # Предсказанные классы

    nbbox = 0
    for bbox, pred_class in zip(bboxes, predictions.tolist()):
        x, y, w, h = bbox
        nbbox += 1
        class_label = image_classes.get(pred_class, f"T {pred_class}")

        # Класс усредняем по 35 кадрам ("скользящее среднее")
        update_bbox_history(nbbox, pred_class)
        avg_class = get_average_class(nbbox)
        color = class_colors.get(avg_class, (255, 255, 255))

        # Подписываем класс
        cv2.rectangle(frame, (x, y), (x+w, y+h), color, 1)
        cv2.putText(frame, f"{avg_class}", (x + 5, y + 15), font, font_scale, color, 1)

    # По левому и правому bbox определяем наличие поезда и его направление
    msg1, msgcolor, line_state1 = update_line_state(line_state1, nframe, get_average_class(1), get_average_class(2))
    cv2.putText(frame, f"Line1 - {msg1}", (220, 310), font, 0.7, msgcolor, 1)

    if not (line_state1["dir"] is None):
        # Если на 1 пути поезд то 2 путь не видно
        cv2.putText(frame, 'Line2 - Not visible', (220, 50), font, 0.7, (255, 0, 0), 1)
    else:
        # Проверка поезда на втором пути
        msg2, msgcolor2, line_state2 = update_line_state(line_state2, nframe, get_average_class(3), get_average_class(4))
        cv2.putText(frame, f"Line2 - {msg2}", (220, 50), font, 0.7, msgcolor2, 1)

    
    # Отображение кадра и лог видео
    out.write(frame)
    cv2.imshow("Rail Tracks", frame)  
    # Нажмите 'q' для выхода
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()


  model = torch.load("my_resnet3.pth")


In [12]:
cap.release()
cv2.destroyAllWindows()