In [None]:
# Mount Google Drive: the pipeline below reads its reference images and input
# video from, and writes the annotated video to, a folder under /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
pip install opencv-python-headless deepface tf-keras tqdm

Collecting deepface
  Downloading deepface-0.0.93-py3-none-any.whl.metadata (30 kB)
Collecting flask-cors>=4.0.1 (from deepface)
  Downloading Flask_Cors-5.0.0-py2.py3-none-any.whl.metadata (5.5 kB)
Collecting mtcnn>=0.1.0 (from deepface)
  Downloading mtcnn-1.0.0-py3-none-any.whl.metadata (5.8 kB)
Collecting retina-face>=0.0.1 (from deepface)
  Downloading retina_face-0.0.17-py3-none-any.whl.metadata (10 kB)
Collecting fire>=0.4.0 (from deepface)
  Downloading fire-0.7.0.tar.gz (87 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.2/87.2 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting gunicorn>=20.1.0 (from deepface)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting lz4>=4.3.3 (from mtcnn>=0.1.0->deepface)
  Downloading lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.7 kB)
Downloading deepface-0.0.93-py3-none-any.whl (108 kB)
[2K   [90m━

In [None]:
pip install face_recognition

Collecting face_recognition
  Downloading face_recognition-1.3.0-py2.py3-none-any.whl.metadata (21 kB)
Collecting face-recognition-models>=0.3.0 (from face_recognition)
  Downloading face_recognition_models-0.3.0.tar.gz (100.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m100.1/100.1 MB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading face_recognition-1.3.0-py2.py3-none-any.whl (15 kB)
Building wheels for collected packages: face-recognition-models
  Building wheel for face-recognition-models (setup.py) ... [?25l[?25hdone
  Created wheel for face-recognition-models: filename=face_recognition_models-0.3.0-py2.py3-none-any.whl size=100566162 sha256=3427e2412257020bb19e1194b5413890091e218861a955cfa78e1cd27f34b5a5
  Stored in directory: /root/.cache/pip/wheels/7a/eb/cf/e9eced74122b679557f597bb7c8e4c739cfcac526db1fd523d
Successfully built face-recognition-models
Installing collected packages: face-recog

In [None]:
pip install opencv-python mediapipe tqdm

Collecting mediapipe
  Downloading mediapipe-0.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.18-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (36.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m36.1/36.1 MB[0m [31m18.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.18 sounddevice-0.5.1


In [None]:
import cv2
import face_recognition
import os
import numpy as np
from tqdm import tqdm
from deepface import DeepFace
import mediapipe as mp
from transformers import pipeline
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn import metrics
from collections import deque

def load_images_from_folder(folder):
    """Load one face encoding per reference image found in ``folder``.

    Only ``.jpg``, ``.jpeg`` and ``.png`` files (any letter case) are
    considered, and they are visited in sorted order so the result is
    deterministic. Images in which no face is found are skipped; when an image
    contains several faces only the first encoding is kept.

    The person's name is the file stem with its trailing digits removed, so
    ``ana1.jpg`` and ``ana12.jpg`` both map to ``"ana"`` while ``bob.png``
    stays ``"bob"``. A stem made only of digits is kept unchanged.

    Args:
        folder: Directory containing the reference images.

    Returns:
        Tuple ``(known_face_encodings, known_face_names)`` of two parallel
        lists.
    """
    image_extensions = (".jpg", ".jpeg", ".png")
    known_face_encodings = []
    known_face_names = []

    for filename in sorted(os.listdir(folder)):
        if not filename.lower().endswith(image_extensions):
            continue

        image = face_recognition.load_image_file(os.path.join(folder, filename))
        face_encodings = face_recognition.face_encodings(image)
        if not face_encodings:
            continue

        stem = os.path.splitext(filename)[0]
        # Strip the whole numeric suffix (and only digits) instead of blindly
        # dropping the last character of the stem.
        name = stem.rstrip("0123456789") or stem

        known_face_encodings.append(face_encodings[0])
        known_face_names.append(name)

    return known_face_encodings, known_face_names

def calculate_movement_speed(prev_landmarks, curr_landmarks, mp_pose):
    """Mean displacement of the shoulders and elbows between two frames.

    Args:
        prev_landmarks: Landmarks of the previous frame, or ``None``.
        curr_landmarks: Landmarks of the current frame, or ``None``.
        mp_pose: Object exposing ``PoseLandmark`` with the joint indices.

    Returns:
        ``0`` when either landmark set is ``None``; otherwise the average
        Euclidean distance, in the landmarks' x/y coordinates, covered by the
        four tracked joints.
    """
    if prev_landmarks is None or curr_landmarks is None:
        return 0

    joints = mp_pose.PoseLandmark
    # Joints whose motion is tracked: both shoulders and both elbows.
    tracked = (
        joints.LEFT_SHOULDER.value,
        joints.RIGHT_SHOULDER.value,
        joints.LEFT_ELBOW.value,
        joints.RIGHT_ELBOW.value,
    )

    accumulated = 0
    for idx in tracked:
        before, after = prev_landmarks[idx], curr_landmarks[idx]
        dx = after.x - before.x
        dy = after.y - before.y
        # Straight-line distance this joint moved since the previous frame.
        accumulated += np.sqrt(dx**2 + dy**2)

    return accumulated / len(tracked)

def is_movement_anomalous(speed, speed_history, threshold_multiplier=2.0):
    """Tell whether ``speed`` is an outlier relative to ``speed_history``.

    Args:
        speed: Speed to test.
        speed_history: Sequence of speeds used as the baseline.
        threshold_multiplier: Number of standard deviations above the mean a
            speed must exceed to be considered anomalous.

    Returns:
        ``False`` while the history holds fewer than 10 samples; otherwise
        whether ``speed`` is strictly greater than
        ``mean + threshold_multiplier * std`` of the history.
    """
    min_samples = 10  # below this the statistics are not considered meaningful
    if len(speed_history) >= min_samples:
        cutoff = np.mean(speed_history) + (threshold_multiplier * np.std(speed_history))
        return speed > cutoff
    return False

def detect_arm_movements(landmarks, mp_pose):
    """Report, for each side, whether the arm is raised above the shoulder.

    Args:
        landmarks: Indexable collection of pose landmarks exposing ``.y``.
        mp_pose: Object exposing ``PoseLandmark`` with the joint indices.

    Returns:
        Dict with boolean entries ``"left_arm_up"`` and ``"right_arm_up"``.
        An arm counts as raised when its elbow or its wrist has a smaller
        ``y`` than the shoulder on the same side.
    """
    joints = mp_pose.PoseLandmark
    joint_names = (
        "LEFT_SHOULDER", "RIGHT_SHOULDER",
        "LEFT_ELBOW", "RIGHT_ELBOW",
        "LEFT_WRIST", "RIGHT_WRIST",
    )
    # Resolve every joint up front, keyed by its landmark name.
    joint = {name: landmarks[getattr(joints, name).value] for name in joint_names}

    def raised(side):
        shoulder = joint[f"{side}_SHOULDER"]
        return joint[f"{side}_ELBOW"].y < shoulder.y or joint[f"{side}_WRIST"].y < shoulder.y

    return {"left_arm_up": raised("LEFT"), "right_arm_up": raised("RIGHT")}

def detect_faces_and_emotions(video_path, output_path, known_face_encodings, known_face_names):
    """Annotate a video with pose, arm-movement, emotion and identity overlays.

    Every frame is run through MediaPipe Pose (arm position and movement
    speed), DeepFace (dominant emotion) and face_recognition (identity against
    the known encodings). The annotated frames are written to ``output_path``.

    Args:
        video_path: Path of the video to analyse.
        output_path: Path of the annotated ``mp4v`` video to write.
        known_face_encodings: Encodings of the reference faces (may be empty).
        known_face_names: Names parallel to ``known_face_encodings``.

    Returns:
        Tuple ``(movements_log, emotions_log, analyzed_frames, anomalies)``:
        the arm-movement description of every frame with a detected pose, the
        dominant emotion of every analysed face, the number of frames read and
        the number of anomaly events. ``([], [], 0, 0)`` is returned when the
        video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Erro ao abrir o vídeo.")
        cap.release()
        # Same shape as the success path so callers can always unpack four values.
        return [], [], 0, 0

    movements_log = []
    emotions_log = []
    analyzed_frames = 0
    anomalies = 0

    pose = None
    out = None
    progress = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep fractional rates such as 29.97 (truncating them makes the output
        # drift in duration) and fall back when the container reports none.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Initialise MediaPipe Pose.
        mp_pose = mp.solutions.pose
        mp_drawing = mp.solutions.drawing_utils
        pose = mp_pose.Pose(model_complexity=0, min_detection_confidence=0.5, min_tracking_confidence=0.5)

        # Configure the video writer.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # State used for anomalous-movement detection.
        prev_landmarks = None
        speed_history = deque(maxlen=30)   # speeds of the last 30 frames with a pose
        movement_pattern = deque(maxlen=5)  # last 5 arm-movement descriptions

        # The reported frame count is only used to size the progress bar; the
        # loop itself runs until the capture stops delivering frames.
        progress = tqdm(total=total_frames if total_frames > 0 else None, desc="Processando vídeo")
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            analyzed_frames += 1
            frame_anomalies = []  # anomalies detected in the current frame

            # Pristine inputs for the detectors: overlays are drawn on `frame`
            # only, so the skeleton/text never cover the faces being analysed.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            clean_frame = frame.copy()

            pose_results = pose.process(rgb_frame)

            if pose_results.pose_landmarks:
                landmarks = pose_results.pose_landmarks.landmark

                current_speed = calculate_movement_speed(prev_landmarks, landmarks, mp_pose)
                speed_history.append(current_speed)

                # NOTE(review): the current speed is already part of the history
                # it is compared against, which raises its own threshold —
                # confirm whether it should be tested before being appended.
                if is_movement_anomalous(current_speed, list(speed_history)):
                    anomalies += 1
                    frame_anomalies.append("Movimento brusco detectado")
                    cv2.putText(frame, "MOVIMENTO ANOMALO!", (10, 60),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

                prev_landmarks = landmarks

                movements = detect_arm_movements(landmarks, mp_pose)
                current_movement = _describe_arm_movement(movements)

                movement_pattern.append(current_movement)
                movements_log.append(current_movement)

                # Four or more distinct descriptions within the last five
                # frames are treated as an erratic pattern.
                # NOTE(review): a frame can add two events to `anomalies`
                # (speed + pattern), so it counts events rather than frames.
                if len(movement_pattern) == 5 and len(set(movement_pattern)) >= 4:
                    frame_anomalies.append("Padrão de movimento errático")
                    anomalies += 1

                mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                cv2.putText(frame, f'Movimento: {current_movement}', (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

            # Emotion detection (DeepFace) and identity (face_recognition).
            try:
                result = DeepFace.analyze(clean_frame, actions=['emotion'], enforce_detection=False)

                face_locations = face_recognition.face_locations(rgb_frame, model="hog")
                face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
                face_names = [
                    _identify_face(face_encoding, known_face_encodings, known_face_names)
                    for face_encoding in face_encodings
                ]

                if result:
                    for face in result:
                        emotions_log.append(face['dominant_emotion'])

                        region = face['region']
                        x, y, w, h = region['x'], region['y'], region['w'], region['h']
                        cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
                        cv2.putText(frame, face['dominant_emotion'], (x, y-10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)

                        # Label the DeepFace box with the first recognised face
                        # whose top-left corner falls inside it.
                        for (top, right, bottom, left), name in zip(face_locations, face_names):
                            if x <= left <= x + w and y <= top <= y + h:
                                cv2.putText(frame, name, (x + 6, y + h - 6),
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 255, 255), 2)
                                break
            except Exception as e:
                # Best effort: a failing face analysis must not stop the video.
                print(f"Erro na análise DeepFace: {e}")

            if frame_anomalies:
                anomaly_text = " | ".join(frame_anomalies)
                cv2.putText(frame, f'Anomalias: {anomaly_text}', (10, 90),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            out.write(frame)
            progress.update(1)
    finally:
        # Always release native resources, even if a frame raised.
        if progress is not None:
            progress.close()
        cap.release()
        if out is not None:
            out.release()
        if pose is not None:
            pose.close()

    return movements_log, emotions_log, analyzed_frames, anomalies


def _describe_arm_movement(movements):
    """Map the ``left_arm_up``/``right_arm_up`` flags to the logged description."""
    if movements["left_arm_up"] and movements["right_arm_up"]:
        return "Ambos os braços levantados"
    if movements["left_arm_up"]:
        return "Braco esquerdo levantado"
    if movements["right_arm_up"]:
        return "Braco direito levantado"
    return "Bracos abaixados"


def _identify_face(face_encoding, known_face_encodings, known_face_names):
    """Return the closest known name for ``face_encoding``, or "Desconhecido"."""
    # Without reference faces np.argmin would raise on the empty distance array.
    if len(known_face_encodings) == 0:
        return "Desconhecido"

    matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
    face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
    best_match_index = int(np.argmin(face_distances))
    if matches[best_match_index]:
        return known_face_names[best_match_index]
    return "Desconhecido"

def categorizador(movements_log):
    """Classify each logged arm-movement description into an action label.

    A TF-IDF + multinomial naive Bayes pipeline is trained on a small built-in
    set of example descriptions, evaluated on a held-out split (the report is
    printed) and then applied to ``movements_log``.

    Accents are stripped before vectorising so that the unaccented
    descriptions produced by the detector ("Braco ...", "Bracos ...") share
    tokens with the accented training texts ("Braço ...", "Braços ...").

    Args:
        movements_log: Sequence of movement descriptions to classify.

    Returns:
        Tuple ``(text_completo, predicted_new_labels)``: a single string with
        space-separated ``"<movement>-<label>"`` entries, and the array of
        predicted labels. For an empty log, ``("", empty array)`` is returned
        without training.
    """
    if len(movements_log) == 0:
        # scikit-learn estimators reject zero-sample input, so there is nothing
        # to train for or predict on.
        return "", np.array([], dtype=str)

    texts = [
        "Ambos os braços levantados", "Braço esquerdo levantado", "Braço direito levantado", "Braços abaixados",
        "Ambos os braços levantados", "Braço direito levantado", "Braço esquerdo levantado", "Braços abaixados",
        "Braço esquerdo levantado", "Ambos os braços levantados", "Braço direito levantado", "Braços abaixados",
        "Braço direito levantado", "Ambos os braços levantados", "Braço esquerdo levantado", "Braços abaixados"
    ]

    labels = [
        "levantar_ambos", "levantar_esquerdo", "levantar_direito", "repouso",
        "levantar_ambos", "levantar_direito", "levantar_esquerdo", "repouso",
        "levantar_esquerdo", "levantar_ambos", "levantar_direito", "repouso",
        "levantar_direito", "levantar_ambos", "levantar_esquerdo", "repouso"
    ]

    # Split the examples into training and test sets.
    X_train, X_test, y_train, y_test = train_test_split(texts, labels, test_size=0.2, random_state=42)

    # Text vectorisation + classification pipeline.
    model = make_pipeline(TfidfVectorizer(strip_accents="unicode"), MultinomialNB())
    model.fit(X_train, y_train)

    # Evaluate on the held-out examples.
    predicted_labels = model.predict(X_test)
    print(metrics.classification_report(y_test, predicted_labels, zero_division=0))

    # Classify the logged movements.
    predicted_new_labels = model.predict(movements_log)
    text_categorizador = [f"{movement}-{label}" for movement, label in zip(movements_log, predicted_new_labels)]

    # Join everything into a single string.
    text_completo = " ".join(text_categorizador)
    print(text_completo)
    return text_completo, predicted_new_labels

def resumidor(movements_log, emotions_log, labels, analyzed_frames, anomalies):
    """Build the textual summary of a video analysis.

    Args:
        movements_log: Iterable of movement descriptions.
        emotions_log: Iterable of detected emotions.
        labels: Iterable of predicted action labels.
        analyzed_frames: Number of frames that were analysed.
        anomalies: Number of anomalies detected.

    Returns:
        Tuple ``(action_summary, emotion_summary, label_summary,
        frame_summary)`` of strings. Each of the first three lists every
        distinct value with its share of the corresponding log, most frequent
        first (ties keep first-appearance order). The anomaly percentage is
        reported as 0 when no frame was analysed.
    """
    action_summary = _frequency_section("Resumo das Ações Detectadas:", movements_log)
    emotion_summary = _frequency_section("Resumo das Emoções Detectadas:", emotions_log)
    label_summary = _frequency_section("Resumo das Labels Detectadas:", labels)

    # Guard the ratio: nothing analysed means no anomalies, not a crash.
    anomaly_percentage = anomalies / analyzed_frames * 100 if analyzed_frames else 0.0

    frame_summary = "\nInformações de Análise:\n"
    frame_summary += f"- Total de frames analisados: {analyzed_frames}\n"
    frame_summary += f"- Número de movimentos anômalos detectados: {anomalies}\n"
    frame_summary += f"- Porcentagem de frames com anomalias: {anomaly_percentage:.2f}%\n"

    return action_summary, emotion_summary, label_summary, frame_summary


def _frequency_section(title, items):
    """Format ``title`` followed by one percentage line per distinct item."""
    # Single counting pass; dict insertion order records first appearance.
    counts = {}
    for item in items:
        counts[item] = counts.get(item, 0) + 1
    total = sum(counts.values())

    section = title + "\n"
    # sorted() is stable, so equally frequent items keep first-appearance order.
    for item, count in sorted(counts.items(), key=lambda pair: -pair[1]):
        section += f"- {item}: {count / total * 100:.2f}% do tempo\n"
    return section

# Código principal
def main():
    """Run the whole pipeline: load faces, analyse the video, print the summaries.

    Reference images are read from ``<script_dir>/images``, the input video
    from ``<script_dir>/video-tech.mp4`` and the annotated video is written to
    ``<script_dir>/output_video_recognize.mp4``. Nothing is summarised when the
    video could not be opened or contained no readable frame.
    """
    script_dir = "/content/drive/MyDrive/Pós Tech - IA PARA DEVS/Pós Tech/FASE 4/"
    #script_dir = "/content/drive/MyDrive/FASE 4/"
    image_folder = os.path.join(script_dir, 'images')
    known_face_encodings, known_face_names = load_images_from_folder(image_folder)

    input_video_path = os.path.join(script_dir, 'video-tech.mp4')
    output_video_path = os.path.join(script_dir, 'output_video_recognize.mp4')

    # Run the video analysis.
    analysis = detect_faces_and_emotions(
        input_video_path, output_video_path, known_face_encodings, known_face_names)
    if analysis is None:
        # The video could not be opened; the analysis step already reported it.
        return

    movements_log, emotions_log, analyzed_frames, anomalies = analysis
    if analyzed_frames == 0:
        print("Nenhum frame foi analisado.")
        return

    # Categorise the movements; with no detected pose there is nothing to classify.
    if movements_log:
        _, predicted = categorizador(movements_log)
        labels = list(predicted)
    else:
        labels = []

    # Build the summary.
    action_summary, emotion_summary, label_summary, frame_summary = resumidor(
        movements_log, emotions_log, labels, analyzed_frames, anomalies)

    # Print the results.
    print(action_summary)
    print(emotion_summary)
    print(label_summary)
    print(frame_summary)

# Run the full analysis pipeline when this cell executes.
main()

24-11-22 14:58:07 - Directory /root/.deepface has been created
24-11-22 14:58:07 - Directory /root/.deepface/weights has been created
Downloading model to /usr/local/lib/python3.10/dist-packages/mediapipe/modules/pose_landmark/pose_landmark_lite.tflite


Processando vídeo:   0%|          | 0/3326 [00:00<?, ?it/s]

24-11-22 14:59:53 - facial_expression_model_weights.h5 will be downloaded...


Downloading...
From: https://github.com/serengil/deepface_models/releases/download/v1.0/facial_expression_model_weights.h5
To: /root/.deepface/weights/facial_expression_model_weights.h5

100%|██████████| 5.98M/5.98M [00:00<00:00, 89.8MB/s]
Processando vídeo: 100%|██████████| 3326/3326 [1:12:28<00:00,  1.31s/it]

                   precision    recall  f1-score   support

   levantar_ambos       1.00      1.00      1.00         1
 levantar_direito       1.00      1.00      1.00         1
levantar_esquerdo       1.00      1.00      1.00         2

         accuracy                           1.00         4
        macro avg       1.00      1.00      1.00         4
     weighted avg       1.00      1.00      1.00         4

Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos abaixados-repouso Bracos ab