In [1]:
import cv2
import mediapipe as mp
import numpy as np
import os
import random  
import gc
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt

from IPython.display import Image, clear_output
from torch.cuda import memory_allocated, empty_cache
from torch.optim import Adam
from torch.utils.data import random_split
from torch.utils.data import Dataset, DataLoader
from glob import glob
from tqdm import tqdm

In [2]:
# torch_ver Yolov5
yolo_model = torch.hub.load('ultralytics/yolov5', 'yolov5s',
                            device='cuda:0' if torch.cuda.is_available() else 'cpu')  # 예측 모델
yolo_model.classes = [0]  # 예측 클래스 (0 : 사람)

Using cache found in /Users/kimsungwook/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2024-4-17 Python-3.11.7 torch-2.2.2 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


In [3]:
# LSTM
class skeleton_LSTM(nn.Module):
    def __init__(self):
        super(skeleton_LSTM, self).__init__()
        self.lstm1 = nn.LSTM(input_size=len(attention_dot) * 2, hidden_size=128, num_layers=NUM_LAYERS, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=256, num_layers=NUM_LAYERS, batch_first=True)
        self.lstm3 = nn.LSTM(input_size=256, hidden_size=512, num_layers=NUM_LAYERS, batch_first=True)
        self.dropout1 = nn.Dropout(0.1)
        self.lstm4 = nn.LSTM(input_size=512, hidden_size=256, num_layers=NUM_LAYERS, batch_first=True)
        self.lstm5 = nn.LSTM(input_size=256, hidden_size=128, num_layers=NUM_LAYERS, batch_first=True)
        self.lstm6 = nn.LSTM(input_size=128, hidden_size=64, num_layers=NUM_LAYERS, batch_first=True)
        self.dropout2 = nn.Dropout(0.1)
        self.lstm7 = nn.LSTM(input_size=64, hidden_size=32, num_layers=NUM_LAYERS, batch_first=True)
        self.fc = nn.Linear(32,2)

    def forward(self, x) :
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)
        x = self.dropout1(x)
        x, _ = self.lstm4(x)
        x, _ = self.lstm5(x)
        x, _ = self.lstm6(x)
        x = self.dropout2(x)
        x, _ = self.lstm7(x)
        x = self.fc(x[:,-1,:]) # x[배치 크기, 시퀀스 길이, 은닉 상태 크기], [:, -1, :] -> 마지막 시간 단계만 선택

        return x

In [4]:
BATCH_SIZE = 6
EPOCH = 300
NUM_LAYERS = 3      # LSTM model: num_layers
start_dot = 11      # mp.solutions.pose 시작 포인트 (0: 얼굴부터 발목까지, 11: 어깨부터 발목까지)
n_CONFIDENCE = 0.3    # MediaPipe Min Detectin confidence check
y_CONFIDENCE = 0.3    # Yolv5 Min Detectin confidence check

mp_pose = mp.solutions.pose
attention_dot = [n for n in range(start_dot, 29)]

# 라인 그리기
if start_dot == 11:
    """몸 부분만"""
    draw_line = [[11, 13], [13, 15], [15, 21], [15, 19], [15, 17], [17, 19], \
                [12, 14], [14, 16], [16, 22], [16, 20], [16, 18], [18, 20], \
                [23, 25], [25, 27], [24, 26], [26, 28], [11, 12], [11, 23], \
                [23, 24], [12, 24]]
    print('Pose : Only Body')

else:
    """얼굴 포함"""
    draw_line = [[11, 13], [13, 15], [15, 21], [15, 19], [15, 17], [17, 19], \
                [12, 14], [14, 16], [16, 22], [16, 20], [16, 18], [18, 20], \
                [23, 25], [25, 27], [24, 26], [26, 28], [11, 12], [11, 23], \
                [23, 24], [12, 24], [9, 10], [0, 5], [0, 2], [5, 8], [2, 7]]
    print('Pose : Face + Body')

Pose : Only Body


In [5]:
model = skeleton_LSTM()

In [6]:
model.load_state_dict(torch.load('LSTM1.pt'))

<All keys matched successfully>

In [7]:
# Yolov4 바운딩 box 안에서 media pipe 데이터 전처리 함수

def get_skeleton(video_path, attention_dot, draw_line):
    frame_length = 30 # LSTM 모델에 넣을 frame 수

    xy_list_list, xy_list_list_flip = [], []
    cv2.destroyAllWindows()
    pose = mp_pose.Pose(static_image_mode = True, model_complexity = 1, \
                        enable_segmentation = False, min_detection_confidence = n_CONFIDENCE)
    cap = cv2.VideoCapture(video_path)

    if cap.isOpened():

        while True:
            ret, img = cap.read()

            if ret == True:

                """ Yolo 바운딩 박스 및 좌표 추출"""
                img = cv2.resize(img, (640, 640))
                res = yolo_model(img)
                res_refine = res.pandas().xyxy[0].values
                nms_human = len(res_refine)
                if nms_human > 0:
                    for bbox in res_refine:
                        """바운딩 박스 상하좌우 크기 조절"""
                        xx1, yy1, xx2, yy2 = int(bbox[0])-10, int(bbox[1]), int(bbox[2])+10, int(bbox[3])
                        if xx1 < 0:
                            xx1 = 0
                        elif xx2 > 639:
                            xx2 = 639
                        if yy1 < 0:
                            yy1 = 0
                        elif yy2 > 639:
                            yy2 = 639

                        start_point = (xx1, yy1)
                        end_point = (xx2, yy2)

                        """ Yolov5 바운딩 박스 좌표 안에서 mediapipe Pose 추출"""
                        if bbox[4] > y_CONFIDENCE: # bbox[4] : confidence 데이터
                            # img = cv2.rectangle(img, start_point, end_point, (0, 0, 255), 2) # 바운딩 박스 그리기 : 데이터 추출 확인용
                            c_img = img[yy1:yy2, xx1:xx2] # 바운딩 박스 좌표
                            results = pose.process(cv2.cvtColor(c_img, cv2.COLOR_BGR2RGB)) # Yolov5 바운딩 박스 좌표 안에서 'mp_pose' 좌표

                            if not results.pose_landmarks: continue
                            idx = 0
                            draw_line_dic = {}
                            xy_list, xy_list_flip = [], []
                            # 33 반복문 진행 : 33개 중 18개의 dot
                            for x_and_y in results.pose_landmarks.landmark:
                                if idx in attention_dot:
                                    xy_list.append(x_and_y.x)
                                    xy_list.append(x_and_y.y)
                                    xy_list_flip.append(1 - x_and_y.x)
                                    xy_list_flip.append(x_and_y.y)

                                    x, y = int(x_and_y.x*(xx2-xx1)), int(x_and_y.y*(yy2-yy1))
                                    draw_line_dic[idx] = [x, y]
                                idx += 1

                            if len(xy_list) != len(attention_dot) * 2:
                                print('Error : attention_dot 데이터 오류')

                            xy_list_list.append(xy_list)
                            xy_list_list_flip.append(xy_list_flip)

                            """mediapipe line 그리기 부분 : 데이터 추출(dot) 확인용"""
                            # for line in draw_line:
                            #     x1, y1 = draw_line_dic[line[0]][0], draw_line_dic[line[0]][1]
                            #     x2, y2 = draw_line_dic[line[1]][0], draw_line_dic[line[1]][1]
                            #     c_img = cv2.line(c_img, (x1, y1), (x2, y2), (0, 255, 0), 4)
                            # # cv2.imshow('Landmark Image', img)
                            # cv2_imshow(img)
                            # cv2.waitKey(1)

            elif ret == False: break


        # 부족한 프레임 수 맞추기
        if len(xy_list_list_flip) < 15:
            return False, False
        elif len(xy_list_list_flip) < frame_length:
            f_ln = frame_length - len(xy_list_list_flip)
            for _ in range(f_ln):
                xy_list_list.append(xy_list_list[-1])
                xy_list_list_flip.append(xy_list_list_flip[-1])

    cap.release()
    cv2.destroyAllWindows()


    return xy_list_list, xy_list_list_flip

In [8]:
# 영상 resize 및 추출
#test_video_name = 'C_3_12_43_BU_SMC_10-14_12-17-14_CC_RGB_DF2_F2'
test_video_path = f'01_pocket_30.mp4'
cv2.destroyAllWindows()
cap = cv2.VideoCapture(test_video_path)
img_list = []

if cap.isOpened():

    while True:
        ret, img = cap.read()
        if ret:
            img = cv2.resize(img, (640, 640))
            img_list.append(img)
            # cv2_imshow(img)
            # cv2.waitKey(1)
        else:
            break

cap.release()
cv2.destroyAllWindows()

print('저장된 frame의 개수: {}'.format(len(img_list)))

저장된 frame의 개수: 150


In [9]:
class MyDataset(Dataset):
    def __init__(self, seq_list):
        self.X = []
        self.y = []
        for dic in seq_list :
            self.y.append(dic['key'])
            self.X.append(dic['value'])

    def __getitem__(self, index):
        data = self.X[index]
        label = self.y[index]
        return torch.Tensor(np.array(data)), torch.tensor(np.array(int(label)))

    def __len__(self):
        return len(self.X)

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [11]:
"""Yolov5 + Mediapipe Version"""

model.eval()

length = 30 # frame 상태를 표시할 길이
out_img_list = []
dataset = []
status = 'None'
pose = mp_pose.Pose(static_image_mode=True, model_complexity=1, enable_segmentation=False, min_detection_confidence=n_CONFIDENCE)
print('시퀀스 데이터 분석 중...')

xy_list_list = []
for img in tqdm(img_list):
    res = yolo_model(img)
    res_refine = res.pandas().xyxy[0].values

    nms_human = len(res_refine)
    if nms_human > 0:
        for bbox in res_refine:
            xx1, yy1, xx2, yy2 = int(bbox[0])-10, int(bbox[1]), int(bbox[2])+10, int(bbox[3])
            if xx1 < 0:
                xx1 = 0
            elif xx2 > 639:
                xx2 = 639
            if yy1 < 0:
                yy1 = 0
            elif yy2 > 639:
                yy2 = 639

            start_point = (xx1, yy1)
            end_point = (xx2, yy2)
            if bbox[4] > y_CONFIDENCE:
                img = cv2.rectangle(img, start_point, end_point, (0, 0, 255), 2)

                c_img = img[yy1:yy2, xx1:xx2]
                results = pose.process(cv2.cvtColor(c_img, cv2.COLOR_BGR2RGB)) # Yolo 바운딩 box 안에서 landmark dot 추출
                if not results.pose_landmarks: continue
                xy_list = []
                idx = 0
                draw_line_dic = {}
                for x_and_y in results.pose_landmarks.landmark:
                    if idx in attention_dot:
                        xy_list.append(x_and_y.x)
                        xy_list.append(x_and_y.y)
                        x, y = int(x_and_y.x*(xx2-xx1)), int(x_and_y.y*(yy2-yy1))
                        draw_line_dic[idx] = [x, y]
                    idx += 1

                xy_list_list.append(xy_list)
                for line in draw_line:
                    x1, y1 = draw_line_dic[line[0]][0], draw_line_dic[line[0]][1]
                    x2, y2 = draw_line_dic[line[1]][0], draw_line_dic[line[1]][1]
                    c_img = cv2.line(c_img, (x1, y1), (x2, y2), (0, 255, 0), 3)

                if len(xy_list_list) == length:
                    dataset = []
                    dataset.append({'key' : 0, 'value' : xy_list_list})
                    dataset = MyDataset(dataset)
                    dataset = DataLoader(dataset)
                    xy_list_list = []

                    for data, label in dataset:
                        data = data.to(device)
                        with torch.no_grad():
                            result = model(data)
                            _, out = torch.max(result, 1)
                            if out.item() == 0: status = 'Normal'
                            else: status = 'Theft'

    cv2.putText(img, status, (0, 50), cv2.FONT_HERSHEY_COMPLEX, 1.5, (0, 0, 255), 2)
    out_img_list.append(img)

I0000 00:00:1713832264.506778 1408120 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M2
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


시퀀스 데이터 분석 중...


100%|████████████████████████████████████████████████████████████████| 150/150 [00:17<00:00,  8.67it/s]


In [12]:
# 테스트 원본 영상 내보내기
filename = './output.mp4'
fourcc = cv2.VideoWriter_fourcc(*'DIVX')
fps = 3
frameSize = (640, 640)
isColor = True
out = cv2.VideoWriter(filename, fourcc, fps, frameSize, isColor)
for out_img in out_img_list:
    out.write(out_img)
out.release()

OpenCV: FFMPEG: tag 0x58564944/'DIVX' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
