In [1]:
import torch, cv2, facenet_pytorch, timm, emotiefflib, numpy as np

print("torch:", torch.__version__)
print("cuda:", torch.cuda.is_available(), "/", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "no gpu")
print("cv2:", cv2.__version__)
print("timm:", timm.__version__)

from emotiefflib.facial_analysis import EmotiEffLibRecognizer, get_model_list
from facenet_pytorch import MTCNN

print("models:", get_model_list()[:5])

device = "cuda" if torch.cuda.is_available() else "cpu"
mtcnn = MTCNN(keep_all=True, device=device)
rec = EmotiEffLibRecognizer(engine="torch", model_name="enet_b0_8_best_vgaf", device=device)

print("✅ EmotiEffLib + MTCNN 초기화 완료")


  from .autonotebook import tqdm as notebook_tqdm


torch: 2.9.1+cu126
cuda: True / NVIDIA GeForce RTX 4050 Laptop GPU
cv2: 4.11.0
timm: 0.9.12
models: ['enet_b0_8_best_vgaf', 'enet_b0_8_best_afew', 'enet_b2_8', 'enet_b0_8_va_mtl', 'enet_b2_7']
Downloading enet_b0_8_best_vgaf.pt from https://github.com/sb-ai-lab/EmotiEffLib/blob/main/models/affectnet_emotions/enet_b0_8_best_vgaf.pt?raw=true
✅ EmotiEffLib + MTCNN 초기화 완료


In [5]:
import cv2
import torch
import numpy as np
from facenet_pytorch import MTCNN
from emotiefflib.facial_analysis import EmotiEffLibRecognizer, get_model_list

def main():
    # 1. 디바이스 설정
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("[INFO] Using device:", device)

    # 2. 모델 리스트 확인 (옵션)
    print("[INFO] Available models:", get_model_list())
    model_name = "enet_b0_8_best_vgaf"  # 8-class 감정 모델

    # 3. MTCNN (얼굴 검출기) + EmotiEffLib (감정 인식기)
    mtcnn = MTCNN(keep_all=True, device=device)

    # 공식 문서 기준: engine="torch", model_name, device 세 개만 넘김
    emotion_recognizer = EmotiEffLibRecognizer(
        engine="torch",
        model_name=model_name,
        device=device,
    )
    print("[INFO] EmotiEffLib model loaded:", model_name)
    print("[INFO] Press 'q' to quit")

    # 4. 웹캠 열기
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
    if not cap.isOpened():
        print("[ERROR] Cannot open webcam")
        return

    while True:
        ret, frame = cap.read()
        if not ret:
            print("[ERROR] Failed to grab frame")
            break

        # BGR(OpenCV) → RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # 4-1. 얼굴 bbox 검출
        boxes, _ = mtcnn.detect(frame_rgb)

        emotions = []
        facial_images_np = []

        if boxes is not None:
            # 4-2. 얼굴 텐서 추출 (MTCNN 공식 사용법)
            # keep_all=True 이므로 faces는 (N, 3, H, W) 텐서일 가능성이 큼
            faces = mtcnn(frame_rgb)

            if faces is not None:
                # faces가 torch.Tensor인 경우와 리스트인 경우를 모두 처리
                if isinstance(faces, torch.Tensor):
                    if faces.ndim == 3:  # (3, H, W) → 1개 얼굴
                        faces = faces.unsqueeze(0)  # (1, 3, H, W)
                    faces_iter = [f for f in faces]
                else:
                    # 이미 리스트/튜플이면 그대로
                    faces_iter = list(faces)

                # 4-3. 얼굴 텐서를 numpy(H, W, C, uint8)로 변환
                for face in faces_iter:
                    face_np = face.permute(1, 2, 0).cpu().numpy()      # (H, W, C), float32, [-1, 1] 근처
                    face_np = (face_np * 128 + 127.5).clip(0, 255).astype(np.uint8)
                    facial_images_np.append(face_np)

                if len(facial_images_np) > 0:
                    try:
                        # EmotiEffLib 공식 문서: (H, W, C) RGB np.ndarray 리스트를 넣어줌
                        emotions, _ = emotion_recognizer.predict_emotions(
                            facial_images_np,
                            logits=True,  # 확률만 원하면 False로
                        )
                    except Exception as e:
                        print("[ERROR] Emotion prediction failed:", e)
                        emotions = ["error"] * len(facial_images_np)
            else:
                boxes = None  # faces 없으면 boxes도 같이 무시
        print(emotions, _)
        # 4-4. bbox + 감정 라벨 그리기
        if boxes is not None and emotions:
            for box, emotion in zip(boxes, emotions):
                if box is None:
                    continue
                x1, y1, x2, y2 = [int(b) for b in box]
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(
                    frame,
                    emotion,
                    (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.7,
                    (0, 255, 0),
                    2,
                    cv2.LINE_AA,
                )
        else:
            cv2.putText(
                frame,
                "No face detected",
                (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                1.0,
                (0, 0, 255),
                2,
                cv2.LINE_AA,
            )

        cv2.imshow("EmotiEffLib Webcam Emotion Recognition (press 'q' to quit)", frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()


[INFO] Using device: cuda
[INFO] Available models: ['enet_b0_8_best_vgaf', 'enet_b0_8_best_afew', 'enet_b2_8', 'enet_b0_8_va_mtl', 'enet_b2_7', 'mbf_va_mtl', 'mobilevit_va_mtl']
[INFO] EmotiEffLib model loaded: enet_b0_8_best_vgaf
[INFO] Press 'q' to quit
['Neutral'] [[1.1882356e-01 3.1404372e-02 3.9325215e-04 2.4907892e-03 1.4925869e-03
  4.9974835e-01 3.4211966e-01 3.5274166e-03]]
['Neutral'] [[0.15038133 0.02921563 0.00069888 0.00137091 0.00207686 0.4828465
  0.32904074 0.00436922]]
['Neutral'] [[1.0987236e-01 3.3147104e-02 1.9615873e-04 1.8376418e-03 1.3343418e-03
  5.3034306e-01 3.2077417e-01 2.4951203e-03]]
['Neutral'] [[1.3325629e-01 2.7485231e-02 4.7035387e-04 2.5909957e-03 1.3393170e-03
  4.8330066e-01 3.4812614e-01 3.4310631e-03]]
['Neutral'] [[0.10126219 0.02902524 0.00050957 0.0035685  0.00170875 0.48656476
  0.37278518 0.00457577]]
['Neutral'] [[9.3358666e-02 3.0664444e-02 2.2952790e-04 2.1803258e-03 1.5964846e-03
  5.7867676e-01 2.8949457e-01 3.7991896e-03]]
['Neutral'] [

In [6]:
import cv2
import torch
import numpy as np
import time

from facenet_pytorch import MTCNN
from emotiefflib.facial_analysis import EmotiEffLibRecognizer, get_model_list

# (옵션) 이미 mediapipe 설치했으면 포즈도 같이 사용 가능
try:
    import mediapipe as mp
    USE_POSE = True
    mp_drawing = mp.solutions.drawing_utils
    mp_pose = mp.solutions.pose
except ImportError:
    USE_POSE = False

# -------------------------------
# 1. 장치 / 모델 초기화
# -------------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
print("[INFO] Using device:", device)
print("[INFO] Available models:", get_model_list())

# 얼굴 검출기 (MTCNN)
mtcnn = MTCNN(keep_all=True, device=device)

# 감정 인식 모델
model_name = "enet_b0_8_best_vgaf"
emotion_recognizer = EmotiEffLibRecognizer(
    engine="torch",
    model_name=model_name,
    device=device,
)
print("[INFO] EmotiEffLib model loaded:", model_name)
print("[INFO] Press 'q' to quit")

# AffectNet 8-class 레이블 (문서 기준 순서)
EMOTION_LABELS = [
    "Anger",
    "Contempt",
    "Disgust",
    "Fear",
    "Happiness",
    "Neutral",
    "Sadness",
    "Surprise",
]

# FPS 계산용
prev_time = time.time()
fps = 0.0

# 콘솔에 전체 분포 출력 주기 (초)
last_print_time = 0.0
PRINT_INTERVAL = 1.0  # 1초마다 한번씩만 출력

# Pose 초기화
pose = mp_pose.Pose(min_detection_confidence=0.5,
                    min_tracking_confidence=0.5) if USE_POSE else None

# -------------------------------
# 2. 웹캠 루프
# -------------------------------
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("[ERROR] Cannot open webcam")
    raise SystemExit

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("[ERROR] Failed to grab frame")
            break

        # FPS 계산
        now_time = time.time()
        dt = now_time - prev_time
        if dt > 0:
            fps = 1.0 / dt
        prev_time = now_time

        # BGR -> RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # ---------------- 얼굴 검출 ----------------
        boxes, _ = mtcnn.detect(frame_rgb)
        faces = mtcnn(frame_rgb)  # (N, 3, H, W) 텐서 or None

        all_scores = None  # 모든 얼굴의 확률 저장용

        if faces is not None:
            facial_images_np = []
            for face in faces:
                # (C, H, W) -> (H, W, C)
                face_np = face.permute(1, 2, 0).cpu().numpy()
                # [-1,1] -> [0,255] uint8
                face_np = (face_np * 128 + 127.5).clip(0, 255).astype(np.uint8)
                facial_images_np.append(face_np)

            # 감정 예측: logits=False 로 바로 확률(probabilities) 받기
            # emotions: 예측된 감정 레이블 리스트 (문자열)
            # scores: shape (N_faces, 8) 의 확률 (0~1)
            emotions, scores = emotion_recognizer.predict_emotions(
                facial_images_np, logits=False
            )
            all_scores = np.array(scores)  # (N, 8)

            # 콘솔에 1초에 한 번, 첫 번째 얼굴의 전체 분포 출력
            if len(all_scores) > 0 and (now_time - last_print_time) >= PRINT_INTERVAL:
                print("=== Emotion probabilities (Face 0) ===")
                for emo_idx, emo_name in enumerate(EMOTION_LABELS):
                    prob = all_scores[0, emo_idx] * 100.0
                    print(f"{emo_name:9s}: {prob:5.1f}%")
                print("======================================")
                last_print_time = now_time

            # 박스 + 감정 + 확률 오버레이
            if boxes is not None:
                for i, box in enumerate(boxes):
                    if box is None:
                        continue
                    x1, y1, x2, y2 = [int(b) for b in box]
                    emotion = emotions[i]

                    # 이 얼굴의 최고 확률
                    if all_scores is not None and i < all_scores.shape[0]:
                        top_prob = float(all_scores[i].max()) * 100.0
                        text = f"{emotion} {top_prob:.1f}%"
                    else:
                        text = emotion

                    # 얼굴 박스
                    cv2.rectangle(frame, (x1, y1), (x2, y2),
                                  (0, 255, 0), 2)
                    # 감정+확률 텍스트
                    cv2.putText(frame, text, (x1, max(y1 - 10, 10)),
                                cv2.FONT_HERSHEY_SIMPLEX,
                                0.7, (0, 255, 0), 2, cv2.LINE_AA)

        # ---------------- 포즈 추정 (선택) ----------------
        if USE_POSE and pose is not None:
            results = pose.process(frame_rgb)
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    frame,
                    results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS
                )

        # ---------------- FPS 표시 ----------------
        fps_text = f"FPS: {fps:.1f}"
        cv2.putText(frame, fps_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1.0, (0, 255, 255), 2, cv2.LINE_AA)

        # 화면 출력
        cv2.imshow("Webcam Emotion + (Pose)", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    if USE_POSE and pose is not None:
        pose.close()
    cv2.destroyAllWindows()


[INFO] Using device: cuda
[INFO] Available models: ['enet_b0_8_best_vgaf', 'enet_b0_8_best_afew', 'enet_b2_8', 'enet_b0_8_va_mtl', 'enet_b2_7', 'mbf_va_mtl', 'mobilevit_va_mtl']
[INFO] EmotiEffLib model loaded: enet_b0_8_best_vgaf
[INFO] Press 'q' to quit
=== Emotion probabilities (Face 0) ===
Anger    :   1.3%
Contempt :   0.2%
Disgust  :   0.0%
Fear     :   0.3%
Happiness:   0.0%
Neutral  :  42.1%
Sadness  :  54.9%
Surprise :   1.2%
=== Emotion probabilities (Face 0) ===
Anger    :   2.0%
Contempt :   1.5%
Disgust  :   0.0%
Fear     :   1.1%
Happiness:   0.2%
Neutral  :  62.7%
Sadness  :  25.5%
Surprise :   6.9%
=== Emotion probabilities (Face 0) ===
Anger    :   2.1%
Contempt :   2.5%
Disgust  :   0.0%
Fear     :   0.3%
Happiness:   0.4%
Neutral  :  70.4%
Sadness  :  22.7%
Surprise :   1.5%
=== Emotion probabilities (Face 0) ===
Anger    :   2.9%
Contempt :   8.0%
Disgust  :   0.0%
Fear     :   0.2%
Happiness:   1.4%
Neutral  :  75.7%
Sadness  :   9.3%
Surprise :   2.4%
=== Emotion 

In [7]:
import cv2
import torch
import numpy as np
import time
import os
from datetime import datetime

from facenet_pytorch import MTCNN
from emotiefflib.facial_analysis import EmotiEffLibRecognizer, get_model_list

# (옵션) mediapipe가 설치되어 있으면 포즈도 같이 사용
try:
    import mediapipe as mp
    USE_POSE = True
    mp_drawing = mp.solutions.drawing_utils
    mp_pose = mp.solutions.pose
except ImportError:
    USE_POSE = False


# -----------------------------
# 1. 기본 설정 및 모델 초기화
# -----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
print("[INFO] Using device:", device)
print("[INFO] Available EmotiEffLib models:", get_model_list())

# 얼굴 검출기
mtcnn = MTCNN(keep_all=True, device=device)

# 감정 인식 모델
model_name = "enet_b0_8_best_vgaf"
emotion_recognizer = EmotiEffLibRecognizer(
    engine="torch",
    model_name=model_name,
    device=device,
)
print("[INFO] EmotiEffLib model loaded:", model_name)

# AffectNet 8-class 레이블 순서 (문서 기준)
EMOTION_LABELS = [
    "Anger",
    "Contempt",
    "Disgust",
    "Fear",
    "Happiness",
    "Neutral",
    "Sadness",
    "Surprise",
]

# 로그 저장 폴더
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

# FPS 계산용
prev_time = time.time()
fps = 0.0

# 녹화(로깅) 상태 관리
recording = False
record_start_time = 0.0
record_buffer = []  # 각 프레임의 feature를 리스트로 쌓음

# -----------------------------
# 2. Pose 초기화(선택)
# -----------------------------
pose = None
if USE_POSE:
    mp_pose_module = mp.solutions.pose
    pose = mp_pose_module.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    )
    print("[INFO] MediaPipe Pose enabled")
else:
    print("[INFO] MediaPipe Pose NOT used (mediapipe not installed)")


# -----------------------------
# 3. CSV 저장 함수
# -----------------------------
def save_log_to_csv(records):
    """records: list of dict, 각 dict는 한 프레임의 정보"""
    if not records:
        print("[INFO] No records to save.")
        return

    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    filename = os.path.join(LOG_DIR, f"emotion_log_{ts}.csv")

    # 필드 이름(컬럼)
    fieldnames = [
        "t",
        "fps",
        "top_emotion",
        "top_prob",
    ] + [f"prob_{emo}" for emo in EMOTION_LABELS]

    import csv
    with open(filename, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for rec in records:
            writer.writerow(rec)

    print(f"[INFO] Saved log: {filename}")


# -----------------------------
# 4. 메인 루프
# -----------------------------
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if not cap.isOpened():
    print("[ERROR] Cannot open webcam")
    raise SystemExit

print("[INFO] Press SPACE to record 3s log, 'q' to quit.")

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("[ERROR] Failed to grab frame")
            break

        # FPS 계산
        now_time = time.time()
        dt = now_time - prev_time
        if dt > 0:
            fps = 1.0 / dt
        prev_time = now_time

        # 원본 복사 (표시 용도)
        display_frame = frame.copy()

        # BGR -> RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # -----------------------------
        # A. Pose 추정 (선택)
        # -----------------------------
        if USE_POSE and pose is not None:
            results = pose.process(frame_rgb)
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    display_frame,
                    results.pose_landmarks,
                    mp_pose_module.POSE_CONNECTIONS,
                )
                # 나중에 feature로 쓰고 싶으면 여기서 x,y,z,visibility 추출해서 기록 가능

        # -----------------------------
        # B. 얼굴 감정 인식
        # -----------------------------
        boxes, _ = mtcnn.detect(frame_rgb)
        faces = mtcnn(frame_rgb)

        # 기본값 (얼굴 없을 때)
        top_emotion = "none"
        top_prob = 0.0
        probs_vec = np.zeros(len(EMOTION_LABELS), dtype=float)

        if faces is not None:
            facial_images_np = []
            # faces: (N, 3, H, W) 텐서일 가능성이 큼
            if isinstance(faces, torch.Tensor):
                if faces.ndim == 3:  # (3,H,W) -> 1개
                    faces = faces.unsqueeze(0)
                faces_iter = [f for f in faces]
            else:
                faces_iter = list(faces)

            for face in faces_iter:
                face_np = face.permute(1, 2, 0).cpu().numpy()
                face_np = (face_np * 128 + 127.5).clip(0, 255).astype(np.uint8)
                facial_images_np.append(face_np)

            if len(facial_images_np) > 0:
                emotions, scores = emotion_recognizer.predict_emotions(
                    facial_images_np, logits=False  # 확률로 받기
                )
                scores = np.array(scores)  # (N, 8)

                # 첫 번째 얼굴 기준으로 top emotion / prob 계산
                probs_vec = scores[0]  # shape: (8,)
                top_idx = int(np.argmax(probs_vec))
                top_emotion = EMOTION_LABELS[top_idx]
                top_prob = float(probs_vec[top_idx]) * 100.0

                # 화면에 박스 + 감정 표시 (모든 얼굴에 대해)
                if boxes is not None:
                    for i, box in enumerate(boxes):
                        if box is None:
                            continue
                        x1, y1, x2, y2 = [int(b) for b in box]
                        emo = emotions[i]
                        emo_prob = float(scores[i].max()) * 100.0

                        text = f"{emo} {emo_prob:.1f}%"
                        cv2.rectangle(display_frame, (x1, y1), (x2, y2),
                                      (0, 255, 0), 2)
                        cv2.putText(display_frame, text,
                                    (x1, max(y1 - 10, 10)),
                                    cv2.FONT_HERSHEY_SIMPLEX,
                                    0.7, (0, 255, 0), 2,
                                    cv2.LINE_AA)
        else:
            cv2.putText(display_frame, "No face detected",
                        (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.0, (0, 0, 255), 2,
                        cv2.LINE_AA)

        # -----------------------------
        # C. 녹화(로깅) 상태 처리
        # -----------------------------
        if recording:
            t_rel = now_time - record_start_time  # 녹화 시작으로부터 경과 시간
            # 로그용 dict 하나 만들기
            record = {
                "t": t_rel,
                "fps": fps,
                "top_emotion": top_emotion,
                "top_prob": top_prob,
            }
            for idx, emo_name in enumerate(EMOTION_LABELS):
                record[f"prob_{emo_name}"] = float(probs_vec[idx]) * 100.0
            record_buffer.append(record)

            # 화면에 REC 표시
            cv2.putText(display_frame, "REC",
                        (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.0, (0, 0, 255), 2,
                        cv2.LINE_AA)

            # 3초 지나면 자동 종료 + 저장
            if t_rel >= 3.0:
                recording = False
                print(f"[INFO] Recording finished. Frames: {len(record_buffer)}")
                save_log_to_csv(record_buffer)
                record_buffer = []  # 비우기

        # -----------------------------
        # D. FPS 표시
        # -----------------------------
        fps_text = f"FPS: {fps:.1f}"
        cv2.putText(display_frame, fps_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1.0, (0, 255, 255), 2,
                    cv2.LINE_AA)

        # 화면 출력
        cv2.imshow("Emotion + (Pose) + Logging", display_frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break
        elif key == ord(" "):  # SPACE 누르면 3초 녹화 시작
            if not recording:
                recording = True
                record_start_time = now_time
                record_buffer = []
                print("[INFO] Recording started for 3 seconds...")

finally:
    cap.release()
    if USE_POSE and pose is not None:
        pose.close()
    cv2.destroyAllWindows()


[INFO] Using device: cuda
[INFO] Available EmotiEffLib models: ['enet_b0_8_best_vgaf', 'enet_b0_8_best_afew', 'enet_b2_8', 'enet_b0_8_va_mtl', 'enet_b2_7', 'mbf_va_mtl', 'mobilevit_va_mtl']
[INFO] EmotiEffLib model loaded: enet_b0_8_best_vgaf
[INFO] MediaPipe Pose enabled
[INFO] Press SPACE to record 3s log, 'q' to quit.
[INFO] Recording started for 3 seconds...
[INFO] Recording finished. Frames: 16
[INFO] Saved log: logs\emotion_log_2025-11-28_16-33-35.csv


In [1]:
import cv2
import torch
import numpy as np
import time
import os
from datetime import datetime

from facenet_pytorch import MTCNN
from emotiefflib.facial_analysis import EmotiEffLibRecognizer, get_model_list

# (옵션) mediapipe가 설치되어 있으면 포즈도 같이 사용
try:
    import mediapipe as mp
    USE_POSE = True
    mp_drawing = mp.solutions.drawing_utils
    mp_pose = mp.solutions.pose
except ImportError:
    USE_POSE = False

# -----------------------------
# 0. Pose 랜드마크 이름 (33개)
# -----------------------------
LANDMARK_NAMES = [
    "nose",               # 0
    "left_eye_inner",     # 1
    "left_eye",           # 2
    "left_eye_outer",     # 3
    "right_eye_inner",    # 4
    "right_eye",          # 5
    "right_eye_outer",    # 6
    "left_ear",           # 7
    "right_ear",          # 8
    "mouth_left",         # 9
    "mouth_right",        # 10
    "left_shoulder",      # 11
    "right_shoulder",     # 12
    "left_elbow",         # 13
    "right_elbow",        # 14
    "left_wrist",         # 15
    "right_wrist",        # 16
    "left_pinky",         # 17
    "right_pinky",        # 18
    "left_index",         # 19
    "right_index",        # 20
    "left_thumb",         # 21
    "right_thumb",        # 22
    "left_hip",           # 23
    "right_hip",          # 24
    "left_knee",          # 25
    "right_knee",         # 26
    "left_ankle",         # 27
    "right_ankle",        # 28
    "left_heel",          # 29
    "right_heel",         # 30
    "left_foot_index",    # 31
    "right_foot_index",   # 32
]

# -----------------------------
# 1. 기본 설정 및 모델 초기화
# -----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
print("[INFO] Using device:", device)
print("[INFO] Available EmotiEffLib models:", get_model_list())

# 얼굴 검출기
mtcnn = MTCNN(keep_all=True, device=device)

# 감정 인식 모델
model_name = "enet_b0_8_best_vgaf"
emotion_recognizer = EmotiEffLibRecognizer(
    engine="torch",
    model_name=model_name,
    device=device,
)
print("[INFO] EmotiEffLib model loaded:", model_name)

# AffectNet 8-class 레이블 순서 (문서 기준)
EMOTION_LABELS = [
    "Anger",
    "Contempt",
    "Disgust",
    "Fear",
    "Happiness",
    "Neutral",
    "Sadness",
    "Surprise",
]

# 로그 저장 폴더
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

# FPS 계산용
prev_time = time.time()
fps = 0.0

# 녹화(로깅) 상태 관리
recording = False
record_start_time = 0.0
record_buffer = []  # 각 프레임의 feature를 리스트로 쌓음

# -----------------------------
# 2. Pose 초기화(선택)
# -----------------------------
pose = None
if USE_POSE:
    mp_pose_module = mp.solutions.pose
    pose = mp_pose_module.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    )
    print("[INFO] MediaPipe Pose enabled")
else:
    print("[INFO] MediaPipe Pose NOT used (mediapipe not installed)")

# -----------------------------
# 3. CSV 저장 함수
# -----------------------------
def save_log_to_csv(records):
    """records: list of dict, 각 dict는 한 프레임의 정보"""
    if not records:
        print("[INFO] No records to save.")
        return

    ts = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    filename = os.path.join(LOG_DIR, f"emotion_log_{ts}.csv")

    # 필드 이름(컬럼)
    fieldnames = [
        "t",
        "fps",
        "top_emotion",
        "top_prob",
    ] + [f"prob_{emo}" for emo in EMOTION_LABELS]

    # Pose 랜드마크 컬럼 추가 (각 랜드마크당 x,y,z,vis)
    for name in LANDMARK_NAMES:
        fieldnames.extend([
            f"{name}_x",
            f"{name}_y",
            f"{name}_z",
            f"{name}_vis",
        ])

    import csv
    with open(filename, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for rec in records:
            writer.writerow(rec)

    print(f"[INFO] Saved log: {filename}")

# -----------------------------
# 4. 메인 루프
# -----------------------------
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if not cap.isOpened():
    print("[ERROR] Cannot open webcam")
    raise SystemExit

print("[INFO] Press SPACE to record 3s log, 'q' to quit.")

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("[ERROR] Failed to grab frame")
            break

        # FPS 계산
        now_time = time.time()
        dt = now_time - prev_time
        if dt > 0:
            fps = 1.0 / dt
        prev_time = now_time

        # 원본 복사 (표시 용도)
        display_frame = frame.copy()

        # BGR -> RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # -----------------------------
        # A. Pose 추정 (선택) + 랜드마크 저장 준비
        # -----------------------------
        pose_landmarks = None  # 이 프레임에서의 랜드마크 리스트 (없으면 None)

        if USE_POSE and pose is not None:
            results = pose.process(frame_rgb)
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    display_frame,
                    results.pose_landmarks,
                    mp_pose_module.POSE_CONNECTIONS,
                )
                pose_landmarks = results.pose_landmarks.landmark  # 길이 33 리스트

        # -----------------------------
        # B. 얼굴 감정 인식
        # -----------------------------
        boxes, _ = mtcnn.detect(frame_rgb)
        faces = mtcnn(frame_rgb)

        # 기본값 (얼굴 없을 때)
        top_emotion = "none"
        top_prob = 0.0
        probs_vec = np.zeros(len(EMOTION_LABELS), dtype=float)

        if faces is not None:
            facial_images_np = []
            # faces: (N, 3, H, W) 텐서일 가능성이 큼
            if isinstance(faces, torch.Tensor):
                if faces.ndim == 3:  # (3,H,W) -> 1개
                    faces = faces.unsqueeze(0)
                faces_iter = [f for f in faces]
            else:
                faces_iter = list(faces)

            for face in faces_iter:
                face_np = face.permute(1, 2, 0).cpu().numpy()
                face_np = (face_np * 128 + 127.5).clip(0, 255).astype(np.uint8)
                facial_images_np.append(face_np)

            if len(facial_images_np) > 0:
                emotions, scores = emotion_recognizer.predict_emotions(
                    facial_images_np, logits=False  # 확률로 받기
                )
                scores = np.array(scores)  # (N, 8)

                # 첫 번째 얼굴 기준으로 top emotion / prob 계산
                probs_vec = scores[0]  # shape: (8,)
                top_idx = int(np.argmax(probs_vec))
                top_emotion = EMOTION_LABELS[top_idx]
                top_prob = float(probs_vec[top_idx]) * 100.0

                # 화면에 박스 + 감정 표시 (모든 얼굴에 대해)
                if boxes is not None:
                    for i, box in enumerate(boxes):
                        if box is None:
                            continue
                        x1, y1, x2, y2 = [int(b) for b in box]
                        emo = emotions[i]
                        emo_prob = float(scores[i].max()) * 100.0

                        text = f"{emo} {emo_prob:.1f}%"
                        cv2.rectangle(display_frame, (x1, y1), (x2, y2),
                                      (0, 255, 0), 2)
                        cv2.putText(display_frame, text,
                                    (x1, max(y1 - 10, 10)),
                                    cv2.FONT_HERSHEY_SIMPLEX,
                                    0.7, (0, 255, 0), 2,
                                    cv2.LINE_AA)
        else:
            cv2.putText(display_frame, "No face detected",
                        (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.0, (0, 0, 255), 2,
                        cv2.LINE_AA)

        # -----------------------------
        # C. 녹화(로깅) 상태 처리
        # -----------------------------
        if recording:
            t_rel = now_time - record_start_time  # 녹화 시작으로부터 경과 시간

            # 기본 로그 필드
            record = {
                "t": t_rel,
                "fps": fps,
                "top_emotion": top_emotion,
                "top_prob": top_prob,
            }
            for idx, emo_name in enumerate(EMOTION_LABELS):
                record[f"prob_{emo_name}"] = float(probs_vec[idx]) * 100.0

            # Pose 랜드마크 값 기본은 -999로 채움
            for name in LANDMARK_NAMES:
                record[f"{name}_x"] = -999.0
                record[f"{name}_y"] = -999.0
                record[f"{name}_z"] = -999.0
                record[f"{name}_vis"] = -999.0

            # 이 프레임에서 포즈가 보였으면, visibility > 0.5 인 것만 값 채우기
            if pose_landmarks is not None:
                for idx, name in enumerate(LANDMARK_NAMES):
                    if idx >= len(pose_landmarks):
                        break
                    lm = pose_landmarks[idx]
                    vis = float(lm.visibility) if lm.visibility is not None else 0.0
                    if vis > 0.5:  # "보이는" 기준, 필요하면 조정 가능
                        record[f"{name}_x"] = float(lm.x)
                        record[f"{name}_y"] = float(lm.y)
                        record[f"{name}_z"] = float(lm.z)
                        record[f"{name}_vis"] = vis
                    # vis <= 0.5 인 경우는 기본값 -999 유지

            record_buffer.append(record)

            # 화면에 REC 표시
            cv2.putText(display_frame, "REC",
                        (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.0, (0, 0, 255), 2,
                        cv2.LINE_AA)

            # 3초 지나면 자동 종료 + 저장
            if t_rel >= 3.0:
                recording = False
                print(f"[INFO] Recording finished. Frames: {len(record_buffer)}")
                save_log_to_csv(record_buffer)
                record_buffer = []  # 비우기

        # -----------------------------
        # D. FPS 표시
        # -----------------------------
        fps_text = f"FPS: {fps:.1f}"
        cv2.putText(display_frame, fps_text, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1.0, (0, 255, 255), 2,
                    cv2.LINE_AA)

        # 화면 출력
        cv2.imshow("Emotion + (Pose) + Logging", display_frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break
        elif key == ord(" "):  # SPACE 누르면 3초 녹화 시작
            if not recording:
                recording = True
                record_start_time = now_time
                record_buffer = []
                print("[INFO] Recording started for 3 seconds...")

finally:
    cap.release()
    if USE_POSE and pose is not None:
        pose.close()
    cv2.destroyAllWindows()


  from .autonotebook import tqdm as notebook_tqdm


[INFO] Using device: cuda
[INFO] Available EmotiEffLib models: ['enet_b0_8_best_vgaf', 'enet_b0_8_best_afew', 'enet_b2_8', 'enet_b0_8_va_mtl', 'enet_b2_7', 'mbf_va_mtl', 'mobilevit_va_mtl']
[INFO] EmotiEffLib model loaded: enet_b0_8_best_vgaf
[INFO] MediaPipe Pose enabled
[INFO] Press SPACE to record 3s log, 'q' to quit.
[INFO] Recording started for 3 seconds...
[INFO] Recording finished. Frames: 17
[INFO] Saved log: logs\emotion_log_2025-11-28_17-07-54.csv


In [2]:
import pandas as pd
import numpy as np
import os
import json
import glob

# 설정
LOG_DIR = "logs"
OUTPUT_DIR = "preprocessed"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def detect_head_gesture(df_vis):
    """
    코(nose)의 움직임을 통해 끄덕임(Nodding)이나 가로저음(Shaking)을 감지
    """
    if df_vis['nose_vis'].mean() < 0.5:
        return "Not Detected"

    # 이동 평균으로 노이즈 제거
    nose_x = df_vis['nose_x'].rolling(window=5).mean().fillna(method='bfill')
    nose_y = df_vis['nose_y'].rolling(window=5).mean().fillna(method='bfill')

    # 움직임의 분산(Variance) 계산 (움직임의 크기)
    var_x = nose_x.var() * 10000  # 스케일 보정
    var_y = nose_y.var() * 10000

    # 움직임이 너무 적으면 Static
    if var_x < 0.05 and var_y < 0.05:
        return "Static (Still)"

    # X축 움직임이 Y축보다 현저히 크면 -> Shaking (No/Confusion)
    if var_x > var_y * 1.5:
        return "Head Shaking (Negative/Confusion)"
    
    # Y축 움직임이 X축보다 현저히 크면 -> Nodding (Yes/Agreed)
    if var_y > var_x * 1.5:
        return "Head Nodding (Positive/Understood)"

    return "Dynamic (Moving)"

def analyze_posture_lean(df_vis):
    """
    어깨의 Z축 변화를 통해 몸을 앞으로 기울였는지(집중) 뒤로 뺐는지(이완) 판단
    MediaPipe에서 Z값은 카메라에 가까울수록 작아짐 (음수 방향)
    """
    # 어깨가 보이는 프레임만
    if df_vis['left_shoulder_vis'].mean() < 0.5:
        return "Unknown"

    # 초반 30% vs 후반 30% Z값 비교
    n = len(df_vis)
    start_z = df_vis[['left_shoulder_z', 'right_shoulder_z']].iloc[:int(n*0.3)].mean().mean()
    end_z = df_vis[['left_shoulder_z', 'right_shoulder_z']].iloc[int(n*0.7):].mean().mean()
    
    diff = start_z - end_z # 양수면 나중이 더 작아짐(가까워짐)

    # 임계값 (실험적으로 조정 필요)
    if diff > 0.05: 
        return "Leaning Forward (High Engagement)"
    elif diff < -0.05:
        return "Leaning Backward (Relaxed/Bored)"
    else:
        return "Stable Posture"

def analyze_behavior(file_path):
    try:
        # -999를 NaN으로 변환하여 로드
        df = pd.read_csv(file_path)
        df.replace(-999.0, np.nan, inplace=True)
    except Exception as e:
        print(f"[ERR] Load failed: {file_path}")
        return None

    if len(df) < 10: return None

    # 1. Pose 데이터가 유효한 행만 추출 (코가 보이는 프레임)
    df_vis = df.dropna(subset=['nose_x'])
    has_pose = len(df_vis) > len(df) * 0.5  # 전체의 50% 이상 포즈가 잡혔는지

    # 2. 감정 분석 (Baseline 제거 로직)
    # Neutral, Sadness는 '기본 상태'로 간주. 그 외 감정의 스파이크 감지
    emotion_cols = [c for c in df.columns if c.startswith("prob_")]
    avg_emotions = df[emotion_cols].mean()
    
    # 주요 감정 추출 (Neutral 제외 Top 1)
    sorted_emotions = avg_emotions.drop("prob_Neutral", errors='ignore').sort_values(ascending=False)
    dominant_sub_emotion = sorted_emotions.index[0].replace("prob_", "")
    dominant_score = sorted_emotions.iloc[0]

    # 감정 해석
    emotion_summary = f"Mainly Neutral, but shows traces of {dominant_sub_emotion} ({dominant_score:.1f}%)"
    if dominant_sub_emotion == "Sadness":
        emotion_context = "Concentrated / Serious"
    elif dominant_sub_emotion in ["Anger", "Disgust", "Contempt"]:
        emotion_context = "Dissatisfied / Uncomfortable"
    elif dominant_sub_emotion in ["Fear", "Surprise"]:
        emotion_context = "Confused / Surprised"
    elif dominant_sub_emotion == "Happiness":
        emotion_context = "Satisfied / Amused"
    else:
        emotion_context = "Passive"

    # 3. 행동(Pose) 분석
    gesture = "Not Detected"
    posture = "Unknown"
    
    if has_pose:
        gesture = detect_head_gesture(df_vis)
        posture = analyze_posture_lean(df_vis)

    # 4. LLM용 최종 요약 생성 (JSON)
    result = {
        "metadata": {
            "duration": f"{df['t'].max():.1f}s",
            "fps": f"{df['fps'].mean():.1f}"
        },
        "behavior_analysis": {
            "facial_expression": emotion_context,
            "detailed_emotion": emotion_summary,
            "head_gesture": gesture,
            "body_posture": posture
        },
        "final_interpretation_for_llm": ""
    }

    # 5. 종합 해석 문자열 생성 (Rule-based)
    # 이 부분이 LLM as Judge의 핵심 Input이 됩니다.
    interpretation = []
    
    if "Forward" in posture:
        interpretation.append("User is highly engaged and leaning in to read details.")
    elif "Backward" in posture:
        interpretation.append("User is sitting back, possibly bored or just skimming.")
        
    if "Nodding" in gesture:
        interpretation.append("User is nodding, indicating agreement or understanding.")
    elif "Shaking" in gesture:
        interpretation.append("User is shaking head, indicating confusion or disagreement.")
        
    if "Dissatisfied" in emotion_context:
        interpretation.append("Facial expressions show signs of dissatisfaction or frustration.")
    elif "Confused" in emotion_context:
        interpretation.append("User seems confused by the content.")
    
    if not interpretation:
        interpretation.append("User showed no significant non-verbal reaction (Passive reading).")
        
    result["final_interpretation_for_llm"] = " ".join(interpretation)

    return result

def process_all():
    files = glob.glob(os.path.join(LOG_DIR, "*.csv"))
    print(f"[INFO] Found {len(files)} logs.")
    
    for f in files:
        res = analyze_behavior(f)
        if res:
            out_name = os.path.basename(f).replace(".csv", "_analysis.json")
            with open(os.path.join(OUTPUT_DIR, out_name), "w", encoding="utf-8") as json_f:
                json.dump(res, json_f, indent=4, ensure_ascii=False)
            print(f"[DONE] {out_name}")

if __name__ == "__main__":
    process_all()

[INFO] Found 1 logs.
[DONE] emotion_log_2025-11-28_17-07-54_analysis.json


  nose_x = df_vis['nose_x'].rolling(window=5).mean().fillna(method='bfill')
  nose_y = df_vis['nose_y'].rolling(window=5).mean().fillna(method='bfill')
