In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
"""mediapipe 설치"""

# MediaPipe와 호환되는 numpy 버전을 강제 재설치
!pip install numpy==1.26.4 mediapipe==0.10.21 --force-reinstall

Collecting numpy==1.26.4
  Downloading numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting mediapipe==0.10.21
  Downloading mediapipe-0.10.21-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting absl-py (from mediapipe==0.10.21)
  Downloading absl_py-2.3.1-py3-none-any.whl.metadata (3.3 kB)
Collecting attrs>=19.1.0 (from mediapipe==0.10.21)
  Downloading attrs-25.4.0-py3-none-any.whl.metadata (10 kB)
Collecting flatbuffers>=2.0 (from mediapipe==0.10.21)
  Downloading flatbuffers-25.9.23-py2.py3-none-any.whl.metadata (875 bytes)
Collecting jax (from mediapipe==0.10.21)
  Downloading jax-0.8.1-py3-none-any.whl.metadata (13 kB)
Collecting jaxlib (from mediapipe==0.10.21)
  Downloading jaxlib-

In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.233-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Collecting numpy>=1.23.0 (from ultralytics)
  Downloading numpy-2.2.6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.0/62.0 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
Downloading ultralytics-8.3.233-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m19.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading numpy-2.2.6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.5/16.5 MB[0m [31m73.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: numpy, ultralytics-thop, 

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import mediapipe as mp
from ultralytics import YOLO
from tensorflow.keras.models import load_model
from google.colab import drive
import os

# 1. 구글 드라이브 마운트
drive.mount('/content/drive')

# GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# =========================================================
# [필수] Stage 1 모델 아키텍처 정의 (PyTorch 모델 로드용)
# =========================================================

JOINT_CONNECTIONS = [
    (0, 1), (1, 2), (2, 3), (3, 7), (0, 4), (4, 5), (5, 6), (6, 8),
    (9, 10), (11, 12), (11, 13), (13, 15), (12, 14), (14, 16)
]

def get_adjacency_matrix(num_joints=33):
    A = torch.eye(num_joints)
    for i, j in JOINT_CONNECTIONS:
        A[i, j] = 1
        A[j, i] = 1
    D_inv_sqrt = torch.diag(torch.pow(A.sum(1), -0.5))
    return D_inv_sqrt @ A @ D_inv_sqrt

class GraphConv(nn.Module):
    def __init__(self, in_features, out_features, A):
        super().__init__()
        self.A = A
        self.fc = nn.Linear(in_features, out_features)

    def forward(self, x):
        Ax = torch.einsum('ij,bjc->bic', self.A, x)
        return F.relu(self.fc(Ax))

class GCN_LSTM(nn.Module):
    def __init__(self, num_joints=33, in_features=4, gcn_hidden=64, lstm_hidden=128):
        super().__init__()
        self.A = get_adjacency_matrix(num_joints).to(torch.float32).to(device)
        self.gcn1 = GraphConv(in_features, gcn_hidden, self.A)
        self.gcn2 = GraphConv(gcn_hidden, gcn_hidden, self.A)
        self.lstm = nn.LSTM(gcn_hidden * num_joints, lstm_hidden, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(lstm_hidden * 2, 1)

    def forward(self, x):
        B, T, J, C = x.shape
        feats = []
        for t in range(T):
            xt = x[:, t, :, :]
            h = self.gcn1(xt)
            h = self.gcn2(h)
            feats.append(h.view(B, -1))
        x_seq = torch.stack(feats, dim=1)
        _, (hn, _) = self.lstm(x_seq)
        hn_cat = torch.cat((hn[0], hn[1]), dim=1)
        return self.fc(hn_cat).squeeze(1)

# =========================================================
# [실행] 모델 불러오기
# =========================================================

STAGE1_MODEL_PATH = "/content/drive/MyDrive/elderlycare_violence/model/best_model_GCN-BILSTM.pth"
STAGE2_MODEL_PATH = "/content/drive/MyDrive/elderlycare_violence/model/skeleton_pinch_v3.keras"

print("\n--- 모델 로드 시작 ---")

# 1. Stage 1 (전신 폭행) 모델 로드
try:
    print("Loading Stage 1 (GCN-BiLSTM + YOLO + Pose)...")

    # YOLO & MediaPipe 초기화
    yolo_model = YOLO("yolov8n.pt")
    pose_detector = mp.solutions.pose.Pose(
        static_image_mode=False, model_complexity=2, min_detection_confidence=0.2
    )

    # GCN 모델 초기화 및 가중치 로드
    model_stage1 = GCN_LSTM().to(device)
    if os.path.exists(STAGE1_MODEL_PATH):
        model_stage1.load_state_dict(torch.load(STAGE1_MODEL_PATH, map_location=device))
        model_stage1.eval() # 평가 모드로 전환 (Dropout 등 비활성화)
        print("Stage 1 모델 가중치 로드 성공!")
    else:
        print(f"오류: Stage 1 파일을 찾을 수 없습니다. 경로: {STAGE1_MODEL_PATH}")

except Exception as e:
    print(f" Stage 1 로드 중 에러 발생: {e}")

# 2. Stage 2 (손 꼬집기) 모델 로드
try:
    print("Loading Stage 2 (Hand Pinch DNN)...")

    # MediaPipe Hands 초기화
    hands_detector = mp.solutions.hands.Hands(
        static_image_mode=False, max_num_hands=2, min_detection_confidence=0.5
    )

    # Keras 모델 로드
    if os.path.exists(STAGE2_MODEL_PATH):
        model_stage2 = load_model(STAGE2_MODEL_PATH)
        print(" Stage 2 모델 로드 성공!")
    else:
        print(f"오류: Stage 2 파일을 찾을 수 없습니다. 경로: {STAGE2_MODEL_PATH}")

except Exception as e:
    print(f"Stage 2 로드 중 에러 발생: {e}")

print("\n------------------------------------------------")
if 'model_stage1' in locals() and 'model_stage2' in locals():
    print("모든 모델이 정상적으로 메모리에 로드되었습니다.")
else:
    print("일부 모델 로드에 실패했습니다. 경로를 다시 확인해주세요.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Using device: cpu

--- 모델 로드 시작 ---
Loading Stage 1 (GCN-BiLSTM + YOLO + Pose)...
Stage 1 모델 가중치 로드 성공!
Loading Stage 2 (Hand Pinch DNN)...
 Stage 2 모델 로드 성공!

------------------------------------------------
모든 모델이 정상적으로 메모리에 로드되었습니다.


In [11]:
import cv2
import numpy as np
import torch
import pandas as pd
import glob
import os
from tqdm import tqdm
from mediapipe.framework.formats import landmark_pb2

# ---------------------------------------------------------
# 1. 전처리 함수 정의 (밝기 보정)
# ---------------------------------------------------------
def enhance_brightness(frame):
    ycrcb = cv2.cvtColor(frame, cv2.COLOR_BGR2YCrCb)
    y, cr, cb = cv2.split(ycrcb)
    y = cv2.equalizeHist(y)
    merged = cv2.merge([y, cr, cb])
    frame_enhanced = cv2.cvtColor(merged, cv2.COLOR_YCrCb2BGR)
    # alpha=1.4, beta=20
    return cv2.convertScaleAbs(frame_enhanced, alpha=1.4, beta=20)

# ---------------------------------------------------------
# 2. Stage 1 데이터 추출 함수 (YOLO + Pose -> GCN Input)
# ---------------------------------------------------------
def extract_skeleton_stage1(video_path, yolo_model, pose_detector, num_frames=32):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if total_frames == 0:
        return None

    # 32프레임 균등 추출
    frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=np.int32)
    sequence = []

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret or frame is None:
            sequence.append(np.zeros((33, 4)))
            continue

        frame = enhance_brightness(frame)

        # YOLO 사람 탐지
        results = yolo_model(frame, verbose=False)

        if len(results[0].boxes) == 0:
            sequence.append(np.zeros((33, 4)))
            continue

        # 가장 큰 사람 박스 선택
        boxes = results[0].boxes.xyxy.cpu().numpy()
        areas = [(x2 - x1) * (y2 - y1) for x1, y1, x2, y2 in boxes]
        main_box = boxes[np.argmax(areas)]
        x1, y1, x2, y2 = map(int, main_box)

        # Crop
        person_crop = frame[max(0, y1):y2, max(0, x1):x2]
        if person_crop.size == 0:
            sequence.append(np.zeros((33, 4)))
            continue

        # MediaPipe Pose
        rgb = cv2.cvtColor(person_crop, cv2.COLOR_BGR2RGB)
        result = pose_detector.process(rgb)

        if result.pose_landmarks:
            landmarks = result.pose_landmarks.landmark
            coords = np.array([[lm.x, lm.y, lm.z, lm.visibility] for lm in landmarks], dtype=np.float32)
            # 정규화 (Center at 0)
            coords[:, 0] = (coords[:, 0] - 0.5) * 2
            coords[:, 1] = (coords[:, 1] - 0.5) * 2
        else:
            coords = np.zeros((33, 4), dtype=np.float32)
        sequence.append(coords)

    cap.release()
    return np.array(sequence, dtype=np.float32)

# ---------------------------------------------------------
# 3. Stage 2 데이터 추출 함수 (Hands -> DNN Input)
# ---------------------------------------------------------
def extract_hand_features_stage2(video_path, hands_detector):
    cap = cv2.VideoCapture(video_path)
    # 전체 프레임을 다 보면 느리므로 5프레임 단위로 샘플링
    frame_skip = 5
    all_hand_features = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break

        frame_idx = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        if frame_idx % frame_skip != 0: continue

        frame = enhance_brightness(frame)
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = hands_detector.process(image_rgb)

        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                kp = []
                for lm in hand_landmarks.landmark:
                    kp.extend([lm.x, lm.y, lm.z])

                # 모델 입력 형상(126)에 맞춤 (한 손만 감지된 경우 나머지는 0)
                final_kp = np.zeros(126)
                # 추출된 키포인트가 126개보다 적으면 채우고, 많으면 자름
                length = min(len(kp), 126)
                final_kp[:length] = kp[:length]
                all_hand_features.append(final_kp)

    cap.release()
    return np.array(all_hand_features)

# ---------------------------------------------------------
# 4. 전체 실행 로직 (Main Loop)
# ---------------------------------------------------------

# 분석할 영상 폴더 경로
TARGET_FOLDER = "/content/drive/MyDrive/elderlycare_violence/enhanced_video"
video_files = glob.glob(os.path.join(TARGET_FOLDER, "*.mp4"))

print(f"[INFO] Target Folder: {TARGET_FOLDER}")
print(f"[INFO] Total Videos: {len(video_files)}")

results = []

# 영상 목록 순회
for video_path in tqdm(video_files, desc="Processing Videos"):
    video_name = os.path.basename(video_path)

    # ---------------------------
    # [Stage 1] 전신 폭행 탐지
    # ---------------------------
    skeleton_data = extract_skeleton_stage1(video_path, yolo_model, pose_detector)

    if skeleton_data is None:
        print(f"[ERROR] Could not read video: {video_name}")
        continue

    # Tensor 변환 (Batch=1, Frames=32, Joints=33, Channels=4)
    input_tensor = torch.tensor(skeleton_data, dtype=torch.float32).unsqueeze(0).to(device)

    with torch.no_grad():
        logits = model_stage1(input_tensor)
        probs = torch.sigmoid(logits)
        stage1_score = probs.item()

    # 결과 변수 초기화
    final_label = "Normal"
    final_score = 0.0
    detection_stage = "Stage 1"

    # Stage 1 판단 (0.5 초과 시 폭행)
    if stage1_score > 0.5:
        final_label = "Violence (General)"
        final_score = stage1_score
        detection_stage = "Stage 1"
    else:
        # ---------------------------
        # [Stage 2] 미세 폭행(꼬집기) 탐지
        # (Stage 1이 Normal일 때만 수행)
        # ---------------------------
        hand_features = extract_hand_features_stage2(video_path, hands_detector)

        if len(hand_features) > 0:
            # 추출된 손 데이터들에 대해 예측 (Batch Prediction)
            pinch_probs = model_stage2.predict(hand_features, verbose=0)

            # 영상 내에서 가장 높은 꼬집기 확률을 해당 영상의 점수로 사용 (Max Pooling)
            max_pinch_score = np.max(pinch_probs)

            if max_pinch_score > 0.5:
                final_label = "Violence (Pinching)"
                final_score = max_pinch_score
                detection_stage = "Stage 2"
            else:
                final_label = "Normal"
                final_score = 1.0 - max_pinch_score
                detection_stage = "Stage 2"
        else:
            # 손이 감지되지 않음 -> Stage 1의 Normal 결과 유지
            final_label = "Normal"
            final_score = 1.0 - stage1_score
            detection_stage = "Stage 1 (No Hands)"

    # 결과 리스트에 추가
    results.append({
        "video_name": video_name,
        "label": final_label,
        "score": round(final_score, 4),
        "stage": detection_stage
    })

    # 콘솔에 간단히 출력
    print(f" > {video_name}: {final_label} ({detection_stage}, Score: {final_score:.4f})")

# ---------------------------------------------------------
# 5. 결과 저장
# ---------------------------------------------------------
if results:
    df = pd.DataFrame(results)
    save_path = os.path.join(TARGET_FOLDER, "final_analysis_results.csv")
    df.to_csv(save_path, index=False)
    print("------------------------------------------------")
    print(f"[INFO] Analysis Complete. Results saved to: {save_path}")
    print("------------------------------------------------")
    print(df)
else:
    print("[WARN] No results to save.")

[INFO] Target Folder: /content/drive/MyDrive/elderlycare_violence/enhanced_video
[INFO] Total Videos: 3


Processing Videos:  33%|███▎      | 1/3 [00:30<01:01, 30.83s/it]

 > MOVI0006_enhanced.mp4: Normal (Stage 1 (No Hands), Score: 0.9856)


Processing Videos:  67%|██████▋   | 2/3 [01:45<00:56, 56.88s/it]

 > MOVI0005_enhanced.mp4: Violence (Pinching) (Stage 2, Score: 0.8561)


Processing Videos: 100%|██████████| 3/3 [03:28<00:00, 69.60s/it]

 > MOVI0004_enhanced.mp4: Violence (Pinching) (Stage 2, Score: 0.9859)
------------------------------------------------
[INFO] Analysis Complete. Results saved to: /content/drive/MyDrive/elderlycare_violence/enhanced_video/final_analysis_results.csv
------------------------------------------------
              video_name                label   score               stage
0  MOVI0006_enhanced.mp4               Normal  0.9856  Stage 1 (No Hands)
1  MOVI0005_enhanced.mp4  Violence (Pinching)  0.8561             Stage 2
2  MOVI0004_enhanced.mp4  Violence (Pinching)  0.9859             Stage 2



