# 1 - Import dependencies

In [119]:
import numpy as np
import cv2 
import os
import mediapipe as mp
import tensorflow as tf 
from tensorflow import keras
import matplotlib.pyplot as plt
import time
import pandas as pd

np.random.seed(42)

# 2 - Keypoint extraction and drawing

### 

- Link to the MediaPipe documentation, including the keypoint (landmark) index numbering
- https://google.github.io/mediapipe/solutions/hands.html




- Link to the MediaPipe drawing code (used as a reference for drawing the points manually)
- https://github.com/google/mediapipe/blob/master/mediapipe/python/solutions/drawing_utils.py



In [2]:
# Landmark indices kept from each MediaPipe Holistic result group, in the
# order the functions below pair them with the results:
# pose, left hand, right hand.
# Each kept landmark contributes an (x, y) pair, so one frame yields
# (9 + 11 + 11) * 2 = 62 values.
pose_selected_landmarks = [
    [0,2,5,11,13,15,12,14,16], # pose landmarks
    [0,2,4,5,8,9,12,13,16,17,20], # left hand
    [0,2,4,5,8,9,12,13,16,17,20], # right hand
]


mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
# One shared Holistic model instance, reused by every detection call in this notebook.
holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# holistic model process image and return the results as keypoints
def mediapipe_detection(image,model):
    """Run `model` on a BGR frame and return the frame together with the results.

    The frame is converted BGR -> RGB, flagged read-only while
    ``model.process`` runs, made writeable again and converted back to BGR.

    Args:
        image: frame in BGR channel order (as converted by ``cv2.cvtColor``).
        model: object exposing ``process(rgb_image)``.

    Returns:
        ``(image, results)``: the BGR frame after the round-trip conversion
        and whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Read-only while the model processes the frame, writeable again afterwards.
    rgb_frame.flags.writeable = False
    detection = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), detection

                
                
def extract_keypoints(results):
    """Flatten the selected pose/hand landmarks of `results` into one vector.

    For each group (pose, left hand, right hand) the landmarks listed in
    ``pose_selected_landmarks`` contribute their ``x`` and ``y`` values.
    A group that was not detected contributes zeros of the same length, so
    the output length does not depend on what was detected.

    Args:
        results: object with ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes.

    Returns:
        1-D numpy array: pose values, then left-hand, then right-hand values.
    """
    groups = (
        results.pose_landmarks,
        results.left_hand_landmarks,
        results.right_hand_landmarks,
    )

    chunks = []
    for detected, wanted in zip(groups, pose_selected_landmarks):
        if not detected:
            # Missing group: zero placeholder, two values per selected landmark.
            chunks.append(np.zeros(len(wanted) * 2))
            continue
        points = detected.landmark
        coords = np.array([[points[i].x, points[i].y] for i in wanted])
        chunks.append(coords.flatten())

    return np.concatenate(chunks)
            


def draw_landmark_from_results(image,results):
    """Draw the selected pose and hand landmarks of `results` onto `image`.

    Only the landmarks listed in ``pose_selected_landmarks`` are drawn, each
    as a small red (BGR ``(0, 0, 255)``) circle. `image` is modified in place.

    Args:
        image: array whose ``shape`` is ``(rows, cols, channels)``.
        results: object with ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes.
    """
    image_rows, image_cols, _ = image.shape

    groups = (
        results.pose_landmarks,
        results.left_hand_landmarks,
        results.right_hand_landmarks,
    )

    for detected, wanted in zip(groups, pose_selected_landmarks):
        if not detected:
            continue
        points = detected.landmark
        for idx in wanted:
            point = points[idx]
            landmark_px = mp_drawing._normalized_to_pixel_coordinates(
                point.x, point.y, image_cols, image_rows)
            # The helper returns None when a normalized coordinate lies
            # outside [0, 1] (landmark estimated outside the frame);
            # cv2.circle cannot accept None as a centre, so skip the point.
            if landmark_px is None:
                continue
            cv2.circle(image, landmark_px, 2, (0,0,255), 4)


def draw_landmark_from_array(image,keyPoints):
    """Draw a flat ``[x0, y0, x1, y1, ...]`` keypoint vector onto `image`.

    Points whose x or y is exactly 0 are treated as missing and skipped.
    Each remaining point is drawn as a small red (BGR ``(0, 0, 255)``)
    circle. `image` is modified in place.

    Args:
        image: array whose ``shape`` is ``(rows, cols, channels)``.
        keyPoints: indexable sequence of normalized coordinates, x and y
            interleaved.
    """
    image_rows, image_cols, _ = image.shape

    for i in range(len(keyPoints)//2):
        x = keyPoints[i*2]
        y = keyPoints[i*2+1]
        if x == 0 or y == 0:
            continue
        landmark_px = mp_drawing._normalized_to_pixel_coordinates(
            x, y, image_cols, image_rows)
        # The helper returns None for coordinates outside [0, 1] (for example
        # extrapolated values); cv2.circle cannot accept None, so skip them.
        if landmark_px is None:
            continue
        cv2.circle(image, landmark_px, 2, (0,0,255), 4)

                

 
        

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


# 3 - read and process data

### 3.1 get data

In [200]:
# Root directory of the sign dataset; `signs` holds its entries in sorted
# order, so a sign id is an index into this list (see get_one_class).
data_path = os.path.join("..","..","..","data",'arabic-signs')
signs = sorted(os.listdir(data_path))

# for 10 signs

# presumably indices into `signs` for the signs used here - TODO confirm
actions_ids= [
    0,1,2,3,4,5,6,7,8,9 # first 10 actions
]
# presumably the label names in the same order as actions_ids - TODO confirm
actions = [
    'one','you','teacher','girl','tomorrow','mom','look','crazy','walk','agree'
]
n_actions = len(actions)

def get_one_class(sign_id):
    """Return the "Color" directory path of every sample of one sign.

    Samples are listed in sorted name order. ``os.listdir`` returns entries
    in arbitrary order, so without sorting the seeded train/test split built
    from this list would not be reproducible between runs or machines.

    Args:
        sign_id: index into the module-level ``signs`` list.

    Returns:
        list of ``<data_path>/<sign>/<sample>/Color`` paths.
    """
    sign_path = os.path.join(data_path, signs[sign_id])
    return [
        os.path.join(sign_path, sample, "Color")
        for sample in sorted(os.listdir(sign_path))
    ]


def get_frames(dir_path):
    """Return the path of every entry in `dir_path`, in natural name order.

    ``os.listdir`` returns entries in arbitrary order. The names are sorted
    with digit runs compared numerically, so "2.jpg" comes before "10.jpg"
    and zero-padded names keep their usual order.

    Args:
        dir_path: directory to list.

    Returns:
        list of ``dir_path/<entry>`` paths; empty for an empty directory.
    """
    import re

    def natural_key(name):
        # re.split with a capturing group alternates text / digit-run tokens,
        # so odd positions are always the digit runs.
        parts = re.split(r"(\d+)", name)
        return [int(tok) if pos % 2 else tok for pos, tok in enumerate(parts)], name

    return [
        os.path.join(dir_path, frame)
        for frame in sorted(os.listdir(dir_path), key=natural_key)
    ]
# np.linspace(0,32,16,dtype=np.int16)
        

In [201]:
# Video directory paths (x) and integer sign labels (y) for each split.
train_x = []
train_y = []
test_x = []
test_y = []

# Split every sign class 90% / 10% into train / test.
for sign_id in actions_ids:
    data = np.array(get_one_class(sign_id))
    # Shuffle before splitting. The permutation used to be computed but never
    # applied, so the split always took the first 90% in listing order.
    indices = np.random.permutation(len(data))
    data = data[indices]
    spliting = int(0.9*len(data))
    train_x.extend(str(path) for path in data[:spliting])
    train_y.extend([sign_id] * spliting)
    test_x.extend(str(path) for path in data[spliting:])
    test_y.extend([sign_id] * (len(data) - spliting))

In [202]:
# Print the number of samples and labels in the train and test splits.
print(
    len(train_x),
    len(train_y),
    len(test_x),
    len(test_y),
)

859 859 99 99


### 3.2 get frames from path

In [204]:
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
import random

class VideoProcessing:
    """Turn one video, stored as a directory of frame images, into keypoints.

    A fixed number of evenly spaced frames is read from the directory,
    optionally passed through an image-augmentation generator, and fed to the
    shared MediaPipe ``holistic`` model.

    An earlier ``cv2.VideoCapture``-based loader that was kept here as
    commented-out code has been removed; frames are read from image files.
    """

    def __init__(self,num_frames,transformer=None):
        self.transformer = transformer # the datagenerator class (None disables augmentation)
        self.num_frames = num_frames   # the num_frames per video
        self.seed = random.randint(1,100000000)

    def change_seed(self):
        """Draw a new random seed for the transformer."""
        self.seed = random.randint(1,100000000)

    def transform(self,frame):
        """Return one augmented ``uint8`` copy of `frame`.

        The current ``self.seed`` is passed to the generator, so every frame
        transformed between two ``change_seed`` calls uses the same seed.
        """
        batches = self.transformer.flow(np.expand_dims(frame, axis=0), seed=self.seed)
        # Only the first generated batch (of size 1) is needed.
        for trans_frame in batches:
            return np.squeeze(trans_frame.astype(np.uint8), axis=0)

    @staticmethod
    def _natural_key(name):
        """Sort key comparing digit runs numerically ("2.jpg" < "10.jpg")."""
        import re

        # re.split with a capturing group alternates text / digit-run tokens,
        # so odd positions are always the digit runs.
        parts = re.split(r"(\d+)", name)
        return [int(tok) if pos % 2 else tok for pos, tok in enumerate(parts)], name

    def get_frames(self,video_path,num_frames):
        """Read `num_frames` evenly spaced frames from the `video_path` directory.

        File names are sorted first: ``os.listdir`` order is arbitrary, and
        sampling by index is only meaningful on an ordered sequence. When a
        transformer is set, each loaded frame is augmented with it;
        previously the transformer was stored but never applied, so enabling
        the transformation had no effect.

        Returns:
            list of images as returned by ``cv2.imread`` (augmented if a
            transformer is set).
        """
        video_frames = sorted(os.listdir(video_path), key=self._natural_key)
        # Plain int indices: int16 would overflow past 32767 frames.
        selected_frames = np.linspace(0, len(video_frames)-1, num_frames, dtype=int)

        lis = []
        for index in selected_frames:
            frame = cv2.imread(os.path.join(video_path, video_frames[index]))
            # cv2.imread yields None for unreadable files; leave that for the
            # caller to surface rather than failing inside the transformer.
            if self.transformer is not None and frame is not None:
                frame = self.transform(frame)
            lis.append(frame)
        return lis

    def extract_keypoints_video(self,frames=None,path=None,display_text=None):
        """Run keypoint detection on every frame of one video.

        Args:
            frames: frames to process; when None they are read from `path`.
            path: frame directory, used only when `frames` is None.
            display_text: optional progress text printed with a carriage
                return so successive calls overwrite the same console line.

        Returns:
            ``(images, keypoints)`` numpy arrays, one entry per frame.
        """
        # New seed per video, shared by all frames of that video.
        self.change_seed()
        if display_text is not None:
            print(display_text,end="\r")

        # `is None`, not `== None`: comparing a numpy array with `==` is
        # element-wise and cannot be used as a condition.
        if frames is None:
            frames = self.get_frames(path,self.num_frames)

        output_key_points=[]
        output_images=[]

        for frame in frames:
            image, results = mediapipe_detection(frame, holistic)
            output_key_points.append(extract_keypoints(results))
            output_images.append(image)
        return np.array(output_images),np.array(output_key_points)
    
    
    
    
class VideosProcessing:
    """Apply a ``VideoProcessing`` instance to a list of video paths.

    The transformer given at construction is remembered so augmentation can
    be switched off and on again on the underlying processor.
    """

    def __init__(self,transformer,num_frames):
        self.processor = VideoProcessing(transformer=transformer,num_frames=num_frames)
        self.num_frames = num_frames
        self.transformer = transformer

    def stop_transformation(self):
        """Disable augmentation on the underlying processor."""
        self.processor.transformer = None

    # Backward-compatible alias for the original, misspelled method name.
    stop_transofrmation = stop_transformation

    def enable_transformation(self):
        """Re-enable augmentation with the transformer given at construction."""
        self.processor.transformer = self.transformer

    def _extract_all(self, array, keep_frames):
        """Process every video path in `array`, printing a progress line.

        Returns two lists: per-video frames (left empty unless `keep_frames`
        is true) and per-video keypoints.
        """
        total = len(array)
        frames_output = []
        keypoints_output = []
        for index, video in enumerate(array):
            display_text = f"processing video : {index+1}/{total}"
            frames, keypoints = self.processor.extract_keypoints_video(
                path=video, display_text=display_text)
            keypoints_output.append(keypoints)
            if keep_frames:
                frames_output.append(frames)
        return frames_output, keypoints_output

    def convert_get_both(self,array):
        """Return ``(frames, keypoints)`` numpy arrays for all videos in `array`."""
        frames_output, output = self._extract_all(array, keep_frames=True)
        return np.array(frames_output),np.array(output)

    def convert(self,array):
        """Return the keypoints numpy array for all videos in `array`."""
        _, output = self._extract_all(array, keep_frames=False)
        return np.array(output)

    
    
# Augmentation settings shared by both processor objects below.
# NOTE(review): horizontal flipping is left disabled - presumably because
# mirroring would swap the left- and right-hand keypoints; confirm.
datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.2,
        zoom_range=0.2,
        #horizontal_flip=True,
        fill_mode='nearest')



# Single-video and multi-video processors, both sampling 16 frames per video.
video_processing_obj = VideoProcessing(transformer=datagen,num_frames=16)
video_list_obj = VideosProcessing(transformer=datagen,num_frames=16)


### 3.3 test extracted images and frames

#### 3.3.1 get and view keypoints

In [208]:
# one_class = get_one_class(0,"train")
# video_list_obj.stop_transofrmation()
# Extract frames and keypoints for three training videos with the
# transformer enabled on the processor.
video_list_obj.enable_transformation()
frames_list,keypoints_list = video_list_obj.convert_get_both(train_x[30:33])

processing video : 3/3

In [374]:
# get_one_class takes only the sign id; the old second ("test") argument no
# longer matches its definition and raised a TypeError.
data = get_one_class(1)[:5] # first 5 videos of sign class 1
video_list_obj.stop_transofrmation()
frames_list,keypoints_list = video_list_obj.convert_get_both(data) # convert them to key points

processing video : 5/5

In [209]:
# images,keypoints = video_processing_obj.extract_keypoints_video(path=train_X[250])

# Play back every extracted video with its keypoints drawn on the frames.
for video_num in range(len(frames_list)):
    images = frames_list[video_num]
    keypoints = keypoints_list[video_num]

    # 16 matches the num_frames the processors were created with.
    for index in range(16):

        image = images[index]

        keypoint = keypoints[index]

        # Draws in place on the frame.
        draw_landmark_from_array(image,keypoint)

        cv2.imshow("frame",image)
        # Wait 200 ms per frame.
        # NOTE(review): 'q' only breaks the inner loop, so playback continues
        # with the next video rather than stopping.
        if cv2.waitKey(200) & 0xFF == ord('q'):
            break
        

cv2.destroyAllWindows()

#### 3.3.2 view keypoints only

In [249]:
# get_one_class takes only the sign id; the old second ("train") argument no
# longer matches its definition and raised a TypeError.
one_class = get_one_class(5)
# video_list_obj.stop_transofrmation()
video_list_obj.enable_transformation()
keypoints_list = video_list_obj.convert(one_class[60:62])

processing video : 5/5

In [251]:
# images,keypoints = video_processing_obj.extract_keypoints_video(path=train_X[250])

# Play back the keypoints of every video on a blank canvas.
for keypoints in keypoints_list:

    # 16 matches the num_frames the processors were created with.
    for index in range(16):

        # 512x512 three-channel canvas with every value set to 255.
        image = np.zeros((512,512,3))+255

        keypoint = keypoints[index]

        draw_landmark_from_array(image,keypoint)

        cv2.imshow("frame",image)
        # Wait 200 ms per frame.
        # NOTE(review): 'q' only breaks the inner loop (skips to next video).
        if cv2.waitKey(200) & 0xFF == ord('q'):
            break
        

cv2.destroyAllWindows()

#### 3.3.3 view keypoints from numpy array

In [43]:
def fil_keypoints(array):
    """Fill missing (zero) keypoint values by linear extrapolation.

    For every frame from the third onward, a value equal to 0 is replaced by
    ``2 * previous - one_before_previous``, using the values at the same
    position in the two preceding frames (already-filled values included).

    A value is only filled when both preceding values are non-zero. Zero is
    the "missing" sentinel, so extrapolating from it would fabricate
    coordinates (e.g. ``2 * prev`` or a negative value) rather than
    continue a real trajectory; such values are left at 0.

    Args:
        array: array-like of shape (frames, values).

    Returns:
        a filled copy; `array` itself is not modified.
    """
    output = np.array(array, copy=True)
    for i in range(2, len(output)):
        prev_frame = output[i-1]
        prev_prev_frame = output[i-2]
        fillable = (output[i] == 0) & (prev_frame != 0) & (prev_prev_frame != 0)
        output[i, fillable] = prev_frame[fillable]*2 - prev_prev_frame[fillable]
    return output
                
        

In [45]:
# keypoints_list = np.load(os.path.join("key_points","val",'1.npy'))

# NOTE(review): get_list is not defined anywhere in the visible notebook;
# this cell presumably relies on an earlier session's definition - confirm.
test_path,_ =  get_list(actions_ids,"val")

# NOTE(review): val_X is assigned in the "load and test all" section further
# down, so that cell has to run before this one.
keypoints_list = val_X[:15] # use thing after loading text_X from keypoints directory
new_video_processing = VideoProcessing(transformer=None,num_frames=16)
# Re-read each video's frames and overlay its zero-filled keypoints.
for video_index,keypoints in enumerate(keypoints_list):
    images = new_video_processing.get_frames(test_path[video_index],16)
    new_keypoints = fil_keypoints(keypoints)
    for index in range(16):

        image = images[index]

        keypoint = new_keypoints[index]

        draw_landmark_from_array(image,keypoint)

        cv2.imshow("frame",image)
        # Wait 200 ms per frame.
        # NOTE(review): 'q' only breaks the inner loop (skips to next video).
        if cv2.waitKey(200) & 0xFF == ord('q'):
            break
        

cv2.destroyAllWindows()

In [350]:
# Load one saved keypoint file and play it back on a blank canvas.
keypoints_list = np.load(os.path.join("key_points","val",'1.npy'))

for keypoints in keypoints_list:

    # NOTE(review): assumes every entry holds at least 16 frames - confirm.
    for index in range(16):

        # 512x512 three-channel canvas with every value set to 255.
        image = np.zeros((512,512,3))+255

        keypoint = keypoints[index]

        draw_landmark_from_array(image,keypoint)

        cv2.imshow("frame",image)
        # Wait 200 ms per frame.
        # NOTE(review): 'q' only breaks the inner loop (skips to next video).
        if cv2.waitKey(200) & 0xFF == ord('q'):
            break
        

cv2.destroyAllWindows()

In [40]:
# Close any OpenCV windows left open by an interrupted playback cell.
cv2.destroyAllWindows()

# 4 - extract keypoint and save them

### 4.1  extract training

In [223]:
# Collect the real data (iteration 0) and 5 differently transformed passes.

train_keypoints_dir = os.path.join("key_points","train")
# np.save does not create missing directories.
os.makedirs(train_keypoints_dir, exist_ok=True)

label_path = os.path.join(train_keypoints_dir,"labels.npy")
np.save(label_path,train_y)


num_training_iterations = 6

for transformation_index in range(num_training_iterations):
    # Deliberately not named `data_path`: that global is the dataset root
    # read by get_one_class, and overwriting it here broke later calls.
    output_path = os.path.join(train_keypoints_dir,str(transformation_index)+".npy")

    # Trailing spaces blank out the leftover "processing video" progress line.
    print("iteration :",transformation_index," "*40)

    # Iteration 0 keeps the untransformed videos; the others enable the transformer.
    if transformation_index == 0:
        video_list_obj.stop_transofrmation()
    else:
        video_list_obj.enable_transformation()

    data = video_list_obj.convert(train_x) # convert the training videos to key points
    np.save(output_path,data)



iteration : 0                                         
iteration : 1                                         
iteration : 2                                         
iteration : 3                                         
iteration : 4                                         
iteration : 5                                         
processing video : 859/859

### 4.2  extract testing

In [224]:
test_keypoints_dir = os.path.join("key_points","test")
# np.save does not create missing directories.
os.makedirs(test_keypoints_dir, exist_ok=True)

# Deliberately not named `data_path`: that global is the dataset root read by
# get_one_class, and overwriting it here broke later calls.
output_path = os.path.join(test_keypoints_dir,"data.npy")
label_path = os.path.join(test_keypoints_dir,"labels.npy")

# save labels
np.save(label_path,test_y)

# save data (test videos are never transformed)
video_list_obj.stop_transofrmation()
data = video_list_obj.convert(test_x) # convert the test videos to key points
np.save(output_path,data)



processing video : 99/99

### 4.3 load and test all

In [30]:
# both depends on actions_id -> [0,1,2,3,4,5,6,7,8,9]
def load_dir(dir_name,data_temp=None,labels_temp=None,actions_id=None):
    """Load ``<action_id>.npy`` files from `dir_name` with matching labels.

    Every row of ``<action_id>.npy`` is labelled ``action_id``. The loaded
    data is appended to `data_temp` / `labels_temp` when `data_temp` is
    already a numpy array, which lets callers accumulate across directories.

    Args:
        dir_name: directory containing the ``.npy`` files.
        data_temp: previously accumulated data array, or None.
        labels_temp: previously accumulated labels array, or None.
        actions_id: iterable of integer ids to load; None or ``"all"`` loads
            every ``<integer>.npy`` file found, in ascending id order. Other
            directory entries (e.g. ``labels.npy``) are ignored.

    Returns:
        ``(data, labels)`` numpy arrays. When nothing is loaded,
        `data_temp` and `labels_temp` are returned unchanged.
    """
    # The original tested the misspelled, still-unbound local `action_id`,
    # raising UnboundLocalError whenever `actions_id` was not None.
    if actions_id is None or (isinstance(actions_id, str) and actions_id == "all"):
        stems = (
            os.path.splitext(entry)[0]
            for entry in os.listdir(dir_name)
            if entry.endswith(".npy")
        )
        actions_id = sorted(int(stem) for stem in stems if stem.isdecimal())

    new_data = []
    new_labels = []
    for action_id in actions_id:
        new_array = np.load(os.path.join(dir_name,f"{action_id}.npy"))
        new_data.append(new_array)
        new_labels.append(np.array([action_id]*len(new_array)))

    if not new_data:
        return data_temp,labels_temp

    if isinstance(data_temp, np.ndarray):
        new_data.insert(0, data_temp)
        new_labels.insert(0, labels_temp)

    # Concatenate once instead of re-copying the accumulated array per file.
    return np.concatenate(new_data),np.concatenate(new_labels)

def load_mul_dir(parent_dir):
    """Load and accumulate the numbered sub-directories of `parent_dir`.

    Sub-directories are addressed as "0", "1", ... up to the number of
    entries in `parent_dir`; each one is passed to ``load_dir`` together
    with the data and labels accumulated so far.

    Returns:
        ``(data, labels)`` as returned by the last ``load_dir`` call, or
        ``(None, None)`` when `parent_dir` has no entries.
    """
    entry_count = len(os.listdir(parent_dir))

    accumulated = (None, None)
    for sub_name in map(str, range(entry_count)):
        accumulated = load_dir(os.path.join(parent_dir, sub_name), *accumulated)
    return accumulated


    

In [31]:
# Load the saved keypoints and labels for every split.
# NOTE(review): the loaders expect numbered sub-directories (train) and
# integer-named .npy files, while the extraction cells above write
# key_points/train/<i>.npy, key_points/test/data.npy and labels.npy, and no
# visible cell writes key_points/val - confirm the on-disk layout.
train_X,train_Y = load_mul_dir(os.path.join("key_points","train"))
val_X,val_Y = load_dir(os.path.join("key_points","val"))
test_X,test_Y = load_dir(os.path.join("key_points","test"))

In [16]:
# Print the shapes of the loaded data and label arrays for every split.
print(
train_X.shape,
    train_Y.shape,
    val_X.shape,
    val_Y.shape,
    test_X.shape,
    test_Y.shape
    
)

(7446, 16, 62) (7446,) (190, 16, 62) (190,) (168, 16, 62) (168,)


In [373]:

# Play back the loaded training keypoints (train_X) on a blank canvas.
# This used to iterate train_x, which holds video directory paths (strings),
# so each "keypoint" was a single character and nothing was drawn.
keypoints_list = train_X
for keypoints in keypoints_list:

    for index in range(16):

        # 512x512 three-channel canvas with every value set to 255.
        image = np.zeros((512,512,3))+255

        keypoint = keypoints[index]

        draw_landmark_from_array(image,keypoint)

        cv2.imshow("frame",image)
        # Wait 200 ms per frame; 'q' skips to the next video.
        if cv2.waitKey(200) & 0xFF == ord('q'):
            break


cv2.destroyAllWindows()

# extra

In [None]:
# Shuffle the training samples and labels with one shared permutation so the
# pairs stay aligned; validation and test data are passed through unchanged.
# NOTE(review): train_data, train_labels, val_data, val_labels, test_data and
# test_labels are not defined anywhere in the visible notebook - confirm
# where they come from before running this cell.
perm = np.random.permutation(len(train_data))

train_X = train_data[perm]
train_Y = train_labels[perm]
val_X = val_data
val_Y = val_labels
test_X = test_data
test_Y = test_labels


In [223]:
# Print the shapes of the data and label arrays for every split.
print(
train_X.shape,
train_Y.shape,
val_X.shape,
val_Y.shape,
test_X.shape,
test_Y.shape,
)

(4482, 20, 62) (4482,) (118, 20, 62) (118,) (100, 20, 62) (100,)
