In [None]:


import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
import mediapipe as mp
import threading
import os


# Инициализация MediaPipe Face Mesh с использованием GPU
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

# Проверка доступности GPU
print("Доступные устройства:", tf.config.list_physical_devices())
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Ограничение памяти GPU для предотвращения утечек
        tf.config.experimental.set_memory_growth(gpus[0], True)
        print("Используется GPU:", gpus[0])
    except RuntimeError as e:
        print(e)

# Загрузка модели
model = keras.models.load_model(os.path.join('..', 'neural_network', 'data', 'models', 'my_test_model.keras'))

class VideoProcessor:
    def __init__(self):
        self.cap = cv2.VideoCapture(0)
        self.set_resolution(640, 480)
        self.lock = threading.Lock()
        self.frame = None
        self.running = True
        self.prediction = None
        self.face_detected = False

    def set_resolution(self, width, height):
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.cap.set(cv2.CAP_PROP_FPS, 30)

    def capture_frames(self):
        while self.running:
            ret, frame = self.cap.read()
            if ret:
                with self.lock:
                    self.frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    def process_frames(self):
        while self.running:
            if self.frame is None:
                continue
            
            # Копирование кадра для обработки
            with self.lock:
                frame_copy = self.frame.copy()
            
            # Обнаружение лиц с MediaPipe (GPU-оптимизировано)
            results = face_mesh.process(frame_copy)
            
            if results.multi_face_landmarks:
                self.face_detected = True
                # Получение ограничивающего прямоугольника лица
                h, w, _ = frame_copy.shape
                landmarks = results.multi_face_landmarks[0].landmark
                x_coords = [int(lm.x * w) for lm in landmarks]
                y_coords = [int(lm.y * h) for lm in landmarks]
                x1, x2 = min(x_coords), max(x_coords)
                y1, y2 = min(y_coords), max(y_coords)
                
                # Выделение области лица с запасом
                expansion = 50
                x1 = max(0, x1 - expansion)
                y1 = max(0, y1 - expansion)
                x2 = min(w, x2 + expansion)
                y2 = min(h, y2 + expansion)
                
                # Предобработка для модели
                face_roi = frame_copy[y1:y2, x1:x2]
                if face_roi.size == 0:
                    continue
                
                # Конвертация в тензор и предсказание
                input_tensor = tf.convert_to_tensor(
                    [cv2.resize(face_roi, (145, 145)) / 255.0],
                    dtype=tf.float32
                )
                
                # Асинхронное предсказание на GPU
                self.prediction = model(input_tensor, training=False)[0][0].numpy()
            else:
                self.face_detected = False

    def run(self):
        # Запуск потоков
        capture_thread = threading.Thread(target=self.capture_frames)
        process_thread = threading.Thread(target=self.process_frames)
        
        capture_thread.start()
        process_thread.start()
        
        while self.running:
            if self.frame is None:
                continue
            
            # Отрисовка результатов
            with self.lock:
                display_frame = cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR)
            
            if self.face_detected and self.prediction is not None:
                status = "FATIGUE" if self.prediction > 0.7 else "ACTIVE"
                color = (0, 0, 255) if status == "FATIGUE" else (0, 255, 0)
                cv2.putText(display_frame, f"{status} {self.prediction:.2f}", 
                           (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            # Показать FPS
            fps = self.cap.get(cv2.CAP_PROP_FPS)
            cv2.putText(display_frame, f"FPS: {fps:.1f}", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            
            cv2.imshow('Fatigue Detection', display_frame)
            
            if cv2.waitKey(1) & 0xFF == 27:
                self.running = False
        
        # Очистка
        self.cap.release()
        cv2.destroyAllWindows()
        capture_thread.join()
        process_thread.join()

if __name__ == "__main__":
    processor = VideoProcessor()
    processor.run()

Доступные устройства: [PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]
