In [21]:
import os
import gc
import sys
import time
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "../src")))

In [2]:
import cv2
import argparse
import traceback
import numpy as np
import pandas as pd
from pose_estimator import PoseEstimator3D
from kalman_filter import KalmanFilterTracker3D
from skeleton_visualizer import SkeletonVisualizer
from kalman_filter import KeypointPreprosess
from com_calculator import COMCalculator

2025-03-14 08:17:24.662524: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1741940244.683026    1937 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1741940244.690589    1937 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-14 08:17:24.725403: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
import glob
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import Adam
import matplotlib.pyplot as plt
import torch.nn.functional as F
from IPython.display import clear_output
from torch.utils.data import random_split
from torch.utils.data import Dataset, DataLoader
from torch.cuda import memory_allocated, empty_cache

In [4]:
if torch.cuda.is_available() == True:
    device = 'cuda:0'
    print('GPU available')
else:
    device = 'cpu'
    print('GPU not available')

GPU available


In [5]:
class Deep_LSTM(nn.Module):
    def __init__(self):
        super(Deep_LSTM, self).__init__()
        self.lstm1 = nn.LSTM(input_size=69, hidden_size=128, num_layers=1, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=128, hidden_size=256, num_layers=1, batch_first=True)
        self.lstm3 = nn.LSTM(input_size=256, hidden_size=512, num_layers=1, batch_first=True)
        self.dropout1 = nn.Dropout(0.3)
        self.lstm4 = nn.LSTM(input_size=512, hidden_size=256, num_layers=1, batch_first=True)
        self.lstm5 = nn.LSTM(input_size=256, hidden_size=128, num_layers=1, batch_first=True)
        self.lstm6 = nn.LSTM(input_size=128, hidden_size=64, num_layers=1, batch_first=True)
        self.dropout2 = nn.Dropout(0.3)
        self.lstm7 = nn.LSTM(input_size=64, hidden_size=32, num_layers=1, batch_first=True)
        self.fc = nn.Linear(32,4)

    def forward(self, x) :
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)
        x = self.dropout1(x)
        x, _ = self.lstm4(x)
        x, _ = self.lstm5(x)
        x, _ = self.lstm6(x)
        x = self.dropout2(x)
        x, _ = self.lstm7(x)
        x = self.fc(x[:,-1,:])
        return x

In [6]:
net = Deep_LSTM()
net.to(device)
net.load_state_dict(torch.load('model.pth'))
net.eval()

Deep_LSTM(
  (lstm1): LSTM(69, 128, batch_first=True)
  (lstm2): LSTM(128, 256, batch_first=True)
  (lstm3): LSTM(256, 512, batch_first=True)
  (dropout1): Dropout(p=0.3, inplace=False)
  (lstm4): LSTM(512, 256, batch_first=True)
  (lstm5): LSTM(256, 128, batch_first=True)
  (lstm6): LSTM(128, 64, batch_first=True)
  (dropout2): Dropout(p=0.3, inplace=False)
  (lstm7): LSTM(64, 32, batch_first=True)
  (fc): Linear(in_features=32, out_features=4, bias=True)
)

In [10]:
class InferenceVideo:
    def __init__(self, roi_ratio=0.8, video_path=None):
        self.pose_estimator = PoseEstimator3D(roi_ratio=roi_ratio)
        self.kalman_tracker = KalmanFilterTracker3D()
        self.skeleton_visualizer = SkeletonVisualizer()
        self.com_calculator = COMCalculator()
        self.keypoint_preprosessing = KeypointPreprosess(True)
        
        self.video_path = video_path
        self.com_trajectory = []
        self.max_trajectory_length = 60
        
    def process_frame(self, frame):
        """
        프레임 처리 및 3D 포즈 추적
        Args:
            frame (numpy.ndarray): 입력 프레임
        Returns:
            numpy.ndarray: 추적 결과가 표시된 프레임
        """
        try:
            landmarks, processed_frame = self.pose_estimator.estimate_pose(frame)
            
            if landmarks:
                # 3D 키포인트 추출
                keypoints_3d = self.pose_estimator.extract_3d_keypoints(landmarks, frame)
                
                # 칼만 필터로 키포인트 필터링
                filtered_keypoints_3d = self.kalman_tracker.track(keypoints_3d)
                
                # 3D CoM 계산
                com_3d = self.com_calculator.calculate_whole_body_com(filtered_keypoints_3d, include_z=True)
                
                if com_3d:
                    # CoM 궤적 업데이트
                    self.com_trajectory.append(com_3d)
                    if len(self.com_trajectory) > self.max_trajectory_length:
                        self.com_trajectory.pop(0)
                        
                    # 현재 CoM 표시
                    cv2.circle(processed_frame, (int(com_3d[0]), int(com_3d[1])), 8, (0, 255, 255), -1)
                    cv2.putText(processed_frame, f"CoM z:{int(com_3d[2])}",
                                (int(com_3d[0]) + 10, int(com_3d[1]) - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1)
                    
                    # CoM 궤적 그리기
                    for i in range(1, len(self.com_trajectory)):
                        prev_com = self.com_trajectory[i-1]
                        curr_com = self.com_trajectory[i]
                        
                        # 색상 그라데이션 (오래된 포인트일수록 더 투명하게)
                        alpha = i / len(self.com_trajectory)
                        
                        # Z 값에 따른 색상 변화 (깊이 시각화)
                        z_val = curr_com[2]
                        z_color = (
                            0,
                            int(255 * alpha),
                            int(255 * (1.0 - abs(z_val) / 100) * alpha)  # Z값에 따른 색상 변화
                        )
                        
                        cv2.line(processed_frame,
                                 (int(prev_com[0]), int(prev_com[1])),
                                 (int(curr_com[0]), int(curr_com[1])),
                                 z_color, 2)
                        
                    # 3D 이동 방향 계산 및 시각화
                    if len(self.com_trajectory) >= 5:
                        direction, speed = self.com_calculator.calculate_movement_direction(
                            self.com_trajectory, window_size=5)
                        
                        # 방향 시각화
                        processed_frame = self.skeleton_visualizer.draw_direction_arrow(
                            processed_frame, com_3d, direction, speed)
                        
                # 3D 스켈레톤 시각화
                processed_frame = self.skeleton_visualizer.draw_3d_skeleton(
                    processed_frame, keypoints_3d, filtered_keypoints_3d)
                
                keypoints_and_com = filtered_keypoints_3d[11:]
                keypoints_and_com.append(com_3d)
                
                keypoints_and_com = self.keypoint_preprosessing.prosess(keypoints_and_com)
                sequence_data = [x for y in keypoints_and_com for x in y]
                
            return sequence_data, processed_frame
            
        except Exception as e:
            print(f"Error processing frame: {e}")
            traceback.print_exc()
            return frame
            
    def run(self):
        """
        비디오 캡처 및 처리
        
        MP4 파일을 프레임 수에 따라 라벨링 후 병합
        """
        # 비디오 파일을 사용하여 캡처
        if not self.video_path:
            print("Error: No video source provided.")
            return
            
        cap = cv2.VideoCapture(self.video_path)
        sequence_tensor, current_frame = [[] , 20]
        
        if not cap.isOpened():
            print("Error: Could not open video source.")
            return
            
        width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps    = int(cap.get(cv2.CAP_PROP_FPS))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        
        out_video_name = self.video_path.rsplit('.')[0] + "_out.mp4"
        out = cv2.VideoWriter(out_video_name, fourcc, fps, (width, height))
        
        while True:
            ret, frame = cap.read()
            
            if not ret:
                print("End of video or failed to capture frame")
                break

            if len(sequence_tensor) < current_frame:
                sequence_vector, visual_frame = self.process_frame(frame)
                sequence_tensor.append(sequence_vector)
            else :
                sequence_tensor = sequence_tensor[1:]
                sequence_vector, visual_frame = self.process_frame(frame)
                sequence_tensor.append(sequence_vector)
            
            if len(sequence_tensor) == current_frame:
                data = torch.tensor(sequence_tensor, dtype=torch.float32).unsqueeze(0)
                data = data.to(device)
                with torch.no_grad():
                    result = net(data)
                    _, outs = torch.max(result, 1)
                    if outs.item() == 0 : status = 'Stand'
                    elif outs.item() == 1 : status = 'Start Walking'
                    elif outs.item() == 2 : status = 'During Walking'
                    else : status = 'Finish Walking'
            else :
                status = 'None'
            
            cv2.putText(visual_frame, status, (0, 50), cv2.FONT_HERSHEY_COMPLEX, 1.5, (0, 0, 255), 2)

            if isinstance(visual_frame, torch.Tensor):
                visual_frame = visual_frame.detach().cpu().numpy()
                if visual_frame.ndim == 3:  # (C, H, W)
                    visual_frame = np.transpose(visual_frame, (1, 2, 0))
                visual_frame = (visual_frame * 255).astype(np.uint8)
            
            out.write(visual_frame)
            
        cap.release()
        out.release()
        cv2.destroyAllWindows()

In [24]:
inference = InferenceVideo(roi_ratio=1, video_path='testtesttest.mp4')

W0000 00:00:1741942428.990175    2470 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1741942429.076170    2470 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [25]:
inference.run()

End of video or failed to capture frame
