사람 + 총 + 칼 인식 //
pose estimation을 모든 사람에 대해 적용(왼팔들기, 오른팔들기, 양팔들기) //
사람과 총 혹은 칼의 bounding box가 겹치는 경우에는 위험 인물로 분류 -> 한 번 위험 인물로 분류된 사람의 track ID는 dangerous_ids로 관리됨 -> dangerous_ids 중 하나에 해당하는 사람은 DeepSORT로 추적하여 ArcFace를 적용(인증된 얼굴인지 확인)

In [5]:
#1. 얼굴 학습시키는 부분
import cv2
import numpy as np
import os
from insightface.app import FaceAnalysis

def initialize_arcface(model_name="buffalo_l", ctx_id=-1, det_size=(640, 640)):
    """Create and prepare an InsightFace ``FaceAnalysis`` application.

    Args:
        model_name: Name of the InsightFace model pack to load. Defaults to
            "buffalo_l", the pack the original code used.
        ctx_id: Execution context passed to ``prepare``. Per the original
            author's note: 0 selects the GPU, -1 selects the CPU.
        det_size: Detector input size passed to ``prepare``.

    Returns:
        The prepared ``FaceAnalysis`` instance.
    """
    app = FaceAnalysis(name=model_name)
    app.prepare(ctx_id=ctx_id, det_size=det_size)
    return app

def get_face_embedding(app, image_bgr):
    """Return the embedding of the first face found in an image.

    Args:
        app: Object exposing ``get(image)`` that returns a sequence of
            detected faces, each carrying an ``embedding`` attribute.
        image_bgr: Image handed to ``app.get`` unchanged. The original
            author's note says ``app.get`` accepts BGR input; convert with
            ``cvtColor`` beforehand if RGB is required.

    Returns:
        The ``embedding`` of the first returned face, or ``None`` when no
        face was returned.
    """
    found = app.get(image_bgr)
    if len(found) == 0:
        return None
    # Only the first reported face is used; any further faces are ignored.
    return found[0].embedding

def generate_average_embedding(app, folder_path):
    """Average the face embeddings of all images in a folder.

    Files whose lower-cased name ends in .jpg, .jpeg or .png are read with
    ``cv2.imread``. Images that fail to load, or in which no face embedding
    is obtained, are reported with ``print`` and skipped.

    Args:
        app: Face-analysis object forwarded to ``get_face_embedding``.
        folder_path: Directory to scan (non-recursive).

    Returns:
        Element-wise mean (``np.mean(..., axis=0)``) of the collected
        embeddings.

    Raises:
        ValueError: If no embedding could be collected.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    collected = []

    for entry in os.listdir(folder_path):
        if not entry.lower().endswith(image_suffixes):
            continue

        img_path = os.path.join(folder_path, entry)
        picture = cv2.imread(img_path)
        if picture is None:
            print(f"이미지 로드 실패: {img_path}")
            continue

        vector = get_face_embedding(app, picture)
        if vector is None:
            print(f"얼굴 검출 실패: {img_path}")
            continue

        collected.append(vector)

    if not collected:
        raise ValueError("임베딩을 하나도 생성하지 못했습니다.")

    return np.mean(collected, axis=0)

if __name__ == "__main__":
    # Enrollment step: build one reference embedding from a folder of photos
    # and store it on disk as "my_face_embedding.npy".
    app = initialize_arcface()
    # Folder containing photos of my own face
    my_face_folder = "C:/Users/idea0/EE101/Jongsul/myface"  
    my_face_embedding = generate_average_embedding(app, my_face_folder)
    np.save("my_face_embedding.npy", my_face_embedding)  # persist for later use if needed
    print("내 얼굴 평균 임베딩 생성 완료.")



Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\w600k_r50.onnx recognition ['None', 3, 112, 112] 127.

  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4


In [17]:
import cv2
import mediapipe as mp
import time
import torch
import numpy as np

from ultralytics import YOLO
from sklearn.metrics.pairwise import cosine_similarity
from insightface.app import FaceAnalysis

# Deep SORT (예: deep_sort_realtime)
from deep_sort_realtime.deepsort_tracker import DeepSort

############################
# 1) 모델 및 함수 초기화
############################
# Use CUDA for the YOLO model when torch reports it as available, else CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# (A) YOLO model (class ids per the original author's note: person=0, gun=1, knife=2)
model = YOLO("C:/Users/idea0/EE101/Jongsul/Yolomodels/epoch180.pt").to(device)

# (B) ArcFace
# NOTE(review): ctx_id=-1 is passed regardless of `device`; the enrollment
# code in this notebook describes -1 as CPU and 0 as GPU — confirm this is
# intentional.
arc_app = FaceAnalysis(name="buffalo_l")
arc_app.prepare(ctx_id=-1, det_size=(640,640))

# Reference embedding loaded from "my_face_embedding.npy" in the working directory.
my_face_embedding = np.load("my_face_embedding.npy")

def get_face_embedding(arc_app, face_img_bgr):
    """Return the embedding of the first face found in an image crop.

    Args:
        arc_app: Object exposing ``get(image)`` that returns a sequence of
            detected faces, each carrying an ``embedding`` attribute.
        face_img_bgr: Image passed to ``arc_app.get`` unchanged.

    Returns:
        The ``embedding`` of the first returned face, or ``None`` when no
        face was returned.
    """
    detected = arc_app.get(face_img_bgr)
    # Only the first reported face is used; any further faces are ignored.
    return detected[0].embedding if len(detected) > 0 else None

def is_my_face(face_embedding, my_embedding, threshold=0.4):
    """Compare a face embedding against the reference embedding.

    Args:
        face_embedding: Embedding of the face under test.
        my_embedding: Reference embedding to compare against.
        threshold: Similarity that must be strictly exceeded for a match.

    Returns:
        A ``(matched, similarity)`` pair, where ``similarity`` is the cosine
        similarity of the two embeddings and ``matched`` is
        ``similarity > threshold``.
    """
    # cosine_similarity works on 2-D inputs, so each vector is wrapped in a
    # one-row list and the single entry of the result is read back out.
    similarity_matrix = cosine_similarity([face_embedding], [my_embedding])
    score = similarity_matrix[0][0]
    matched = score > threshold
    return matched, score

# (C) MediaPipe Pose (full body) - could be applied to every person or only to some
mp_pose = mp.solutions.pose
# Video-mode pose estimator (static_image_mode=False) without segmentation;
# both detection and tracking confidence thresholds are set to 0.7.
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=1,
    enable_segmentation=False,
    min_detection_confidence=0.7,
    min_tracking_confidence=0.7
)
mp_drawing = mp.solutions.drawing_utils

# Landmark indices used by the simple "arm raised" check on the pose result
LEFT_SHOULDER = 11
RIGHT_SHOULDER = 12
LEFT_WRIST = 15
RIGHT_WRIST = 16

def is_arm_raised(shoulder_y, wrist_y, threshold=0.05):
    """Tell whether a wrist is clearly above its shoulder.

    Coordinates follow the image convention noted by the original author:
    y=0 is the top and y=1 the bottom, so a smaller y means higher up.

    Args:
        shoulder_y: y coordinate of the shoulder.
        wrist_y: y coordinate of the wrist.
        threshold: Margin by which the wrist must be above the shoulder.

    Returns:
        True when ``wrist_y`` is strictly less than ``shoulder_y - threshold``.
    """
    raise_line = shoulder_y - threshold
    return wrist_y < raise_line

# (D) 박스 overlap
def boxes_overlap(boxA, boxB):
    """Check whether two axis-aligned boxes intersect.

    Args:
        boxA: ``(x1, y1, x2, y2)`` of the first box.
        boxB: ``(x1, y1, x2, y2)`` of the second box.

    Returns:
        True when the boxes overlap on both axes. Boxes that merely touch
        along an edge or corner count as overlapping.
    """
    ax1, ay1, ax2, ay2 = boxA
    bx1, by1, bx2, by2 = boxB

    # Separated horizontally: one box lies entirely left of the other.
    if ax2 < bx1 or bx2 < ax1:
        return False
    # Separated vertically: one box lies entirely above the other.
    if ay2 < by1 or by2 < ay1:
        return False
    return True

############################
# 2) DeepSORT 초기화
############################

# Parameter names may differ between deep_sort_realtime versions.
# Earlier configuration kept for reference:
#tracker = DeepSort(
#    max_age=30,
#    n_init=2,
#    max_iou_distance=0.95,       # default 0.7
#    max_cosine_distance=0.95,   # <-- use this instead of max_dist; default 0.5
#    nn_budget=100,
#    override_track_class=None,
#    embedder="mobilenet",
#    half=True
#)

# Initialize DeepSORT: mobilenet appearance embedder, half precision requested.
tracker = DeepSort(max_age=30,
                   n_init=3,
                   nms_max_overlap=1.0,
                   embedder='mobilenet',
                   half=True,
                   embedder_gpu=True)  # set to True when a GPU is available


############################
# 3) 메인 루프
############################
# Main loop: read webcam frames, classify the arm pose, detect people and
# weapons, track people, flag anyone whose box overlaps a weapon, and run
# face verification on flagged tracks. Press 'q' in the window to quit.
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("카메라를 열 수 없습니다.")
    exit()

prev_time = time.time()

# track_ids of people whose box has overlapped a weapon box at least once.
# Entries are never removed, so a track stays flagged for the whole session.
dangerous_ids = set()

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("프레임 읽기 실패!")
            break

        # `frame` is drawn on for display. Every model reads `clean_frame`
        # instead, so overlays (skeleton, text, boxes) never leak into the
        # pixels used for inference.
        clean_frame = frame.copy()
        frame_h, frame_w = clean_frame.shape[:2]

        # -------------------------------------------
        # (1) MediaPipe Pose on the whole frame
        # -------------------------------------------
        # NOTE(review): only a single `pose_landmarks` set is read here, so
        # the arm-raise label describes one pose per frame, not every person.
        rgb_pose = cv2.cvtColor(clean_frame, cv2.COLOR_BGR2RGB)
        pose_results = pose.process(rgb_pose)

        action_text = ""
        if pose_results.pose_landmarks:
            landmarks = pose_results.pose_landmarks.landmark
            left_shoulder_y = landmarks[LEFT_SHOULDER].y
            right_shoulder_y = landmarks[RIGHT_SHOULDER].y
            left_wrist_y = landmarks[LEFT_WRIST].y
            right_wrist_y = landmarks[RIGHT_WRIST].y

            left_arm_up = is_arm_raised(left_shoulder_y, left_wrist_y)
            right_arm_up = is_arm_raised(right_shoulder_y, right_wrist_y)

            if left_arm_up and right_arm_up:
                action_text = "both arms up"
            elif left_arm_up:
                action_text = "left arm up"
            elif right_arm_up:
                action_text = "right arm up"
            else:
                action_text = "do nothing"

            mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        cv2.putText(frame, action_text, (30, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)

        # -------------------------------------------
        # (2) YOLO inference
        # -------------------------------------------
        # Ultralytics treats numpy inputs as BGR (OpenCV convention), so the
        # un-annotated BGR frame is passed directly rather than an RGB copy.
        results = model(clean_frame)

        person_detections = []  # detections forwarded to DeepSORT
        weapon_boxes = []       # gun (1) or knife (2) boxes as (x1, y1, x2, y2)

        for box in results[0].boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            class_id = int(box.cls)
            conf = float(box.conf)
            class_name = model.names[class_id]

            # Visualize every detection on the display frame
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            label = f"{class_name}: {conf:.2f}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

            if class_id == 0:  # person
                # Detections are handed to the tracker as (left, top, width, height)
                w = x2 - x1
                h = y2 - y1
                person_detections.append(((x1, y1, w, h), conf, 0))
            elif class_id in [1, 2]:  # gun / knife
                weapon_boxes.append((x1, y1, x2, y2))

        # -------------------------------------------
        # (3) Track people with DeepSORT
        # -------------------------------------------
        # The tracker was built with its default channel order (BGR), so its
        # appearance embedder is given the clean BGR frame.
        tracks = tracker.update_tracks(person_detections, frame=clean_frame)

        tracked_boxes = []  # (track_id, x1, y1, x2, y2)

        for t in tracks:
            # Skip tentative tracks and tracks not matched recently
            if not t.is_confirmed() or t.time_since_update > 1:
                continue
            track_id = t.track_id
            ltrb = t.to_ltrb()  # left, top, right, bottom
            x1t, y1t, x2t, y2t = map(int, ltrb)
            tracked_boxes.append((track_id, x1t, y1t, x2t, y2t))

            # Visualize the track
            cv2.rectangle(frame, (x1t, y1t), (x2t, y2t), (255, 255, 0), 2)
            cv2.putText(frame, f"ID:{track_id}", (x1t, y1t - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        # -------------------------------------------
        # (4) Overlap with a weapon => add to dangerous_ids
        # -------------------------------------------
        for (tid, px1, py1, px2, py2) in tracked_boxes:
            person_box = (px1, py1, px2, py2)
            if any(boxes_overlap(person_box, wb) for wb in weapon_boxes):
                dangerous_ids.add(tid)

        # -------------------------------------------
        # (5) ArcFace: only for tracks in dangerous_ids
        # -------------------------------------------
        for (tid, px1, py1, px2, py2) in tracked_boxes:
            if tid not in dangerous_ids:
                continue

            # Track boxes can extend past the image. Clamp before slicing,
            # because a negative start index would wrap around and produce a
            # wrong (or empty) crop.
            cx1, cy1 = max(px1, 0), max(py1, 0)
            cx2, cy2 = min(px2, frame_w), min(py2, frame_h)
            if cx2 <= cx1 or cy2 <= cy1:
                continue

            person_crop = clean_frame[cy1:cy2, cx1:cx2]
            if person_crop.size == 0:
                continue

            face_embedding = get_face_embedding(arc_app, person_crop)
            if face_embedding is None:
                continue

            same_person, sim = is_my_face(face_embedding, my_face_embedding, threshold=0.4)
            # Leading spaces shift the label to the right of the "ID:n" text
            # drawn at the same anchor point.
            if same_person:
                color = (0, 255, 0)
                text_arc = f"             Me(sim={sim:.2f})"
            else:
                color = (0, 0, 255)
                text_arc = f"             NotMe(sim={sim:.2f})"

            cv2.rectangle(frame, (px1, py1), (px2, py2), color, 2)
            cv2.putText(frame, text_arc, (px1, py1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # FPS (guarded so a zero time delta cannot raise ZeroDivisionError)
        current_time = time.time()
        elapsed = current_time - prev_time
        fps = 1.0 / elapsed if elapsed > 0 else 0.0
        prev_time = current_time
        cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        cv2.imshow("DeepSORT + YOLO + ArcFace + Pose + Weapons", frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if a model call
    # raised inside the loop.
    cap.release()
    cv2.destroyAllWindows()



Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\idea0/.insightface\models\buffalo_l\w600k_r50.onnx recognition ['None', 3, 112, 112] 127.

  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



0: 480x640 2 personnnns, 79.0ms
Speed: 2.0ms preprocess, 79.0ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



0: 480x640 2 personnnns, 76.0ms
Speed: 2.0ms preprocess, 76.0ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



0: 480x640 2 personnnns, 74.0ms
Speed: 2.0ms preprocess, 74.0ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



0: 480x640 2 personnnns, 77.0ms
Speed: 2.0ms preprocess, 77.0ms inference, 0.0ms postprocess per image at shape (1, 3, 480, 640)


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



0: 480x640 1 personnnn, 72.0ms
Speed: 1.0ms preprocess, 72.0ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



0: 480x640 1 personnnn, 79.0ms
Speed: 2.0ms preprocess, 79.0ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 personnnn, 78.5ms
Speed: 2.0ms preprocess, 78.5ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 2 personnnns, 77.0ms
Speed: 1.0ms preprocess, 77.0ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 personnnn, 72.0ms
Speed: 1.0ms preprocess, 72.0ms inference, 2.0ms postprocess per image at shape (1, 3, 480, 640)
