In [2]:
import os
import numpy as np
import time
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

## Load Data

In [3]:
# 1. Load the recorded keypoint sequences that feed the classifier.
sequences = np.load('data-merge-0828.npy')
# Keep feature columns 132..257 of every frame (126 values per frame).
# Presumably these are the two hands' landmarks that follow the 33*4 pose
# values in the recorded layout — TODO confirm against the recording script.
headsSequences = sequences[:, :, 132:258]
headsSequences.shape

(4485, 30, 126)

In [4]:
# 2. Vocabulary of sign labels the classifier can output.
actions = np.array([
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    'apply', 'check', 'deposit', 'finish', 'get', 'give_you', 'good',
    'i', 'id_card', 'is', 'job', 'money', 'same', 'saving_book', 'sheet',
    'sign', 'stamp', 'taiwan', 'take', 'ten_thousand',
    'thank_you', 'this', 'thousands', 'transfer', 'transfer_in', 'transfer_out',
    'want', 'yes', 'you',
])

# Each label's position in `actions` doubles as its integer class id.
label_map = dict(zip(actions, range(len(actions))))
print(label_map)
print(actions.shape[0])

{'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, 'apply': 10, 'check': 11, 'deposit': 12, 'finish': 13, 'get': 14, 'give_you': 15, 'good': 16, 'i': 17, 'id_card': 18, 'is': 19, 'job': 20, 'money': 21, 'same': 22, 'saving_book': 23, 'sheet': 24, 'sign': 25, 'stamp': 26, 'taiwan': 27, 'take': 28, 'ten_thousand': 29, 'thank_you': 30, 'this': 31, 'thousands': 32, 'transfer': 33, 'transfer_in': 34, 'transfer_out': 35, 'want': 36, 'yes': 37, 'you': 38}
39


In [5]:
# Number of samples recorded for every action; change it if the dataset changes.
# NOTE(review): this assumes the samples are stored grouped by action, in the
# same order as `actions` — confirm against how the .npy file was merged.
samples_per_action = 115
labels = [label_map[action] for action in actions for _ in range(samples_per_action)]
# Print one [name, class id] pair per action by taking the first label of each run.
print([[actions[l], l] for l in labels[::samples_per_action]])

[['0', 0], ['1', 1], ['2', 2], ['3', 3], ['4', 4], ['5', 5], ['6', 6], ['7', 7], ['8', 8], ['9', 9], ['apply', 10], ['check', 11], ['deposit', 12], ['finish', 13], ['get', 14], ['give_you', 15], ['good', 16], ['i', 17], ['id_card', 18], ['is', 19], ['job', 20], ['money', 21], ['same', 22], ['saving_book', 23], ['sheet', 24], ['sign', 25], ['stamp', 26], ['taiwan', 27], ['take', 28], ['ten_thousand', 29], ['thank_you', 30], ['this', 31], ['thousands', 32], ['transfer', 33], ['transfer_in', 34], ['transfer_out', 35], ['want', 36], ['yes', 37], ['you', 38]]


In [6]:
# Build the model inputs and one-hot targets, then hold out 10% for testing.
# reshape(-1, 30, 126) gives an array of shape (samples, 30, 126): 126 feature
# values per step, 30 steps per sample.
X = headsSequences.reshape(-1, 30, 126)
y = to_categorical(labels).astype(int)
# A fixed seed makes the split identical on every Restart & Run All, so the
# reported test accuracy is reproducible. Stratifying on the integer labels
# keeps every class represented in the same proportion in both parts.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, random_state=42, stratify=labels
)
print(X.shape)
print(X_train.shape)
print(X_test.shape)

(4485, 30, 126)
(4036, 30, 126)
(449, 30, 126)


# Train model 

In [7]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, GRU
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping

In [8]:
# model = Sequential()
# model.add(GRU(64, activation='relu', input_shape=(30,126))) # LSTM -> GRU
# model.add(Dense(64, activation='relu'))
# model.add(Dense(actions.shape[0], activation='softmax'))
# model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
# model.summary()

In [9]:
# callback = EarlyStopping(monitor='loss', patience=3, restore_best_weights=True)
# model.fit(X_train, y_train, epochs=50, callbacks=[callback])

In [10]:
def accuracy(X, y, estimator=None):
    """Return the fraction of samples whose predicted class matches the label.

    Args:
        X: Batch of inputs, passed unchanged to ``predict``.
        y: One-hot encoded targets, one row per sample in ``X``.
        estimator: Object exposing ``predict(X)`` that returns per-class
            scores. When omitted, the notebook-level ``model`` is used, so
            existing two-argument calls behave exactly as before.

    Returns:
        Share of rows where the arg-max of the prediction equals the arg-max
        of ``y``.
    """
    print(X.shape, y.shape)
    # Resolve the predictor explicitly so the helper can also score a model
    # loaded from disk instead of depending only on a global named `model`.
    predictor = model if estimator is None else estimator
    scores = predictor.predict(X)
    hits = np.argmax(scores, axis=1) == np.argmax(y, axis=1)
    return hits.sum() / len(scores)

In [11]:
# print(accuracy(X_train, y_train))
# print(accuracy(X_test, y_test))

# Save & Load Model

In [12]:
# model.save('model1_0828.h5')

In [13]:
from tensorflow.keras.models import load_model

In [14]:
# Restore the previously saved classifier from disk and show its layer summary.
model_path = "model1_0828.h5"
new_model = load_model(model_path)
new_model.summary()

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru_5 (GRU)                 (None, 64)                36864     
                                                                 
 dense_10 (Dense)            (None, 64)                4160      
                                                                 
 dense_11 (Dense)            (None, 39)                2535      
                                                                 
Total params: 43559 (170.15 KB)
Trainable params: 43559 (170.15 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [15]:
# print(accuracy(X_train, y_train))

In [16]:
# Evaluate the reloaded model on the held-out split.
# NOTE(review): model1_0828.h5 was trained in an earlier session; unless that
# session used the same split, X_test may contain samples the model was
# trained on — confirm before quoting this number.
res = new_model.predict(X_test)
# Store the score under its own name: binding it to `accuracy` would replace
# the accuracy() helper defined earlier and break any later call to it.
test_accuracy = (np.argmax(res, axis=1) == np.argmax(y_test, axis=1)).sum()/len(res)
print(test_accuracy)

0.9621380846325167


# TFlite

In [17]:
# import tensorflow as tf

In [18]:
# converter = tf.lite.TFLiteConverter.from_keras_model(model)
# converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
# converter._experimental_lower_tensor_list_ops = False
# tflite_model = converter.convert()

# with open('my_model2.tflite', 'wb') as f:
#     f.write(tflite_model)

# Realtime Test

In [19]:
import cv2
import mediapipe as mp
from collections import Counter

In [20]:
# MediaPipe shortcuts used by the real-time test cells.
mp_drawing = mp.solutions.drawing_utils  # helpers for drawing landmarks on frames
mp_holistic = mp.solutions.holistic      # Holistic solution (pose and hand landmarks are read from it)

In [21]:
# One bar colour per class; all 39 entries are the same tuple.
colors = [(245,117,16)] * 39
def prob_viz(res, actions, input_frame, colors):
    """Draw one probability bar and one label per class on a copy of the frame.

    Args:
        res: Per-class probabilities for the current prediction.
        actions: Class names, indexed like ``res``.
        input_frame: Image to annotate; it is copied, not modified.
        colors: Fill colour of each class bar, indexed like ``res``.

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    row_height = 17
    for num, prob in enumerate(res):
        offset = num * row_height
        bar_start = (0, 60 + offset)
        # Bar length is the probability scaled to 0..100 pixels.
        bar_end = (int(prob * 100), 90 + offset)
        # cv2.rectangle(image, corner, opposite corner, colour, thickness)
        cv2.rectangle(output_frame, bar_start, bar_end, colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85 + offset), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    return output_frame

In [22]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on a BGR frame and return its results.

    Args:
        image: Frame in BGR channel order (it is converted to RGB here).
        model: Object exposing ``process(image)``.

    Returns:
        Whatever ``model.process`` returns for the RGB version of the frame.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Mark the converted copy read-only before handing it to the model.
    rgb_frame.flags.writeable = False
    return model.process(rgb_frame)

In [23]:
def draw_styled_landmarks(image, results):
    """Draw pose, left-hand and right-hand landmarks onto ``image`` in place.

    Args:
        image: Frame that receives the drawings.
        results: Detection results providing ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    spec = mp_drawing.DrawingSpec
    # (landmarks, connections, landmark style, connection style), drawn in order.
    layers = (
        # Pose
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         spec(color=(80,22,10), thickness=2, circle_radius=4),
         spec(color=(80,44,121), thickness=2, circle_radius=2)),
        # Left hand
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(121,22,76), thickness=2, circle_radius=4),
         spec(color=(121,44,250), thickness=2, circle_radius=2)),
        # Right hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(245,117,66), thickness=2, circle_radius=4),
         spec(color=(245,66,230), thickness=2, circle_radius=2)),
    )
    for landmarks, connections, landmark_style, connection_style in layers:
        mp_drawing.draw_landmarks(image, landmarks, connections, landmark_style, connection_style)

In [24]:
def _hand_keypoints(hand_landmarks):
    """Flatten one hand's (x, y, z) landmarks; return 63 zeros when the hand is absent."""
    if not hand_landmarks:
        return np.zeros(21*3)
    return np.array([[p.x, p.y, p.z] for p in hand_landmarks.landmark]).flatten()


def extract_keypoints_without_face(results):
    """Return the left-hand then right-hand keypoints as one flat array.

    Pose and face landmarks are not part of the feature vector, so they are
    not read at all; the unused pose array is no longer built on every frame.

    Args:
        results: Detection results providing ``left_hand_landmarks`` and
            ``right_hand_landmarks`` (either may be empty/None).

    Returns:
        1-D array: left-hand values followed by right-hand values. A missing
        hand contributes 63 zeros.
    """
    return np.concatenate([
        _hand_keypoints(results.left_hand_landmarks),
        _hand_keypoints(results.right_hand_landmarks),
    ])

In [25]:
# def get_dataset(data_path_trans):
#     input_texts = []
#     target_texts = []

#     input_characters = set()
#     target_characters = set()
#     with open(data_path_trans, 'r', encoding='utf-8') as f:
#         lines = f.read().split('\n')
#     for line in lines:
#         input_text, target_text= line.split('   ')
#         # 用tab作用序列的开始，用\n作为序列的结束
#         target_text = '\t' + target_text + '\n'

#         input_texts.append(input_text)
#         target_texts.append(target_text)
        
#         for char in input_text:
#             if char not in input_characters:
#                 input_characters.add(char)
#         for char in target_text:
#             if char not in target_characters:
#                 target_characters.add(char)
#     return input_texts,target_texts,input_characters,target_characters


# #------------------------------------------#
# #   init初始化部分
# #------------------------------------------#

# # 获取数据集
# # input_texts為输入的英文手語序 target_texts為對應的中文口語序
# # input_characters用到的所有输入字符,如a,b,c,d,e,……,.,!等
# data_path_trans = 'EngToChinese.txt'
# input_texts,target_texts,input_characters,target_characters = get_dataset(data_path_trans)

# # 对字符进行排序
# input_characters = sorted(list(input_characters))
# target_characters = sorted(list(target_characters))

# # 计算共用到了什么字符
# num_encoder_tokens = len(input_characters)
# num_decoder_tokens = len(target_characters)
# # 计算出最长的序列是多长
# max_encoder_seq_length = max([len(txt) for txt in input_texts])
# max_decoder_seq_length = max([len(txt) for txt in target_texts])

# # 建立字母到数字的映射
# input_token_index = dict(
#     [(char, i) for i, char in enumerate(input_characters)])
# target_token_index = dict(
#     [(char, i) for i, char in enumerate(target_characters)])
# # 求數字到字母的映射
# reverse_target_char_index = dict(
#     (i, char) for char, i in target_token_index.items())
    

# model_trans = load_model("model071604-20.h5")

In [26]:
# def translate(model_opt):
#     # model_opt = 'check yes paper sign'
#     in_encoder = np.zeros((1, max_encoder_seq_length, num_encoder_tokens),dtype='float32')

#     for t, char in enumerate(model_opt):
#         in_encoder[0, t, input_token_index[char]] = 1.
#     in_encoder[0, t + 1:, input_token_index[' ']] = 1.

#     in_decoder = np.zeros((len(in_encoder), max_decoder_seq_length, num_decoder_tokens),dtype='float32')
#     in_decoder[:, 0, target_token_index["\t"]] = 1

#     # 生成 decoder 的 output
#     for i in range(max_decoder_seq_length - 1):
#         predict = model_trans.predict([in_encoder, in_decoder])
#         predict = predict.argmax(axis=-1)
#         predict_ = predict[:, i].ravel().tolist()
#         for j, x in enumerate(predict_):
#             in_decoder[j, i + 1, x] = 1 # 將每個預測出的 token 設為 decoder 下一個 timestamp 的輸入

#     seq_index = 0
#     decoded_sentence = ""
#     output_seq = predict[seq_index, :].ravel().tolist()
#     for x in output_seq:
#         if reverse_target_char_index[x] == "\n":
#             break
#         else:
#             decoded_sentence+=reverse_target_char_index[x]

#     return decoded_sentence

In [27]:
# Real-time test: read webcam frames, classify the last 30 hand-keypoint
# frames with `new_model`, and show the recognised words on screen.
# Press 'q' in the video window to stop.

# 1. New detection variables
sequence = []        # rolling window of the latest hand-keypoint frames (max 30)
sentence = []        # recognised words shown in the top banner (max 5)
predictions = []     # latest class ids whose confidence exceeded `threshold` (max 10)
threshold = 0.7
alarm_set = False    # only consumed by the disabled translation step below
trans_result =""

cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Make detections
            results = mediapipe_detection(frame, holistic)
            # Draw landmarks
            draw_styled_landmarks(frame, results)

            # 2. Prediction logic
            keypoints = extract_keypoints_without_face(results)
            # Skip frames with 30 or fewer non-zero keypoint values.
            if np.count_nonzero(keypoints) > 30:
                sequence.append(keypoints)
                sequence = sequence[-30:]

            if len(sequence) == 30:
                # verbose=0 stops Keras from printing a progress bar on every frame.
                res = new_model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
                best = int(np.argmax(res))
                if res[best] > threshold:
                    predictions.append(best)
                    # Only the last 10 entries are ever inspected; drop the rest
                    # so the list does not grow for the whole session.
                    predictions = predictions[-10:]

                # 3. Viz logic
                # `predictions` is empty until a first confident prediction exists;
                # most_common(1) would then return [] and indexing it would raise.
                if predictions and Counter(predictions).most_common(1)[0][0] == best:
                    word = actions[best]
                    # Append only when the word differs from the last one shown.
                    if not sentence or word != sentence[-1]:
                        sentence.append(word)
                        sequence = []
                        last_updated_time = time.time()
                        alarm_set = True

                if len(sentence) > 5:
                    sentence = sentence[-5:]

                # Viz probabilities
                frame = prob_viz(res, actions, frame, colors)

            # current_time = time.time()
            # if alarm_set and current_time - last_updated_time >= 10:
            #     # After 10 seconds, feed `sentence` to the next model for prediction
            #     trans_result = translate(' '.join(sentence))
            #     print('---result---', trans_result)
            #     # Clear the sentence data
            #     alarm_set = False
            #     sequence = []
            #     sentence = []

            cv2.rectangle(frame, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(frame, ' '.join(sentence), (3,30),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('OpenCV Feed', frame)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even when the loop raises
    # or the cell is interrupted; otherwise the webcam stays locked.
    cap.release()
    cv2.destroyAllWindows()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


