In [1]:
import sys
old_stdout = sys.stdout
sys.stdout = open('log2.txt', 'w')

In [2]:
import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf
import pandas as pd
import os

from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.layers import LSTM

In [3]:
# GPU 장치 목록 가져오기
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
if gpu_devices:
    print('Using GPU')
    # GPU의 가상 메모리 제한을 6GB로 설정
    tf.config.experimental.set_virtual_device_configuration(
        gpu_devices[0],
        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024 * 6)]
    )
    # set_memory_growth는 set_virtual_device_configuration과 함께 사용할 수 없습니다
else:
    print('Using CPU')

In [4]:
train_dir = 'data/train'
test_dir = 'data/test'
# 비디오 파일 목록과 태그를 포함하는 리스트를 만드는 함수
def create_data_list(data_dir):
    data_list = []
    # data_dir 안의 각 디렉토리에 대해 반복
    for item in os.listdir(data_dir):
        item_path = os.path.join(data_dir, item)  # 아이템의 전체 경로
        # 해당 경로가 디렉토리인지 확인
        if os.path.isdir(item_path):
            # 디렉토리 내의 모든 파일을 나열
            for file_name in os.listdir(item_path):
                # 파일이 .mp4 파일인지 확인
                if file_name.endswith('.mp4'):
                    # 리스트에 태그와 파일 경로를 추가
                    data_list.append((item, str(data_dir+'/'+item)+'/'+file_name))
    return data_list

# 함수를 사용해서 리스트를 생성
train_list = create_data_list(train_dir)
test_list = create_data_list(test_dir)
# 리스트에서 데이터프레임을 생성
train_df = pd.DataFrame(data=train_list, columns=['tag', 'video_name'])
test_df = pd.DataFrame(data=test_list, columns=['tag', 'video_name'])
# 필요한 경우 열 순서를 수정
train_df = train_df.loc[:, ['tag', 'video_name']]
test_df = test_df.loc[:, ['tag', 'video_name']]
# 데이터프레임을 CSV 파일로 저장
train_file_path = 'train.csv'
test_file_path = 'test.csv'
train_df.to_csv(train_file_path, encoding='utf-8-sig', index=False)
test_df.to_csv(test_file_path, encoding='utf-8-sig', index=False)

In [5]:
train_df = pd.read_csv("train.csv")
test_df = pd.read_csv("test.csv")
print(f"Total video for training: {len(train_df)}")
print(f"Total video for testing: {len(test_df)}")

In [6]:
BATCH_SIZE = 16
EPOCHS = 30

MAX_SEQ_LENGTH = 10

In [7]:
# 라벨링
label_processor = keras.layers.StringLookup(num_oov_indices=0, vocabulary=np.unique(train_df["tag"]))
labels = train_df["tag"].values
labels = label_processor(labels[..., None]).numpy()

In [8]:
# 주어진 이미지에서 중앙에 맞춰 정사각형으로 잘라내는 함수
def crop_center_square(frame):
    # 이미지의 높이(y)와 너비(x)를 가져옴
    y, x = frame.shape[0:2]
    # 이미지의 높이와 너비 중 더 작은 값을 선택하여 정사각형의 크기를 결정
    min_dim = min(y, x)
    # 정사각형을 이미지 중앙에 위치시키기 위해 시작점의 x좌표와 y좌표를 계산
    start_x = (x // 2) - (min_dim // 2)
    start_y = (y // 2) - (min_dim // 2)
    # 계산된 시작점과 정사각형의 크기를 이용하여 이미지의 중앙 부분 잘라내기
    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

In [9]:
# 각도 계산
def calculate_angle(a, b, c):
    a = np.array([a.x, a.y])  # 첫 번째 점
    b = np.array([b.x, b.y])  # 중간 점
    c = np.array([c.x, c.y])  # 세 번째 점
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    if angle > 180.0:
        angle = 360 - angle
    return angle

In [10]:
# 손가락의 각도를 계산하는 함수
def calculate_finger_angles(hand_landmarks, mp_hands):
    angles = []
    for finger in [mp_hands.HandLandmark.THUMB_CMC, mp_hands.HandLandmark.INDEX_FINGER_MCP, mp_hands.HandLandmark.MIDDLE_FINGER_MCP,
                   mp_hands.HandLandmark.RING_FINGER_MCP, mp_hands.HandLandmark.PINKY_MCP]:
        if finger == mp_hands.HandLandmark.THUMB_CMC:
            # 엄지는 다른 손가락과 구조가 다름
            base = np.array([hand_landmarks.landmark[finger].x, hand_landmarks.landmark[finger].y])
            tip = np.array([hand_landmarks.landmark[finger + 2].x, hand_landmarks.landmark[finger + 2].y])
        else:
            base = np.array([hand_landmarks.landmark[finger].x, hand_landmarks.landmark[finger].y])
            tip = np.array([hand_landmarks.landmark[finger + 3].x, hand_landmarks.landmark[finger + 3].y])
        angle = np.arccos(np.dot(base, tip) / (np.linalg.norm(base) * np.linalg.norm(tip)))
        angles.append(np.degrees(angle))
    return angles

In [11]:
# 팔 각도 계산 함수
def calculate_arm_angle(pose_landmarks, mp_pose, side):
    if side == 'Right':
        shoulder = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER]
        elbow = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_ELBOW]
        wrist = pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_WRIST]
    else:
        shoulder = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER]
        elbow = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_ELBOW]
        wrist = pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_WRIST]

    return calculate_angle([shoulder.x, shoulder.y], [elbow.x, elbow.y], [wrist.x, wrist.y])

In [12]:
# 비디오 파일을 로드하고, 각 프레임을 처리하여 배열로 반환하는 함수
def load_video(path, max_frames=0):
    mp_hands = mp.solutions.hands
    mp_pose = mp.solutions.pose
    
    pose = mp_pose.Pose(static_image_mode=False, model_complexity=2, min_detection_confidence=0.3, smooth_landmarks=True)
    hands = mp_hands.Hands(static_image_mode=False, max_num_hands=2, model_complexity=2, min_detection_confidence=0.3)
    
    cap = cv2.VideoCapture(path)
    
    finger_angles = []  # 손가락 각도 데이터
    arm_angles = []  # 팔 각도 데이터

    try:
        while True:            
            ret, frame = cap.read() # 비디오에서 프레임을 하나씩 읽기            
            if not ret:
                break # 읽을 프레임이 없으면 반복문을 종료

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            hands_results = hands.process(frame_rgb)
            pose_results = pose.process(frame_rgb)
           
            if pose_results.pose_landmarks:
                # 팔 각도 계산
                right_arm_angle = calculate_arm_angle(pose_results.pose_landmarks, mp_pose, 'Right')
                left_arm_angle = calculate_arm_angle(pose_results.pose_landmarks, mp_pose, 'Left')
                arm_angles.append((right_arm_angle, left_arm_angle))

            if hands_results.multi_hand_landmarks:
                for hand_landmarks in hands_results.multi_hand_landmarks:
                    angles = calculate_finger_angles(hand_landmarks, mp_hands)
                    finger_angles.append(angles)

            if len(finger_angles) == max_frames:
                break
    finally:
        cap.release()
        pose.close()
        hands.close()

    return np.array(arm_angles), np.array(finger_angles)

In [13]:
def preprocess_arm_angle_data(arm_angles):
    # 팔 각도 데이터가 없는 경우 빈 벡터 반환
    if not arm_angles:
        return np.zeros(2)  # 오른팔, 왼팔 각각의 각도
    return np.array(arm_angles)

def preprocess_hand_data(hand_landmarks, mp_hands):
    # 손 랜드마크 데이터가 없는 경우 빈 벡터 반환
    if not hand_landmarks or len(hand_landmarks) < 2:
        return np.zeros(10)
    
    hand_angles = []

    for hand_lm in hand_landmarks[:2]:  # 첫 번째와 두 번째 손만 처리
        angles = calculate_finger_angles(hand_lm, mp_hands)
        hand_angles.extend(angles)

    # 부족한 부분을 0으로 채우기
    hand_angles = np.pad(hand_angles, (0, max(0, 10 - len(hand_angles))))

    return np.array(hand_angles)

In [14]:
def prepare_all_video(df):
    mp_hands = mp.solutions.hands
    num_samples = len(df)
    video_paths = df["video_name"].values.tolist()

    # 팔 각도 데이터와 손가락 각도 데이터 저장할 배열 초기화
    frame_arm_angles = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH, 4), dtype="float16")  # 왼팔, 오른팔 각도
    frame_hand_angles = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH, 10), dtype="float16")  # 손가락 각도
    
    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")

    for idx, path in enumerate(video_paths):
        # load_video 함수 수정 필요 (이미지와 스켈레톤 데이터 제외하고, 팔 각도와 손가락 각도만 반환)
        arm_angles, hand_landmarks = load_video(path)
        video_length = min(MAX_SEQ_LENGTH, len(arm_angles))

        for i in range(video_length):
            # 팔 각도 데이터 전처리
            arm_angle_feature = preprocess_arm_angle_data(arm_angles[i])
            frame_arm_angles[idx, i, :] = arm_angle_feature

            # 손가락 각도 데이터 전처리
            hand_angle_feature = preprocess_hand_data(hand_landmarks[i], mp_hands)
            frame_hand_angles[idx, i, :] = hand_angle_feature

            frame_masks[idx, i] = 1
            
    # 반환 값에 팔 각도 및 손가락 각도 데이터 포함
    return (frame_arm_angles, frame_hand_angles, frame_masks), labels

In [15]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # INFO 메시지를 숨긴다.
tf.get_logger().setLevel('ERROR')         # TensorFlow 로그를 ERROR 레벨로 설정한다.

In [None]:
train_data, train_labels = prepare_all_video(train_df)
test_data, test_labels = prepare_all_video(test_df)
train_labels = np.squeeze(train_labels)
test_labels = np.squeeze(test_labels)

sys.stdout = old_stdout

In [None]:
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()
    
    # 팔 각도 및 손가락 각도 데이터에 대한 입력
    arm_angle_input = keras.Input((MAX_SEQ_LENGTH, 4))  # 왼팔, 오른팔 각도 (각 2개의 값)
    hand_input = keras.Input((MAX_SEQ_LENGTH, 10))  # 손가락 각도
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")
    
    # LSTM 레이어 처리
    x_arm_angle = LSTM(64, return_sequences=True)(arm_angle_input, mask=mask_input)
    x_arm_angle = LSTM(32, return_sequences=True)(x_arm_angle)
    x_arm_angle = GlobalAveragePooling1D()(x_arm_angle)

    x_hand = LSTM(64, return_sequences=True)(hand_input, mask=mask_input)
    x_hand = LSTM(32, return_sequences=True)(x_hand)
    x_hand = GlobalAveragePooling1D()(x_hand)
    
    # 데이터 결합
    combined = concatenate([x_arm_angle, x_hand])

    # 추가 처리
    z = Dense(64, activation="relu", kernel_regularizer=regularizers.l2(0.01))(combined)
    z = Dense(32, activation="relu", kernel_regularizer=regularizers.l2(0.01))(z)
    output = Dense(len(class_vocab), activation="softmax", kernel_regularizer=regularizers.l2(0.01))(z)

    # 모델 생성 및 컴파일
    lstm_model = keras.Model([arm_angle_input, hand_input, mask_input], output)
    lstm_model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

    return lstm_model

In [None]:
EPOCHS = 200
BATCH_SIZE= 128
def run_experiment():
    filepath = "tmp/video_classifier_lstm3.h5"
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1)

    seq_model = get_sequence_model()
    history = seq_model.fit(
        [train_data[0], train_data[1], train_data[2]],  # 팔 각도, 손가락 각도, 마스크 데이터
        train_labels,
        batch_size=BATCH_SIZE,
        validation_split=0.2,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )

    seq_model.load_weights(filepath)
    
    _, accuracy = seq_model.evaluate(
        [test_data[0], test_data[1], test_data[2]],  # 팔 각도, 손가락 각도, 마스크 데이터
        test_labels
    )
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    
    # 손실 및 정확도 그래프 출력
    plt.figure(figsize=(10, 4))
    # 손실 그래프
    ax1 = plt.subplot(1, 1, 1)
    ax1.plot(history.history['loss'], label='Train Loss', color='blue')
    ax1.plot(history.history['val_loss'], label='Validation Loss', color='blue', linestyle='dashed')
    ax1.set_title('Training and Validation Loss and Accuracy')
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss', color='blue')
    ax1.tick_params(axis='y', labelcolor='blue')
    ax1.legend(loc='upper left')
    # 정확도 그래프를 같은 그래프에 추가
    ax2 = ax1.twinx()
    ax2.plot(history.history['accuracy'], label='Train Accuracy', color='green')
    ax2.plot(history.history['val_accuracy'], label='Validation Accuracy', color='green', linestyle='dashed')
    ax2.set_ylabel('Accuracy', color='green')
    ax2.tick_params(axis='y', labelcolor='green')
    ax2.legend(loc='upper right')
    plt.show()
    
    seq_model.save('test_lstm_model3.h5')
    
    return history, seq_model

_, sequence_model = run_experiment()

In [None]:
def prepare_single_video(hands, arm_angles):
    frame_mask = np.zeros((1, MAX_SEQ_LENGTH), dtype="bool")
    frame_hand_angles = np.zeros((1, MAX_SEQ_LENGTH, HAND_FEATURES), dtype="float16")
    frame_arm_angles = np.zeros((1, MAX_SEQ_LENGTH, 4), dtype="float16")  # 4는 왼팔, 오른팔 각도
    
    for j in range(MAX_SEQ_LENGTH):
        # 손가락 각도 데이터 전처리
        hand_feature = preprocess_hand_data(hands[j]) if j < len(hands) else np.zeros(HAND_FEATURES)
        frame_hand_angles[0, j] = hand_feature

        # 팔 각도 데이터 전처리
        arm_angle_feature = np.array(arm_angles[j]) if j < len(arm_angles) else np.zeros(4)
        frame_arm_angles[0, j] = arm_angle_feature

        frame_mask[0, j] = 1 if j < len(hands) or j < len(arm_angles) else 0

    return frame_hand_angles, frame_arm_angles, frame_mask

In [None]:
def sequence_prediction(path):
    class_vocab = label_processor.get_vocabulary()
    hands, arm_angles = load_video(path)
    
    frame_hand_angles, frame_arm_angles, frame_mask = prepare_single_video(hands, arm_angles)
    probabilities = sequence_model.predict([frame_hand_angles, frame_arm_angles, frame_mask])[0]
    
    for i in np.argsort(probabilities)[::-1]:
        print(f"{class_vocab[i]} : {probabilities[i] * 100:5.2f}%")
        
    return probabilities  # 클래스별 예측 확률 반환

test_video = np.random.choice(test_df["video_name"].values.tolist())
print(f"Test video path: {test_video}")

test_frames = sequence_prediction(test_video)