### 수어 인식 모델 생성

In [None]:
import mediapipe as mp
import cv2
import os
import json
import numpy as np
from matplotlib import pyplot as plt
import time
import tensorflow
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Bidirectional, Dropout, Conv2D, MaxPooling2D, Flatten, Reshape, Conv1D, MaxPooling1D, BatchNormalization
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
from keras.layers import TimeDistributed
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
mp_holistic = mp.solutions.holistic # Holistic model (module that can detect face, pose and left/right hand landmarks)
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [None]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a horizontally mirrored OpenCV frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)``; the callers in this
            notebook pass a Holistic instance.

    Returns:
        Tuple ``(image, results)``: the mirrored frame converted back to BGR
        and whatever ``model.process`` returned.
    """
    mirrored = cv2.flip(image, 1)  # left/right flip
    # OpenCV delivers BGR, MediaPipe expects RGB.
    rgb = cv2.cvtColor(mirrored, cv2.COLOR_BGR2RGB)
    rgb.flags.writeable = False  # read-only while the model processes it
    results = model.process(rgb)
    rgb.flags.writeable = True
    # Convert back to BGR so the frame can be drawn on / displayed with OpenCV.
    return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR), results

In [None]:
def draw_landmarks(image, results):
    """Draw pose and both hand landmark sets onto ``image`` with default styling.

    Args:
        image: Frame passed to ``mp_drawing.draw_landmarks``.
        results: Detection result exposing ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # Face mesh drawing is disabled:
    # mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION)
    targets = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmarks, connections in targets:
        mp_drawing.draw_landmarks(image, landmarks, connections)

In [None]:
# Custom landmark drawing: each connection set gets its own pair of DrawingSpecs.
def draw_styled_landmarks(image, results):
    """Draw pose and hand landmarks onto ``image`` with per-group colours.

    Args:
        image: Frame passed to ``mp_drawing.draw_landmarks``.
        results: Detection result exposing ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # Face mesh drawing (results.face_landmarks / FACEMESH_TESSELATION) is
    # intentionally left out.
    # Each row: landmarks, connections, colour of the first DrawingSpec
    # (thickness 2, radius 4), colour of the second (thickness 2, radius 2).
    styles = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         (80, 22, 10), (80, 44, 121)),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         (121, 22, 76), (121, 44, 250)),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         (245, 117, 66), (245, 66, 230)),
    )
    for landmarks, connections, first_color, second_color in styles:
        mp_drawing.draw_landmarks(
            image,
            landmarks,
            connections,
            mp_drawing.DrawingSpec(color=first_color, thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=second_color, thickness=2, circle_radius=2),
        )

In [None]:
def extract_keypoints(results):
    """Flatten pose and hand landmarks into a single 1-D feature vector.

    Layout of the returned array (258 values when every group has its usual
    landmark count): 132 pose values (x, y, z, visibility per landmark),
    then 21*3 left-hand values, then 21*3 right-hand values (x, y, z each).
    A group that is missing (falsy) on ``results`` is replaced by zeros of
    the corresponding length. Face landmarks are not included.

    Args:
        results: Object exposing ``pose_landmarks``, ``left_hand_landmarks``
            and ``right_hand_landmarks``; each is either falsy or has a
            ``landmark`` iterable.

    Returns:
        ``numpy.ndarray`` with pose, left-hand and right-hand values
        concatenated in that order.
    """
    def _flat(group, fields, size):
        # Zero vector when the landmark group was not detected.
        if not group:
            return np.zeros(size)
        rows = [[getattr(point, name) for name in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    pose = _flat(results.pose_landmarks, xyz + ('visibility',), 132)
    lh = _flat(results.left_hand_landmarks, xyz, 21 * 3)
    rh = _flat(results.right_hand_landmarks, xyz, 21 * 3)
    return np.concatenate([pose, lh, rh])

In [None]:
DATA_PATH = os.path.join('7WORD_30-150F_90_Data') # extracted keypoints are saved under this folder
actions = np.array(['americano','cafelatte','cafemocha']) #,hot','syrup','next'])
# actions = np.array(['americano', 'cafelatte', 'cafemocha']) # americano, cafe mocha, cafe latte
no_sequences = 90  #  number of samples (videos) per action
sequence_length = 150 # length of each sample in frames

In [None]:
# Create one output folder per (action, sequence) pair under DATA_PATH.
for action in actions:
    for sequence in range(no_sequences):
        # exist_ok=True makes re-runs harmless while still surfacing real
        # failures (permissions, bad path) that a bare `except: pass` would hide.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

In [None]:
# Build the list of source video paths for each word
def _video_paths(word_id):
    """Return the paths for one word: recordings 01-18, each in 5 orientations.

    The result is ordered by recording number first and then by orientation
    (D, F, L, R, U), giving 90 paths.
    """
    return [
        f'signdata1/zip/NIA_SL_WORD{word_id}_REAL{i:02d}_{orientation}.mp4'
        for i in range(1, 19)
        for orientation in ['D', 'F', 'L', 'R', 'U']
    ]


americano_video_path_list = _video_paths(1501)
cafelatte_video_path_list = _video_paths(1502)
cafemocha_video_path_list = _video_paths(1503)
ice_video_path_list = _video_paths(1504)
hot_video_path_list = _video_paths(1505)
syrup_video_path_list = _video_paths(1506)
next_video_path_list = _video_paths(1507)

In [None]:
# Data collection: run MediaPipe Holistic over every source video and save the
# per-frame keypoints as DATA_PATH/<action>/<sequence>/<frame_num>.npy

# Explicit lookup table instead of eval() on a constructed variable name.
video_paths_by_action = {
    'americano': americano_video_path_list,
    'cafelatte': cafelatte_video_path_list,
    'cafemocha': cafemocha_video_path_list,
    'ice': ice_video_path_list,
    'hot': hot_video_path_list,
    'syrup': syrup_video_path_list,
    'next': next_video_path_list,
}
start_frame = 30  # frames before this index are read but neither processed nor saved


def _collect_keypoints(video_path, action, sequence, start_frame):
    """Extract and save keypoints for frames start_frame..sequence_length-1 of one video.

    Every frame index in that range gets a .npy file: indices past the end of
    the video use a black frame, and a failed read reuses the last frame that
    was read successfully (or a black frame if there is none), so the loader
    never meets a missing file.

    Raises:
        IOError: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise IOError('Could not open video file: {}'.format(video_path))

        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        blank_frame = np.zeros((frame_height, frame_width, 3), dtype=np.uint8)
        last_frame = None
        caption = 'Collecting frames for {} Video Number {}'.format(action, sequence)

        # One model instance per video: re-creating it for every frame is slow
        # and throws away the tracking state between consecutive frames.
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            for frame_num in range(sequence_length):
                if frame_num >= total_frames:
                    frame = blank_frame  # pad short videos with black frames
                else:
                    ret, frame = cap.read()
                    if ret:
                        last_frame = frame
                    elif last_frame is not None:
                        frame = last_frame.copy()
                    else:
                        frame = blank_frame

                # Still consume the leading frames so the video position advances.
                if frame_num < start_frame:
                    continue

                image, results = mediapipe_detection(frame, holistic)
                draw_styled_landmarks(image, results)

                is_first = frame_num == start_frame
                if is_first:
                    cv2.putText(image, 'STARTING COLLECTION', (120, 200),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                cv2.putText(image, caption, (15, 12),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                cv2.imshow('OpenCV Feed', image)
                if is_first:
                    cv2.waitKey(2000)  # hold the banner on screen for 2 s

                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)

                # 'q' stops the current video only; the outer loops move on.
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
    finally:
        # Release every capture, not just the last one opened.
        cap.release()


try:
    for action in actions:
        for sequence in range(no_sequences):
            video_path = video_paths_by_action[str(action)][sequence]
            _collect_keypoints(video_path, action, sequence, start_frame)
finally:
    cv2.destroyAllWindows()

In [None]:
# Map each action label to its integer class index
label_map = dict(zip(actions, range(len(actions))))
label_map

In [None]:
# Load the saved keypoint arrays (frames 30..sequence_length-1 of every sample)
sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    for sequence in range(no_sequences):
        sample_dir = os.path.join(action_dir, str(sequence))
        window = [
            np.load(os.path.join(sample_dir, "{}.npy".format(frame_num)))
            for frame_num in range(30, sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[action])

In [None]:
# Data augmentation: shift / scale up / scale down (optional)
def random_translation(data, value=0.06):
    """Return ``data`` plus uniform noise drawn from [-value, value).

    An independent offset is drawn for every element of ``data`` (the noise
    array has ``data.shape``); the input is not modified.
    """
    offsets = np.random.uniform(low=-value, high=value, size=data.shape)
    return data + offsets
def random_scaling(data, scale_range=(0.9, 1.1)):
    """Return ``data`` multiplied by one random factor drawn from ``scale_range``.

    A single scalar factor is sampled uniformly between ``scale_range[0]`` and
    ``scale_range[1]`` and applied to the whole array.
    """
    lower, upper = scale_range[0], scale_range[1]
    return data * np.random.uniform(lower, upper)
# Split BEFORE augmenting. Augmenting first and splitting afterwards puts the
# jittered/scaled variants of one recording on both sides of the split, so the
# test score is measured on near-duplicates of training samples.
X_train, X_test, y_train, y_test = train_test_split(
    np.array(sequences), to_categorical(labels).astype(int), test_size=0.05
)

# Augment the training portion only: original + translated + scaled copy of
# every training sequence. X_test / y_test stay untouched originals.
augmented_sequences = []
augmented_labels = []
for sequence, label in zip(X_train, y_train):
    variants = (
        sequence,
        random_translation(sequence),  # random translation
        random_scaling(sequence),      # random scaling
    )
    for variant in variants:
        augmented_sequences.append(variant)
        augmented_labels.append(label)

augmented_sequences = np.array(augmented_sequences)
augmented_labels = np.array(augmented_labels)
X_train, y_train = augmented_sequences, augmented_labels

In [None]:
# Build the train/test split without data augmentation
X = np.array(sequences)                   # stacked keypoint sequences
y = to_categorical(labels).astype(int)    # one-hot encoded class labels
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [None]:
# TensorBoard callback writing its logs to the 'Logs' directory
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# Conv1D -> LSTM -> Dense classifier
# Input is (120, 258): the loader keeps frames 30..149 of each sample, and
# extract_keypoints yields 132 + 63 + 63 values per frame.
model = Sequential([
    # Conv1D blocks
    Conv1D(64, 3, activation='relu', padding='same', input_shape=(120, 258)),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(128, 3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(256, 3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(2),

    # NOTE(review): this BatchNormalization/MaxPooling1D pair has no Conv1D in
    # front of it -- looks like the remainder of a removed conv block; confirm
    # it is intentional before changing it (saved weights depend on it).
    BatchNormalization(),
    MaxPooling1D(2),

    # LSTM layers
    # Disabled experiments; the trailing numbers are the author's notes
    # (presumably accuracy -- TODO confirm):
    # LSTM(128, return_sequences=True, activation='tanh'),  # "added 2": 0.75
    # LSTM(64, return_sequences=True, activation='tanh'),   # "reduced 3": 0.97
    LSTM(32, return_sequences=True, activation='tanh'),
    LSTM(16, return_sequences=False, activation='tanh'),    # "added": 1.0/0,82
    Dropout(0.5),

    # Dense layers
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),

    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),

    # Output layer: one softmax unit per action
    Dense(actions.shape[0], activation='softmax'),
])

In [None]:
# Check the training data for NaN values
has_nan = np.isnan(X_train).any()

message = "리스트에 NaN 값이 있습니다." if has_nan else "리스트에 NaN 값이 없습니다."
print(message)

In [None]:
# Compile with Adam (lr 0.001) and train for 100 epochs, logging to TensorBoard
custom_optimizer = Adam(learning_rate=0.001)
model.compile(
    optimizer=custom_optimizer,
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)
model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=32,
    callbacks=[tb_callback],
)

In [None]:
# Run the model on the test samples (one softmax output row per sample)
res = model.predict(X_test)

In [None]:
# Predicted action for the first test sample
actions[np.argmax(res[0])]

In [None]:
# Ground-truth action for the first test sample (y_test is one-hot encoded)
actions[np.argmax(y_test[0])]

In [None]:
# Save the trained model to action.h5
model.save('action.h5')

In [None]:
# Load the weights stored in action.h5 into the current model
model.load_weights('action.h5')

In [None]:
# Temporary validation on the test split
ytrue = np.argmax(y_test, axis=1).tolist()                 # one-hot -> class index
yhat = np.argmax(model.predict(X_test), axis=1).tolist()   # most likely class per sample
multilabel_confusion_matrix(ytrue, yhat)

In [None]:
# Fraction of test samples whose predicted class matches the true class
accuracy_score(ytrue, yhat)

In [None]:
# Real-time detection overlay settings
colors = [(245,117,16), (117,245,16), (16,117,245)] #,(255,20,147)] #(255,255,0),(255,0,255),(0,255,255)]
def prob_viz(res, actions, input_frame, colors):
    """Draw one horizontal probability bar per class on a copy of the frame.

    Args:
        res: Iterable of per-class probabilities; bar ``num`` is
            ``int(prob * 100)`` pixels wide.
        actions: Class labels, indexed in parallel with ``res``.
        input_frame: Frame to annotate; it is copied, not modified.
        colors: Bar colours. The palette is cycled when there are more
            classes than colours, so adding an action no longer raises
            IndexError.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        top = 60 + num * 40  # each bar occupies a 40-pixel row
        bar_color = colors[num % len(colors)]
        cv2.rectangle(output_frame, (0, top), (int(prob*100), top + 30), bar_color, -1)
        cv2.putText(output_frame, actions[num], (0, top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return output_frame

In [None]:
import cv2
import mediapipe as mp
import time
import numpy as np

# Initialise the state used by the real-time loop
sequence = []     # keypoints of the frames captured so far
sentence = []     # recognised signs, in order
res = np.zeros(len(actions))  # latest per-class prediction
threshold = 0.8   # minimum probability for a prediction to be accepted
start_capture = False  # True while frames are being recorded for recognition
idle_color = (0, 255, 0)       # green = waiting
recording_color = (0, 0, 255)  # red = recording
circle_color = idle_color
countdown = 7 # countdown before each capture (seconds)
window_length = 120  # frames per prediction; matches the model's input_shape[0]
last_capture_time = time.time()  # when the current countdown started

# Webcam setup
cap = cv2.VideoCapture(0)
cv2.namedWindow('OpenCV Feed', cv2.WINDOW_NORMAL)
cv2.resizeWindow('OpenCV Feed', 1920, 1080)

try:
    # MediaPipe model setup
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            current_time = time.time()

            # Read a frame; stop instead of passing None on when the read fails
            ret, frame = cap.read()
            if not ret:
                break

            # Landmark detection
            image, results = mediapipe_detection(frame, holistic)

            # Show the countdown, then switch to capture mode once it has elapsed
            if not start_capture:
                elapsed = current_time - last_capture_time
                if elapsed < countdown:
                    time_left = countdown - int(elapsed)
                    cv2.putText(image, str(time_left), (320, 240),
                                cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 255), 3, cv2.LINE_AA)
                else:
                    start_capture = True

            # Draw landmarks
            draw_styled_landmarks(image, results)

            if start_capture:
                sequence.append(extract_keypoints(results))
                circle_color = recording_color

                # (disabled) exit the program when the "next" sign is recognised
                #if "next" in sentence:
                    #break

                # Predict once a full window of frames has been collected
                if len(sequence) == window_length:
                    res = model.predict(np.expand_dims(sequence, axis=0))[0]
                    best = np.argmax(res)
                    word = actions[best]
                    print(word)  # print the recognised sign

                    # Accept confident predictions, skipping immediate repeats
                    if res[best] > threshold and (not sentence or word != sentence[-1]):
                        sentence.append(word)

                    # Reset for the next capture; the indicator goes back to
                    # green (it previously stayed red for the rest of the run)
                    start_capture = False
                    circle_color = idle_color
                    last_capture_time = time.time()
                    sequence = []

            # Overlay: probability bars, recording indicator, recognised sentence
            image = prob_viz(res, actions, image, colors)
            cv2.circle(image, (610, 450), 20, circle_color, -1)
            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ' '.join(sentence), (3,30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            cv2.imshow('OpenCV Feed', image)

            # Quit on 'q'
            key = cv2.waitKey(10)
            if key & 0xFF == ord('q'):
                break
finally:
    # Release the webcam and close the window even if the loop raised
    cap.release()
    cv2.destroyAllWindows()