In [1]:
import cv2
import numpy as np
from ultralytics import YOLO
import torchvision.transforms as transforms
import onnxruntime as ort
import torch
# Select the CUDA device when PyTorch reports one is available, otherwise use the CPU.
# NOTE(review): `device` is not referenced by any of the cells visible here — confirm it is still needed.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [2]:
class TrackOnFrame():
    """Record of a single tracked detection on a single video frame.

    Attributes:
        frame: the preprocessed crop of the detection (whatever the caller passes in).
        number_frame: index of the video frame the detection was found on.
        agression: aggression score for this frame; ``None`` until one is assigned.
        x_top_left, y_top_left: top-left corner of the bounding box, in pixels.
        h, w: height and width of the bounding box, in pixels.
        track: identifier of the track this detection belongs to.
    """

    def __init__(self, frame, number_frame, agression, x_top_left,y_top_left,h,w, track):
        # Payload and bookkeeping for the frame itself.
        self.frame, self.number_frame, self.agression = frame, number_frame, agression
        # Bounding-box geometry.
        self.x_top_left, self.y_top_left = x_top_left, y_top_left
        self.h, self.w = h, w
        # Owning track.
        self.track = track
# Maps a track id to the list of TrackOnFrame records appended for that track.
track_list = {} 
# One list per processed video frame, holding the TrackOnFrame records found on that frame.
track_on_frame = []

In [3]:
# Side length (pixels) that every crop is resized to before it is turned into a tensor.
IMG_SIZE = 224
# Input video to analyse.
filename = "VID20240501175552.mp4"
# NOTE(review): looks like a label for the classifier configuration; it is not used in the cells shown here — confirm.
name = "MobileNetV2len30size224w4V4"
# Number of consecutive frames of one track that form a single model input window.
SEQ_LENGTH = 30
# YOLO model used for detection and tracking.
yolo = YOLO('yolov8n.onnx')
# Stride, in frames, between the starts of consecutive windows of a track.
step = 5
# Weight applied to each window prediction when it is accumulated into a frame's aggression score.
kof_val_pred =  step / SEQ_LENGTH
output_filename = 'output_video_test2.avi'  # Name of the file the annotated video is written to




In [4]:
# Open the input video and fail fast when it cannot be read: an unopened capture
# reports a frame count of 0, which would make every later cell silently do nothing.
cap = cv2.VideoCapture(filename)
if not cap.isOpened():
    raise IOError(f"Cannot open video file: {filename}")
# ONNX Runtime session for the aggression classifier.
# NOTE(review): absolute, machine-specific path — consider moving it to the configuration cell.
ort_session = ort.InferenceSession("C:\\Users\\marku\\Projects\\aggression\\bot\\model.onnx")

In [5]:
# Preprocessing applied to every padded crop: array -> PIL image -> IMG_SIZE x IMG_SIZE
# -> tensor -> single-channel grayscale.
# NOTE(review): frames come from cv2 and are presumably BGR, while ToPILImage interprets
# 3-channel arrays as RGB — confirm this matches how the classifier was trained.
_preprocess_steps = [
    transforms.ToPILImage(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Grayscale(num_output_channels=1),
]
transform = transforms.Compose(_preprocess_steps)

In [6]:
# Pass 1: run YOLO tracking over the video and store a preprocessed crop for every
# tracked detection, both per track (track_list) and per frame (track_on_frame).
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
for frame_number in range(frame_count-1):
    ret, img = cap.read()
    # Always add an entry so track_on_frame stays index-aligned with frame_number,
    # even when the frame itself has to be skipped.
    detections_on_frame = []
    track_on_frame.append(detections_on_frame)
    if not ret or img is None:
        # The reported frame count can exceed the number of decodable frames;
        # an unreadable frame must not be passed to the detector.
        continue
    results = yolo.track(img, persist=True, show=False)
    for result in results[0]:
        # Keep only class-0 detections (reported as "person" in the tracker log)
        # that the tracker has assigned an id to.
        if result.boxes.cls.item() != 0.0 or result.boxes.id is None:
            continue
        # xywh is (center x, center y, width, height); convert to a top-left corner.
        x, y, w, h = map(int, result.boxes.xywh.cpu()[0])
        x_top_left = x - w // 2
        y_top_left = y - h // 2
        track = int(result.boxes.id.int().cpu())
        cropped_img = img[y_top_left:y_top_left+h, x_top_left:x_top_left+w].copy()
        if cropped_img.size == 0:
            # Degenerate box (zero width or height): nothing to feed to the model.
            continue
        # Zero-pad the shorter side so the crop is roughly square before resizing.
        if h>w:
            pad = int((h-w)/2)
            padded_frame = np.pad(cropped_img, ((0,0), (pad,pad), (0,0)))
        else:
            pad = int((w-h)/2)
            padded_frame = np.pad(cropped_img, ((pad,pad), (0,0), (0,0)))
        # Add a leading dimension of size 1 in front of the transformed crop.
        frame_tensor = transform(padded_frame).unsqueeze(0)
        record = TrackOnFrame(frame_tensor, frame_number, None, x_top_left, y_top_left, h,w, track)
        track_list.setdefault(track, []).append(record)
        detections_on_frame.append(record)

Loading yolov8n.onnx for ONNX Runtime inference...
0: 640x640 1 person, 245.0ms
Speed: 11.5ms preprocess, 245.0ms inference, 17.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 1 person, 74.5ms
Speed: 3.0ms preprocess, 74.5ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 1 person, 48.5ms
Speed: 2.0ms preprocess, 48.5ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 1 person, 40.0ms
Speed: 2.0ms preprocess, 40.0ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 1 person, 55.0ms
Speed: 2.0ms preprocess, 55.0ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 1 person, 46.0ms
Speed: 2.0ms preprocess, 46.0ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 1 person, 38.0ms
Speed: 2.0ms preprocess, 38.0ms inference, 4.0ms postprocess per image at shape (1, 3, 640, 640)
0: 640x640 2 persons, 1 skateboard, 29.5ms
Speed: 2.0ms preprocess, 29

In [7]:
def predict_with_onnx(ort_session, data_for_model_on_one_track, batch_size=64):
    """Run the ONNX model over the given samples in fixed-size batches.

    Args:
        ort_session: ONNX Runtime inference session (anything exposing
            ``get_inputs()`` and ``run(output_names, feed)``).
        data_for_model_on_one_track: sequence/array of samples; it is sliced along
            its first axis to build the batches.
        batch_size: number of samples fed to the model per call. Every batch is
            padded up to exactly this size.

    Returns:
        list with one prediction per input sample, in input order (taken from the
        first output of the model). Empty when there are no samples.
    """
    num_samples = len(data_for_model_on_one_track)
    predictions = []
    if num_samples == 0:
        return predictions

    # Ask the model for its input name instead of assuming it is called "input".
    input_name = ort_session.get_inputs()[0].name

    for i in range(0, num_samples, batch_size):
        batch = np.array(data_for_model_on_one_track[i:i+batch_size])  # current batch
        num_elements = batch.shape[0]

        # A short final batch is padded up to batch_size with copies of its first element.
        if num_elements < batch_size:
            first_element = batch[0]
            padding = np.repeat(first_element[np.newaxis, ...], batch_size - num_elements, axis=0)
            batch = np.concatenate((batch, padding), axis=0)

        batch_predictions = ort_session.run(None, {input_name: batch})
        # Drop the predictions that belong to the padding samples.
        batch_predictions = batch_predictions[0][:num_elements]

        predictions.extend(batch_predictions)
    return predictions

In [8]:
def track_prediction(track):
    """Assign an aggression score to every frame record of one track.

    The track is cut into windows of SEQ_LENGTH consecutive records whose starts
    are ``step`` records apart. Each window is scored by the model, and every
    record covered by a window accumulates that window's prediction multiplied by
    ``kof_val_pred``. Records covered by no window keep ``agression = None``.
    Tracks shorter than SEQ_LENGTH get ``agression = 0`` on every record.

    The scores are written in place on the records in ``track_list[track]``;
    nothing is returned. Calling the function again recomputes the scores from
    scratch instead of adding to the previous ones.

    Args:
        track: key of the track in ``track_list``.
    """
    records = track_list[track]
    # Not enough records for a single full window.
    if len(records) < SEQ_LENGTH:
        for record in records:
            record.agression = 0
        return

    # Start from a clean slate so that re-running the cell does not add the new
    # contributions on top of the scores from an earlier run.
    for record in records:
        record.agression = None

    # "+ 1" keeps the last window that still fits entirely inside the track.
    window_starts = range(0, len(records) - SEQ_LENGTH + 1, step)
    data_for_model = [
        [records[i].frame for i in range(start, start + SEQ_LENGTH)]
        for start in window_starts
    ]
    # Drop the size-1 axis at position 3 that comes from the stored per-frame tensors.
    data_for_model = np.array(data_for_model).squeeze(3)
    predictions = predict_with_onnx(ort_session, data_for_model)

    for start, prediction in zip(window_starts, predictions):
        contribution = prediction.item() * kof_val_pred
        for record in records[start:start + SEQ_LENGTH]:
            if record.agression is not None:
                # Already covered by an earlier window: accumulate the weighted prediction.
                record.agression = record.agression + contribution
            else:
                # First window covering this record.
                record.agression = contribution
    

In [9]:
# Score every collected track in turn and log its id once it has been processed.
for track in track_list:
    track_prediction(track)
    print(f"S {track}")

S 1
S 3
S 11
S 13
S 16
S 20
S 21
S 23
S 26
S 29
S 32
S 34
S 36
S 37
S 38
S 42
S 43
S 46
S 52
S 53
S 57
S 61
S 66


In [10]:
# Pass 2: re-read the source video, draw every stored track box together with its
# aggression score, and write the annotated frames to output_filename.

# The capture used for the first pass has been read to the end; release it and
# open a fresh one before querying the video properties.
cap.release()
cap = cv2.VideoCapture(filename)
if not cap.isOpened():
    raise IOError(f"Cannot open video file: {filename}")
video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_size = (video_width, video_height)

fourcc = cv2.VideoWriter_fourcc(*'XVID')  # choose a codec that suits the output container
# Use the frame rate of the source so the output plays at the original speed;
# fall back to 30 when the container does not report a usable value.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    fps = 30
out = cv2.VideoWriter(output_filename, fourcc, fps, frame_size)
edge_margin = 20
# track id -> number of upcoming frames on which the track is still highlighted in red.
agressive_track_on_next_frame = {}
try:
    for frame_number in range(frame_count-1):
        ret, img = cap.read()
        if not ret or img is None:
            # No more decodable frames: stop instead of drawing on / writing None.
            break
        for track_object in track_on_frame[frame_number]:
            # Optional filter (disabled): skip boxes that touch the frame border.
            # if track_object.x_top_left <= edge_margin or track_object.y_top_left <= edge_margin or \
            #         (track_object.x_top_left + track_object.w) >= video_width - edge_margin or \
            #         (track_object.y_top_left + track_object.h) >= video_height - edge_margin:
            #     continue
            top_left = (track_object.x_top_left, track_object.y_top_left)
            bottom_right = (track_object.x_top_left + track_object.w, track_object.y_top_left + track_object.h)
            cv2.rectangle(img, top_left, bottom_right, (255, 255, 255), 1)

            # Always draw the track number
            cv2.putText(img, f"T:{track_object.track}", top_left, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

            # Draw aggression level if it's not None
            if track_object.agression is not None:
                # Place the aggression label one text line above the track label
                agression_text_position = (track_object.x_top_left, track_object.y_top_left - 25)
                cv2.putText(img, f"A:{(track_object.agression*100):.0f}", agression_text_position, cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

                # (Re)arm or clear the highlight counter depending on the score.
                # Scores between 0.20 and 0.50 leave a running counter untouched.
                if track_object.agression > 0.90:
                    agressive_track_on_next_frame[track_object.track] = 200
                elif track_object.agression > 0.50:
                    agressive_track_on_next_frame[track_object.track] = 100
                elif track_object.agression < 0.20:
                    agressive_track_on_next_frame[track_object.track] = 0

            # Highlight tracks whose counter is still running. This also covers the
            # frame on which the counter was just armed, so no separate draw is needed there.
            if agressive_track_on_next_frame.get(track_object.track, 0) > 0:
                cv2.rectangle(img, top_left, bottom_right, (0, 0, 255), 4)
                agressive_track_on_next_frame[track_object.track] -= 1

        out.write(img)
finally:
    # Release the reader and flush/close the writer even if drawing failed.
    cap.release()
    out.release()
cv2.destroyAllWindows()

    