In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import os
from glob import glob

# Remove axis according to 'valid_axis' list
def remove_axis(skeleton_sample):
    """Keep only the skeleton rows listed in the fixed ``valid_axis`` table.

    Args:
        skeleton_sample: 2-D NumPy array whose rows are skeleton axes
            (row ``k`` corresponds to the 1-based axis number ``k + 1``).

    Returns:
        A new 2-D array holding the selected rows in ascending order.  Axis
        numbers that exceed the number of available rows are ignored.
    """
    # 1-based numbers of the axes that are kept.
    valid_axis = (1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 14, 15, 16, 17, 18,
                  22, 23, 24, 43, 44, 45, 46, 47, 48)

    row_count = skeleton_sample.shape[0]
    # Convert to 0-based row indices and drop the ones the sample does not have.
    wanted_rows = [axis - 1 for axis in valid_axis if axis <= row_count]

    # Integer-array indexing returns a copy, so the input is left untouched.
    return skeleton_sample[wanted_rows, :]

def interpolate(skeleton_sample, finger_sample):
    """Repair missing skeleton values and stretch every row to the finger length.

    The work is done in two stages:

    1. Each element equal to the sentinel ``-10`` is treated as missing.  A
       missing element between two valid ones is filled by linear
       interpolation (https://en.wikipedia.org/wiki/Linear_interpolation)
       from its already-repaired left neighbour towards the next valid
       element, rounded to 2 decimals.  A missing run at the very start or
       end of a row has only one valid neighbour and takes that value.
    2. The repaired row of length ``S`` is spread over ``F`` columns, where
       ``F = finger_sample.shape[1]``: source column ``c`` is placed at
       ``round(c * (F - 1) / (S - 1))`` and the columns in between are filled
       linearly, each step rounded to 4 decimals.

    Args:
        skeleton_sample: 2-D NumPy array (rows are axes, columns are time
            steps).  It is not modified.
        finger_sample: 2-D array; only its number of columns is used.

    Returns:
        A NumPy array with ``skeleton_sample.shape[0]`` rows and
        ``finger_sample.shape[1]`` columns.

    Raises:
        ValueError: if the skeleton has fewer than 2 columns, if the finger
            data has fewer columns than the skeleton data, or if a row
            consists only of the missing-value sentinel.
    """
    MISSING_VALUE = -10

    skeleton_length = skeleton_sample.shape[1]
    finger_length = finger_sample.shape[1]
    if skeleton_length < 2:
        raise ValueError("skeleton_sample needs at least 2 columns to be resampled")
    if finger_length < skeleton_length:
        raise ValueError(
            "finger_sample must have at least as many columns as skeleton_sample "
            f"({finger_length} < {skeleton_length})"
        )
    ratio = (finger_length - 1) / (skeleton_length - 1)

    # Work on a copy: the gap filling below writes into the rows in place.
    repaired = np.array(skeleton_sample)

    # Stage 1: replace the '-10' sentinel by interpolated values.
    for row_index in range(repaired.shape[0]):
        line = repaired[row_index, :]
        valid_positions = np.flatnonzero(line != MISSING_VALUE).tolist()
        if len(valid_positions) == line.shape[0]:
            continue  # nothing missing in this row
        if not valid_positions:
            raise ValueError(f"row {row_index} of skeleton_sample contains no valid value")

        # Leading / trailing gaps: hold the nearest valid value.
        first_valid, last_valid = valid_positions[0], valid_positions[-1]
        line[:first_valid] = line[first_valid]
        line[last_valid + 1:] = line[last_valid]

        # Interior gaps: p1 is the element just left of p (original or already
        # repaired), p2 is the next originally valid element, p1 < p < p2.
        for left_index, p2_index in zip(valid_positions, valid_positions[1:]):
            for p_index in range(left_index + 1, p2_index):
                p1_index = p_index - 1
                p_value = line[p1_index] + (line[p2_index] - line[p1_index]) * (p_index - p1_index) / (p2_index - p1_index)
                line[p_index] = round(p_value, 2)

    # Stage 2: spread the skeleton columns over the finger time axis.
    # Target column of every source column; strictly increasing because ratio >= 1.
    valid_loc = [round(count * ratio) for count in range(skeleton_length)]

    final_result = []
    for line in repaired:
        stretched = [0] * finger_length
        for count, target in enumerate(valid_loc):
            stretched[target] = line[count]

        # Fill the columns between two consecutive anchors with a constant step.
        for p1_index, p2_index in zip(valid_loc, valid_loc[1:]):
            dis = (stretched[p2_index] - stretched[p1_index]) / (p2_index - p1_index)
            for i in range(p1_index + 1, p2_index):
                stretched[i] = round(stretched[i - 1] + dis, 4)

        final_result.append(stretched)

    return np.array(final_result)

def location_generator(windowed_skeleton_sample):
    """Build location features: hand positions relative to four body points.

    The input columns are read in groups of three.  Following the labels
    used by the original author, the groups are, in order: nose, torso,
    right shoulder, right hand, left shoulder, left hand.

    Args:
        windowed_skeleton_sample: 2-D array (rows are time steps) with at
            least 18 columns.

    Returns:
        ``(LL, LR)``: for the left and the right hand respectively, the hand
        columns minus the nose, torso, right-shoulder and left-shoulder
        columns, concatenated horizontally (12 columns each).
    """
    def joint(group):
        # Three consecutive columns belonging to one joint.
        start = 3 * group
        return windowed_skeleton_sample[:, start:start + 3]

    # Reference points, in the order they appear in the output.
    anchors = (joint(0), joint(1), joint(2), joint(4))
    hand_right = joint(3)
    hand_left = joint(5)

    LL = np.hstack([hand_left - anchor for anchor in anchors])
    LR = np.hstack([hand_right - anchor for anchor in anchors])
    return LL, LR

def movement_generator(windowed_skeleton_sample):
    """Build movement features for the left and right hand of one window.

    For each hand the feature vector is the concatenation of

    * the Euclidean distance of every time step's hand position from the
      window's mean hand position (one value per row), and
    * the displacement from the first to the last row (3 values).

    Args:
        windowed_skeleton_sample: 2-D array (rows are time steps).  Columns
            9:12 are read as the right hand and 15:18 as the left hand.

    Returns:
        ``(ML, MR)``: 1-D arrays of length ``rows + 3`` for the left and the
        right hand.

    Raises:
        IndexError: if the window has no rows.
    """
    right_hand = windowed_skeleton_sample[:, 9:12]
    left_hand = windowed_skeleton_sample[:, 15:18]

    # Distance of every row from the window mean, computed for all rows at once
    # instead of growing an array with np.hstack row by row.
    left_shape = np.linalg.norm(left_hand - np.mean(left_hand, axis=0), axis=1)
    right_shape = np.linalg.norm(right_hand - np.mean(right_hand, axis=0), axis=1)

    # Net displacement over the window.
    left_direction = left_hand[-1, :] - left_hand[0, :]
    right_direction = right_hand[-1, :] - right_hand[0, :]

    ML = np.hstack((left_shape, left_direction))
    MR = np.hstack((right_shape, right_direction))

    return ML, MR

def handshape_generator(windowed_finger_sample):
    """Split finger data column-wise into a left-hand and a right-hand part.

    Args:
        windowed_finger_sample: 2-D array (rows are time steps).

    Returns:
        ``(left_hand, right_hand)``: the first 10 columns and all remaining
        columns of the input, respectively.
    """
    first_ten = slice(None, 10)
    remainder = slice(10, None)
    return windowed_finger_sample[:, first_ten], windowed_finger_sample[:, remainder]
    
def _load_csv(path):
    """Read a header-less CSV file into a float32 NumPy array."""
    return np.array(pd.read_csv(path, header=None), dtype='float32')


def _split_windows(sequence, window_size, window_stride):
    """Cut a (timestep, feature) array into a list of (window_size, feature) windows."""
    dataset = tf.keras.utils.timeseries_dataset_from_array(
        sequence, targets=None, batch_size=1,
        sequence_length=window_size, sequence_stride=window_stride, shuffle=False)
    # batch_size is 1, so each batch carries exactly one window on its first axis.
    return [batch[0, :, :] for batch in dataset.as_numpy_iterator()]


def _stack_rows(rows, width):
    """Stack 1-D feature vectors into a 2-D array (0 rows when the list is empty)."""
    if not rows:
        return np.empty((0, width), dtype='float32')
    return np.vstack(rows)


def _export_windows(windows, filename):
    """Write windows to a CSV file, each one followed by a separator row of 10000s.

    Nothing is written when there are no windows, so a too-short sample can
    never be exported with another sample's data.
    """
    windows = list(windows)
    if not windows:
        print(f'No complete window available, skipped: {filename}')
        return

    blocks = []
    for window in windows:
        window = np.atleast_2d(window)  # a single feature vector becomes one row
        blocks.append(window)
        blocks.append(10000*np.ones((1, window.shape[1])))
    pd.DataFrame(np.vstack(blocks)).to_csv(filename, mode='w', header=False, index=False)


if __name__=='__main__':
    window_size = 25
    window_stride = 25
    
    finger_raw = r'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\finger_raw'
    skeleton_raw = r'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\skeleton_raw'
    HSL_path = 'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\feature\\HSL'
    HSR_path = 'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\feature\\HSR'
    LL_path = 'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\feature\\LL'
    LR_path = 'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\feature\\LR'
    ML_path = 'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\feature\\ML'
    MR_path = 'D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\feature\\MR'
    
    # Phoneme dictionary load
    HS_path = "D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\phoneme_dict\\Handshape"
    HS_dict = _load_csv(HS_path+'\\HS_raw.csv')
    L_path = "D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\phoneme_dict\\Location"
    L_dict = _load_csv(L_path+'\\L_raw.csv')
    M_path = "D:\\MinHyuk\\Hand Sign Recognition\\2021_SLT_prj\\DATA\\phoneme_dict\\Movement"
    M_dict = _load_csv(M_path+'\\M_raw.csv')
    
    # Load the raw recordings.  Skeleton and finger files are paired by their
    # position in the two glob results.
    # NOTE(review): this assumes both folders list matching recordings in the
    # same order - confirm for the file system in use.
    skeleton_sample_set = []
    finger_sample_set = []
    skeleton_file_names = []  # base names of the files that were actually loaded
    finger_file_names = []
    for skeleton_sample_name, finger_sample_name in zip(glob(skeleton_raw+'\\*.csv'), glob(finger_raw+'\\*.csv')):
        finger_sample = _load_csv(finger_sample_name)
        
        skeleton_sample = _load_csv(skeleton_sample_name)
        skeleton_sample = remove_axis(skeleton_sample) # Remove unneeded skeletal points
        skeleton_sample = interpolate(skeleton_sample, finger_sample)
        
        finger_sample_set.append(finger_sample)
        skeleton_sample_set.append(skeleton_sample)
        finger_file_names.append(os.path.basename(finger_sample_name))
        skeleton_file_names.append(os.path.basename(skeleton_sample_name))
    
    if not finger_sample_set:
        # Without this guard the statistics below would be NaN and the
        # normalized phoneme dictionaries would be overwritten with NaN.
        raise FileNotFoundError('No matching skeleton/finger CSV files found in ' + skeleton_raw + ' and ' + finger_raw)
    
    # NOTE(review): a feature column with zero standard deviation produces
    # inf/NaN in the standardization steps below - confirm this cannot occur.
    
    # Export preprocessed handshape feature sequences
    # Feature extraction, standardization, windowing, export data (order matters!)
    HSL_sample_set = [None] * len(finger_sample_set)
    HSR_sample_set = [None] * len(finger_sample_set)
    for i, finger_sample in enumerate(finger_sample_set):
        HSL_sample_set[i], HSR_sample_set[i] = handshape_generator(np.transpose(finger_sample)) # Feature extraction
    
    # Mean/std are computed over the left and right hand together.  The rows
    # are stacked as left0, right0, left1, right1, ...
    HS_parts = []
    for HSL_sample, HSR_sample in zip(HSL_sample_set, HSR_sample_set):
        HS_parts.extend((HSL_sample, HSR_sample))
    HS_serial_connect = np.vstack(HS_parts)
    
    HS_mean = np.mean(HS_serial_connect, axis=0)
    HS_std = np.std(HS_serial_connect, axis=0)
    
    HSL_sample_set = [(HSL_sample - HS_mean)/HS_std for HSL_sample in HSL_sample_set] # Standardization
    HSR_sample_set = [(HSR_sample - HS_mean)/HS_std for HSR_sample in HSR_sample_set]
    
    # Export standardized handshape phoneme
    HS_dict = (HS_dict - HS_mean)/HS_std
    
    HS_dict_for_save = pd.DataFrame(HS_dict)
    filename = HS_path + '\\HS_normalized.csv'
    HS_dict_for_save.to_csv(filename, mode='w', header=False, index=False)
    
    # Windowing and export.  Output names are derived from the file each
    # sample was loaded from, with its first 8 characters removed.
    for HSL_sample, finger_file_name in zip(HSL_sample_set, finger_file_names):
        filename = HSL_path + '\\[HSL]' + finger_file_name[8:]
        _export_windows(_split_windows(HSL_sample, window_size, window_stride), filename) # Export data
    
    for HSR_sample, finger_file_name in zip(HSR_sample_set, finger_file_names):
        filename = HSR_path + '\\[HSR]' + finger_file_name[8:]
        _export_windows(_split_windows(HSR_sample, window_size, window_stride), filename) # Export data
    
    # Export preprocessed movement feature sequences
    # Windowing, feature extraction, standardization, export data (order matters!)
    # Each list element is one sample: a 2-D array with one movement feature
    # vector per window.
    ML_serial_connect = []
    MR_serial_connect = []
    movement_width = window_size + 3  # one distance per time step + 3 direction values
    for skeleton_sample in skeleton_sample_set: # Windowing
        ML_rows = []
        MR_rows = []
        for windowed_skeleton_sample in _split_windows(np.transpose(skeleton_sample), window_size, window_stride):
            ML_each_window, MR_each_window = movement_generator(windowed_skeleton_sample) # Feature extraction
            ML_rows.append(ML_each_window)
            MR_rows.append(MR_each_window)
        
        # Always 2-D, also for samples with a single window or none at all.
        ML_serial_connect.append(_stack_rows(ML_rows, movement_width))
        MR_serial_connect.append(_stack_rows(MR_rows, movement_width))
    
    # Mean/std are computed over the left and right hand together
    # (all left-hand samples first, then all right-hand samples).
    M_total_connect = ML_serial_connect + MR_serial_connect
    M_total_serial_connect = np.vstack(M_total_connect)
    M_mean = np.mean(M_total_serial_connect, axis=0)
    M_std = np.std(M_total_serial_connect, axis=0)
    
    # Export standardized movement phoneme
    M_dict = (M_dict - M_mean)/M_std
    
    M_dict_for_save = pd.DataFrame(M_dict)
    filename = M_path + '\\M_normalized.csv'
    M_dict_for_save.to_csv(filename, mode='w', header=False, index=False)
    
    ML_serial_connect = [(ML_sample - M_mean)/M_std for ML_sample in ML_serial_connect] # Standardization
    MR_serial_connect = [(MR_sample - M_mean)/M_std for MR_sample in MR_serial_connect]
    
    # Every row is the feature vector of one window; each is followed by a
    # separator row in the exported file.
    for ML_sample_connect, skeleton_file_name in zip(ML_serial_connect, skeleton_file_names):
        filename = ML_path + '\\[ML]' + skeleton_file_name[8:]
        _export_windows(list(ML_sample_connect), filename) # Export data
    
    for MR_sample_connect, skeleton_file_name in zip(MR_serial_connect, skeleton_file_names):
        filename = MR_path + '\\[MR]' + skeleton_file_name[8:]
        _export_windows(list(MR_sample_connect), filename) # Export data

    # Export preprocessed location feature sequences
    # Feature extraction, standardization, windowing, export data (order matters!)
    LL_sample_set = [None] * len(skeleton_sample_set)
    LR_sample_set = [None] * len(skeleton_sample_set)
    for i, skeleton_sample in enumerate(skeleton_sample_set):
        LL_sample_set[i],LR_sample_set[i] = location_generator(np.transpose(skeleton_sample)) # Feature extraction
    
    # Mean/std are computed over the left and right hand together.  The rows
    # are stacked as left0, right0, left1, right1, ...
    L_parts = []
    for LL_sample, LR_sample in zip(LL_sample_set, LR_sample_set):
        L_parts.extend((LL_sample, LR_sample))
    L_serial_connect = np.vstack(L_parts)
    
    L_mean = np.mean(L_serial_connect, axis=0)
    L_std = np.std(L_serial_connect, axis=0)

    # Export standardized location phoneme
    L_dict = (L_dict - L_mean)/L_std
    
    L_dict_for_save = pd.DataFrame(L_dict)
    filename = L_path + '\\L_normalized.csv'
    L_dict_for_save.to_csv(filename, mode='w', header=False, index=False)
    
    LL_sample_set = [(LL_sample - L_mean)/L_std for LL_sample in LL_sample_set] # Standardization
    LR_sample_set = [(LR_sample - L_mean)/L_std for LR_sample in LR_sample_set]
    
    # Windowing and export
    for LL_sample, skeleton_file_name in zip(LL_sample_set, skeleton_file_names):
        filename = LL_path + '\\[LL]' + skeleton_file_name[8:]
        _export_windows(_split_windows(LL_sample, window_size, window_stride), filename) # Export data
    
    for LR_sample, skeleton_file_name in zip(LR_sample_set, skeleton_file_names):
        filename = LR_path + '\\[LR]' + skeleton_file_name[8:]
        _export_windows(_split_windows(LR_sample, window_size, window_stride), filename) # Export data