In [1]:
import cv2
import mediapipe as mp
import time
import numpy as np
# import numpy as np
import random
from tqdm import tqdm
import scipy.ndimage.interpolation as inter
from scipy.signal import medfilt 
from scipy.spatial.distance import cdist

from keras.optimizers import *
from keras.models import Model
from keras.layers import *
from keras.layers.core import *
from tensorflow.keras.callbacks import *
from keras.layers.convolutional import *
import tensorflow as tf

In [2]:
random.seed(1234)  # seeds Python's `random` module only


class Config:
    """Sizes shared by the data helpers and the model builders.

    A single instance, ``C``, is created right after the class and passed
    around explicitly.
    """

    def __init__(self):
        # Input geometry: frames per clip, joints per frame, coordinates per joint.
        self.frame_l, self.joint_n, self.joint_d = 96, 33, 2
        # Number of classes.
        self.clc_num = 10
        # Forwarded to build_FM, which currently does not read it.
        self.feat_d = 528
        # Base channel count handed to build_FM.
        self.filters = 64
        # Not read by any of the visible cells.
        self.nd = 60


C = Config()

In [3]:
def zoom(p,target_l=32,joints_num=20,joints_dim=3):
    """Median-filter a pose sequence and resample it to ``target_l`` frames.

    Every (joint, coordinate) trajectory is smoothed with a 3-tap median
    filter and then rescaled along the frame axis with ``inter.zoom``.

    Args:
        p: array indexed as ``p[frame, joint, coord]``.
        target_l: number of frames in the result.
        joints_num: number of joints to process.
        joints_dim: number of coordinates per joint to process.

    Returns:
        A new array of shape ``(target_l, joints_num, joints_dim)``.
        ``p`` itself is not modified.

    Raises:
        ValueError: if ``p`` contains no frames.
    """
    l = p.shape[0]
    if l == 0:
        raise ValueError("zoom() needs at least one frame to resample")
    p_new = np.empty([target_l,joints_num,joints_dim])
    scale = target_l/l
    for m in range(joints_num):
        for n in range(joints_dim):
            # Filter into a temporary rather than writing back into ``p``.
            # Callers may pass a slice view of their dataset, and the previous
            # in-place write silently re-filtered that data on every call.
            smoothed = medfilt(p[:,m,n],3)
            p_new[:,m,n] = inter.zoom(smoothed,scale)[:target_l]
    return p_new

def sampling_frame(p,C):
    """Randomly keep 90-100% of the frames of ``p``, then resample via ``zoom``.

    A coin flip chooses between two strategies:
      * contiguous: a random window of consecutive frames;
      * scattered: a random subset of frames, kept in temporal order.
    The selection is handed to ``zoom`` with ``C.frame_l``, ``C.joint_n`` and
    ``C.joint_d``, and its result is returned.
    """
    total = p.shape[0]
    contiguous = random.uniform(0,1) < 0.5
    keep = int(np.round(np.random.uniform(0.9,1)*total))
    if contiguous:
        start = random.randint(0, total-keep)
        clip = p[start:start+keep,:,:]
    else:
        picked = np.random.choice(range(0,total),keep,replace=False)
        clip = p[np.sort(picked),:,:]
    return zoom(clip,C.frame_l,C.joint_n,C.joint_d)

from scipy.spatial.distance import cdist
def get_CG(p,C):
    """Pairwise joint distances for every frame.

    For each of the first ``C.frame_l`` frames, the Euclidean distance between
    every unordered pair of distinct joints is computed and the strict upper
    triangle of that distance matrix is flattened into one row.

    Args:
        p: array indexed as ``p[frame]`` giving a ``(C.joint_n, coords)`` matrix.
        C: object exposing ``frame_l`` and ``joint_n``.

    Returns:
        Array of shape ``(C.frame_l, C.joint_n * (C.joint_n - 1) // 2)``.
    """
    # Strict upper triangle (k=1) of a joint_n x joint_n matrix.
    iu = np.triu_indices(C.joint_n,1,C.joint_n)
    # An all-zero point used to be appended before calling cdist. That only
    # added a column which ``iu`` never selects, so plain self-distances
    # produce exactly the same rows with less work.
    return np.stack([cdist(p[f],p[f],'euclidean')[iu] for f in range(C.frame_l)])

def norm_train(p):
    """Centre coordinate channels 0, 1 and 2 of ``p`` in place.

    For each of the three channels on the last axis, the mean taken over all
    frames and joints is subtracted. ``p`` is modified and returned.
    """
    for axis in (0, 1, 2):
        channel = p[:,:,axis]
        # Plain assignment (not ``-=``) keeps the original casting behaviour.
        p[:,:,axis] = channel - np.mean(channel)
    return p
def norm_train2d(p):
    """Centre coordinate channels 0 and 1 of ``p`` in place.

    For each of the two channels, the mean taken over all frames and joints is
    subtracted. Any further channels are left as they are. ``p`` is modified
    and returned.
    """
    for axis in (0, 1):
        channel = p[:,:,axis]
        # Plain assignment (not ``-=``) keeps the original casting behaviour.
        p[:,:,axis] = channel - np.mean(channel)
    return p
# def normlize_test(p):
#     # normolize to start point, use the center for hand case
#     p[:,:,0] = p[:,:,0]-p[:,1:2,0]
#     p[:,:,1] = p[:,:,1]-p[:,1:2,1]
#     p[:,:,2] = p[:,:,2]-p[:,1:2,2]
#     # p[:,:,0] = p[:,:,0]-np.mean(p[:,:,0])
#     # p[:,:,1] = p[:,:,1]-np.mean(p[:,:,1])
#     # p[:,:,2] = p[:,:,2]-np.mean(p[:,:,2])
#     return p
#     return p

In [4]:
# Rate passed to every SpatialDropout1D layer created in build_FM.
drop_rate = 0.1
def poses_diff(x):
    """Difference of consecutive steps along axis 1, resized back to the input extent.

    Differencing yields one step fewer along axis 1; ``tf.image.resize`` (with
    its default settings) then brings the result back to the ``[H, W]`` read
    from axes 1 and 2 of the input.
    """
    dims = x.get_shape()
    target = [dims[1], dims[2]]
    delta = tf.subtract(x[:,1:,...],x[:,:-1,...])
    return tf.image.resize(delta,size=target)
def poses_diff_2(x):
    """Pass ``x`` through ``tf.image.resize`` using its own axis-1/axis-2 extent.

    No differencing is applied here; only the resize call is made.
    """
    dims = x.get_shape()
    return tf.image.resize(x,size=[dims[1], dims[2]])
def pose_motion_2(D, frame_l):
    """Apply ``poses_diff_2`` to ``D`` and flatten each frame into one vector.

    Returns the tensor reshaped to ``(frame_l, -1)`` per sample.
    """
    resized = Lambda(lambda t: poses_diff_2(t))(D)
    return Reshape((frame_l,-1))(resized)

def pose_motion(P,frame_l):
    """Derive the three per-frame feature streams fed to the network stems.

    Returns, in order:
      * frame differences of ``P``, reshaped to ``(frame_l, -1)``;
      * frame differences of every second frame of ``P``, reshaped to
        ``(int(frame_l/2), -1)``;
      * ``P`` itself, reshaped to ``(frame_l, -1)``.
    """
    slow = Lambda(lambda t: poses_diff(t))(P)
    slow = Reshape((frame_l,-1))(slow)

    every_other = Lambda(lambda t: t[:,::2,...])(P)
    fast = Lambda(lambda t: poses_diff(t))(every_other)
    fast = Reshape((int(frame_l/2),-1))(fast)

    flat = Reshape((frame_l,-1))(P)
    return slow, fast, flat
# def reshape_x_2(D, frame_l):
#     x_1 = Lambda(lambda y: poses_diff_2(y))(D)
#     x_1 = Reshape((frame_l, -1))(D)

def c1D(x,filters,kernel):
    """Conv1D ('same' padding, no bias), then BatchNormalization, then LeakyReLU(0.2)."""
    stack = (
        Conv1D(filters, kernel_size=kernel,padding='same',use_bias=False),
        BatchNormalization(),
        LeakyReLU(alpha=0.2),
    )
    for layer in stack:
        x = layer(x)
    return x

def block(x,filters):
    """Two consecutive ``c1D`` units, each with kernel size 3 and ``filters`` channels."""
    for _ in range(2):
        x = c1D(x,filters,3)
    return x
    
def d1D(x,filters):
    """Dense (no bias), then BatchNormalization, then LeakyReLU(0.2)."""
    stack = (
        Dense(filters,use_bias=False),
        BatchNormalization(),
        LeakyReLU(alpha=0.2),
    )
    for layer in stack:
        x = layer(x)
    return x

def build_FM(frame_l=32,joint_n=20,joint_d=3,feat_d=190,filters=16, nd=60):
    """Build the feature-extraction sub-model.

    The single input has shape ``(frame_l, joint_n, joint_d)``. Three streams
    from ``pose_motion`` (flattened poses, frame differences, differences of
    every second frame) each go through the same 1-3-1 convolution stem, are
    concatenated, and then pass through three ``block`` stages of
    ``filters*2``, ``filters*4`` and ``filters*8`` channels.

    ``feat_d`` and ``nd`` are accepted but not used by this function.

    Returns:
        A Keras ``Model`` mapping the pose input to the final feature map.
    """
    def stem(t, pool):
        # Kernel sizes 1 -> 3 -> 1, with spatial dropout after each stage.
        t = c1D(t,filters*2,1)
        t = SpatialDropout1D(drop_rate)(t)
        t = c1D(t,filters,3)
        t = SpatialDropout1D(drop_rate)(t)
        t = c1D(t,filters,1)
        if pool:
            t = MaxPooling1D(2)(t)
        t = SpatialDropout1D(drop_rate)(t)
        return t

    P = Input(shape=(frame_l,joint_n,joint_d))
    diff_slow,diff_fast, x_1 = pose_motion(P,frame_l)

    # Layers are created in the same order as before (pose stem, slow stem,
    # fast stem, trunk) so that saved weights keep lining up.
    x_1 = stem(x_1, pool=True)
    x_d_slow = stem(diff_slow, pool=True)
    # The fast stream already has int(frame_l/2) steps, so it is not pooled.
    x_d_fast = stem(diff_fast, pool=False)

    x = concatenate([x_1,x_d_slow,x_d_fast])
    for width, pool in ((filters*2, True), (filters*4, True), (filters*8, False)):
        x = block(x,width)
        if pool:
            x = MaxPool1D(2)(x)
        x = SpatialDropout1D(drop_rate)(x)

    return Model(inputs=[P],outputs=x)


def build_DD_Net(C):
    """Assemble the full classifier: feature sub-model plus a dense head.

    Args:
        C: config object exposing ``frame_l``, ``joint_n``, ``joint_d``,
            ``feat_d``, ``filters`` and ``clc_num``.

    Returns:
        A Keras ``Model`` taking one input named ``'P'`` of shape
        ``(C.frame_l, C.joint_n, C.joint_d)`` and producing a softmax over
        ``C.clc_num`` classes.
    """
    P = Input(name='P', shape=(C.frame_l,C.joint_n,C.joint_d))
    FM = build_FM(C.frame_l,C.joint_n,C.joint_d,C.feat_d,C.filters)

    x = FM([P])
    x = GlobalMaxPool1D()(x)

    x = d1D(x,128)
    x = Dropout(0.5)(x)
    x = d1D(x,128)
    x = Dropout(0.5)(x)
    # Output width comes from the config rather than a literal 10, so the head
    # cannot drift out of sync with the configured class count.
    x = Dense(C.clc_num, activation='softmax')(x)

    return Model(inputs=[P],outputs=x)

In [5]:
def data_generator_rt(T):
    """Wrap a single pose sequence as a batch of size one.

    ``T`` gets a leading batch axis, so the returned array has shape
    ``(1,) + np.shape(T)``. No resampling or normalisation is applied.
    """
    batch = np.expand_dims(T, axis = 0)
    return np.stack([sample for sample in batch])

In [6]:
# Class index -> sentence. Both lists are indexed by the classifier's argmax.
# `labels_text` (with diacritics) is what the inference loop prints.
labels_text = [
    'Xin chào, rất vui được gặp bạn!',
    'Tạm biệt, hẹn gặp lại!',
    'Xin cảm ơn, bạn thật tốt bụng!',
    'Tôi xin lỗi, bạn có sao không',
    'Tôi yêu gia đình và bạn bè.',
    'Tôi là học sinh.',
    'Tôi thích động vật.',
    'Tôi ăn cơm.',
    'Tôi sống ở Việt Nam.',
    'Tôi là người Điếc',
]
# Unaccented variants; these are what the loop stores in `sentence` and draws
# on the video frame (presumably because the OpenCV font lacks the accented
# glyphs - confirm).
labels = [
    'xin chao rat vui duoc gap ban',
    'tam biet hen gap lai',
    'xin cam on ban that tot bung',
    'toi xin loi ban co sao khong',
    'toi yeu gia dinh va ban be',
    'toi la hoc sinh',
    'toi thich dong vat',
    'toi an com',
    'toi song o viet nam',
    'toi la nguoi diec',
]
len(labels)

10

In [7]:
colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    """Write one action name per entry of ``res`` onto a copy of ``input_frame``.

    Row ``i`` shows ``actions[i]`` in white at y = 85 + 40*i. The values in
    ``res`` and the ``colors`` argument are not used at the moment, because
    the per-class probability bar is disabled.
    NOTE(review): the module-level ``colors`` has three entries; drawing a bar
    per class with ``colors[i]`` would need one entry per class.

    Returns:
        The annotated copy; ``input_frame`` is not drawn on.
    """
    canvas = input_frame.copy()
    white = (255,255,255)
    for row, _ in enumerate(res):
        origin = (0, 85+row*40)
        cv2.putText(canvas, actions[row], origin, cv2.FONT_HERSHEY_SIMPLEX, 1, white, 2, cv2.LINE_AA)
    return canvas

In [8]:
# Short aliases for the MediaPipe modules used by the capture loop:
# landmark drawing, the default landmark style, and the pose solution.
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose

In [10]:
# Build the classifier from the shared config and print its layer table.
DD_Net = build_DD_Net(C)
DD_Net.summary()

# Restore trained weights; the .h5 path is relative to the working directory.
DD_Net.load_weights('nnkh-8-11-cv(1).h5')

Model: "model_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
P (InputLayer)               [(None, 96, 33, 2)]       0         
_________________________________________________________________
model (Functional)           (None, 12, 512)           1719040   
_________________________________________________________________
global_max_pooling1d (Global (None, 512)               0         
_________________________________________________________________
dense (Dense)                (None, 128)               65536     
_________________________________________________________________
batch_normalization_15 (Batc (None, 128)               512       
_________________________________________________________________
leaky_re_lu_15 (LeakyReLU)   (None, 128)               0         
_________________________________________________________________
dropout (Dropout)            (None, 128)               0   

In [11]:
# Real-time loop: grab webcam frames, extract 2-D pose landmarks, and classify
# each full buffer of frames with DD_Net. Press 'q' in the window to stop.
time0 = 0
sequence = []      # per-frame (x, y) landmark arrays collected so far
sentence = ['']    # history of recognised labels; the last one is shown on screen
predictions = []   # not used by the active code
threshold = 0.6    # not used by the active code

# NOTE(review): 116 frames are buffered but only the most recent C.frame_l are
# classified; presumably the leading frames are skipped as a transition between
# signs - confirm.
BUFFER_FRAMES = 116

cap = cv2.VideoCapture(0)
try:
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            success, image = cap.read()
            if not success:
                # No frame was delivered, so `image` cannot be processed;
                # stop cleanly instead of crashing inside cv2.flip.
                print('Camera frame could not be read; stopping.')
                break

            # Flip the image horizontally for a selfie-view display.
            image = cv2.flip(image, 1)

            # To improve performance, optionally mark the image as not writeable to
            # pass by reference.
            image.flags.writeable = False
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = pose.process(image)

            # Draw the pose annotation on the image.
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            mp_drawing.draw_landmarks(
                image,
                results.pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style()
                )

            if results.pose_landmarks:
                # One (x, y) row per landmark; frames without a detection are skipped.
                keypoint = np.array([[lm.x, lm.y] for lm in results.pose_landmarks.landmark])
                sequence.append(keypoint)
                sequence = sequence[-BUFFER_FRAMES:]

            if len(sequence) == BUFFER_FRAMES:
                X_test_rt = data_generator_rt(sequence[-C.frame_l:])
                res = DD_Net.predict([X_test_rt])[0]
                best = int(np.argmax(res))
                # Every window's top class is accepted as-is; no confidence
                # threshold or de-duplication is applied.
                print(labels_text[best])
                sentence.append(labels[best])
                # Start collecting a fresh window.
                sequence.clear()
                print(sentence)

            # Show fps
            time1 = time.time()
            elapsed = time1 - time0
            # Guard against a zero interval on coarse system clocks.
            fps = 1 / elapsed if elapsed > 0 else 0
            time0 = time1
            # Overlay coordinates are fixed; they look chosen for a 640x480 frame - TODO confirm.
            cv2.putText(image,'FPS:' + str(int(fps)), (3, 475), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(image, ''.join(sentence[-1:]), (3,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            cv2.imshow('MediaPipe Pose', image)

            if cv2.waitKey(5) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()


Xin chào, rất vui được gặp bạn!
['', 'xin chao rat vui duoc gap ban']
Tôi là học sinh.
['', 'xin chao rat vui duoc gap ban', 'toi la hoc sinh']
Tôi là học sinh.
['', 'xin chao rat vui duoc gap ban', 'toi la hoc sinh', 'toi la hoc sinh']
Tôi là người Điếc
['', 'xin chao rat vui duoc gap ban', 'toi la hoc sinh', 'toi la hoc sinh', 'toi la nguoi diec']
Tôi là học sinh.
['', 'xin chao rat vui duoc gap ban', 'toi la hoc sinh', 'toi la hoc sinh', 'toi la nguoi diec', 'toi la hoc sinh']
