In [2]:
import cv2
import mediapipe as mp
import numpy as np
import math
import datetime
import pandas as pd
import os 
import pickle

#### Calculate Angle Between Three Point (1.shoulder,2.elbow,3.wrist)

In [3]:
def calculate_angle(SHOULDER,ELBOW,WRIST):
    """
    _ calculate_angle:Function to calculate angle between three point (1.shoulder,2.elbow,3.wrist)
    _ shoulder:pose landmark number 11 and 12 containe X_position and Y_position
    _ elbow:pose landmark number 13 and 14 containe X_position and Y_position
    _ wrist:pose landmark number 15 and 16 containe X_position and Y_position
    """
    SHOULDER = np.array(SHOULDER) # First
    ELBOW = np.array(ELBOW) # Mid
    WRIST = np.array(WRIST) # End
    # https://www.medcalc.org/manual/atan2-function.php
    radians = np.arctan2(WRIST[1]-ELBOW[1] , WRIST[0]-ELBOW[0]) - np.arctan2(SHOULDER[1]-ELBOW[1] , SHOULDER[0]-ELBOW[0] )
    angle = np.abs(radians*180.0/np.pi)
    if angle > 180:angle = 360-angle
    return angle

#### filtter Points in Curent Frame 

In [4]:
def filtter_points(landmark_1):
    """
    _ filtter_points:Function to fillter all points in specific landmark (shoulder,elbow,wrist,hip,knee,ankle "_left&right_")
    _ landmark_1:pose landmark for curent frame
    """
    my_list = []
    for ind in range(len(landmark_1)):
        if ((ind>0)and(ind<=10))or((ind>=17)and(ind<=32)):continue
        p1 = [landmark_1[ind].x,landmark_1[ind].y]
        my_list.append(np.array([[p1[0]],[p1[1]]]))
    return my_list

#### Kalman Filter Equation Prediction Update 

In [5]:
def Prediction_update(X,A,P,Q):
    """
    _ Prediction_update:Function to Applying Prediction Equation
    _ X:State Matrix [X_position,Y_position,X_velocity,Y_velocity]
    _ A:State Transition
    _ P:Process Covariance Matrix
    _ Q:Erorr Terms
    """
    X = A.dot(X)
    TEPM = A.dot(P)
    P = TEPM.dot(A.T)+Q
    return X,P

##### Kalman Filter Equation Measurement Update

In [6]:
def Measurement_update(Z,X,P,H,R):
    """
    _ Measurement_update:Function to Applying Measurement Equation
    _ X:State Matrix [X_position,Y_position,X_velocity,Y_velocity]
    _ P:Process Covariance Matrix
    _ H:unit matrix
    _ R:Erorr Terms of the Measurement
    """
    I = np.array([[1,0],
                  [0, 1]])
    
    T1 = P.dot(H.T)
    T2 = H.dot(P)
    T3 = T2.dot(H.T)
    T3 = T3 + R
    K = T1.dot(np.linalg.inv(T3))
    
    T1 = H.dot(X)
    T2 = Z - T1
    T3 = K.dot(T2)
    X = X + T3
    
    T1 = K.dot(H)
    T2 = I - T1
    P = T2.dot(P)

    return X,P,K

#### get all video in any root path

In [7]:
def get_videos(path_folder):
    """
    _ get_videos:Function to get all videos in root path
    _ input:(path_folder) the root path for all dirctory that contant video
    _ output:(folders) all dirctory in root path (all_video_paths) all paths video
    """
    folders = os.listdir(path_folder)
    all_video_paths = []
    for folder in folders:
        curr_dataset_path = os.path.join(path_folder, folder)
        try:videos = os.listdir(curr_dataset_path)
        except:
            if ".mp4" in curr_dataset_path:
                all_video_paths.append(curr_dataset_path)
                continue
            else:continue
        for video_p in videos:all_video_paths.append(os.path.join(curr_dataset_path, video_p))
    return folders , all_video_paths

#### Calculate Average for List or Array of Numbers

In [8]:
def Average(lst):
    """
    _Average:Function to calculate mean for list or array
    _input:List or array of numbers
    _output:the mean for input numbre
    """
    return sum(lst) / len(lst)

#### Save any Object We Need 

In [9]:
def save_object(obj, filename):
    """
    _save_object:Function to save any object
    _input:(obj) the object that we want save , (filename) Memorization path 
    """
    with open(filename+".pkl", 'wb') as outp:
        pickle.dump(obj, outp, pickle.HIGHEST_PROTOCOL)
    outp.close()

#### Save any Object

In [10]:
def load_object(filename):
    """
    _load_object:Function to load any object
    _input:object path 
    _output:the object
    """
    with open(filename+".pkl", 'rb') as outp:
        loaded_object = pickle.load(outp)
    outp.close()
    return loaded_object

#### calculate angles between shoulder and elbow and wrist point for left and right hand

In [11]:
def calculate_left_right_arm_angle(X):
    """
    _ calculate_left_right_arm_angle:Function to calculate angle between three points from arm 
    _ input: (landmarks) all pose points , (mp_pose) contante all landmark name
    _ output: left and right angle for tow arm
    """
    LEFTangle = calculate_angle([X[1][0][0],X[1][1][0]],[X[3][0][0],X[3][1][0]],[X[5][0][0],X[5][1][0]])
    RIGHTangle = calculate_angle([X[2][0][0],X[2][1][0]],[X[4][0][0],X[4][1][0]],[X[6][0][0],X[6][1][0]])
    return LEFTangle,RIGHTangle

#### Check if ram is moving or not

In [12]:
def check_hand_moving(pre_angle,cur_angle): #####
    """
    _ check_hand_moving: Function to check if arm moving or not 
    _ input: (pre_angle) angle from previose frame (cur_angle) angle from curent frame
    _ output: (True) if arm moving (False) if arm not moving 
    """
    angle = cur_angle-pre_angle
    stage=False
    if angle>5:stage = True
    return stage

#### give the results of the calculation

In [13]:
def hand_get_decistion(l_is_moved,r_is_moved):  #####
    """ 
    _ hand_get_decistion: get decis
    _ input: ("l_is_moved") true if left arm is moved or false if not , ("r_is_moved") true if right arm is moved or false if not
    _ output: (Moving Hand , 1) or (Rest Hand 0)
    """
    if l_is_moved or r_is_moved:return "Moving Hand",1
    return "Rest Hand",0

In [14]:
def Drawing_utils(image,results,mp_pose):
    """
    _To draw joint points on the image
    """
    mp_drawing = mp.solutions.drawing_utils
    mp_drawing.draw_landmarks(image,results.pose_landmarks,mp_pose.POSE_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(245,117,66),thickness=4,circle_radius=2),
                              mp_drawing.DrawingSpec(color=(245,66,230),thickness=4))

In [15]:
def creat_kalman_parameters(delta_t):
    A = np.array([[1, delta_t],[0, 1]]) # STATE MATRIC (The relationship between system variables in the equations of motion)
    P = np.eye(2)*10 # Guess Kalmen , Uncertainty Matrix , The relationship of state variables , external forces
    H = np.eye(2) # Measurement matrix is used to calculate Calmen error
    R = np.eye(2)*0.05 # Noise added for update
    Q =np.eye(2)*0.1 # Noise added for measurement
    X = [np.array([[0], [0]]) for _ in range(7)]
    return A,P,H,R,Q,X

In [16]:
def dist(p1,p2):
    return math.sqrt((p2[0]-p1[0])**2+(p2[1]-p1[1])**2)

In [17]:
def check_body_moving(i,head_points,shoulder_points): 
    """
    _To check whether the presenter's body has moved or turned
    """
    if (head_points[-1]<shoulder_points[-2][1]) or (head_points[-1]>shoulder_points[-2][0]):return True
    if (shoulder_points[-2][0]-shoulder_points[-2][1])>(shoulder_points[-1][0]-shoulder_points[-1][1]):return True
    return False

#### Download the video, perform the required calculations, and then save result

In [19]:
def body(X,head_points,shoulder_points,temp,index):
    """
    _Gives the final decision regarding body movement
    """
    NOSE = X[0][0][0]
    LEFT_SHOULDER = X[1][0][0]
    RIGHT_SHOULDER = X[2][0][0]
    
    if NOSE and temp<2:
        head_points.append(NOSE)
        shoulder_points.append([LEFT_SHOULDER,RIGHT_SHOULDER])
        temp+=1
    
    if len(head_points)>=1:
        head_points.append(NOSE)
        if check_body_moving(index,head_points,shoulder_points):
            shoulder_points.append([LEFT_SHOULDER,RIGHT_SHOULDER])
            decistion_body,Ndecistion_body = "Moving Body",1
        else:decistion_body,Ndecistion_body = "Rest Body",0
    return decistion_body,Ndecistion_body

In [20]:
def hand(X,freeLEFT,freeRIGHT):
    """
    _Gives the final decision regarding hand movement
    """
    LEFTangle , RIGHTangle = calculate_left_right_arm_angle(X)
    freeLEFT.append(LEFTangle)
    freeRIGHT.append(RIGHTangle)
    if len(freeLEFT)>1:l_is_moved = check_hand_moving(freeLEFT[-2],freeLEFT[-1])
    else:l_is_moved = False
    if len(freeRIGHT)>1:r_is_moved =check_hand_moving(freeRIGHT[-2],freeRIGHT[-1])
    else:r_is_moved = False
    decistion_hand,Ndecistion_hand = hand_get_decistion(l_is_moved,r_is_moved)
    return decistion_hand,Ndecistion_hand

In [50]:
def grouping_frames(X_List,Type):
    temp,t = [],[]
    for i,x in enumerate(X_List):
        t.append(x)
        if len(t)%15==0:
            X = sum(t)
            if Type=="hand":
                if X>=5:temp.append(1)
                else:temp.append(0)
            if Type=="body":
                if X>=3:temp.append(1)
                else:temp.append(0)
            t = [] 
    return temp

In [51]:
def video_calss(hand_list,body_list):
    number_hand_positive = sum(hand_list)
    all_hand_frames = len(hand_list)
    hand_score = 0.65*(number_hand_positive/all_hand_frames)
    number_body_positive = sum(body_list)
    all_body_frames = len(body_list)
    body_score = 0.35*(number_body_positive/all_body_frames)
    final_score = hand_score+body_score
    return 1 if final_score>0.35 else 0

In [75]:
def convert_to_csv(dectionary,pre_name):
    for name in dectionary:
        for key in dectionary[name]:
            for i in range(Max-len(dectionary[name][key])):
                dectionary[name][key].append(-1)
        df = pd.DataFrame(dectionary[name])
        df.to_csv(pre_name+" "+name+'.csv', index=False)
        df.to_excel (pre_name+" "+name+'.xlsx', index = None, header=True)

In [46]:
def pipeline_with_kalman(i,path_A):
    print(path_A)
    path_A_ = path_A.split("\\")
    key = path_A_[-1].split(".")[-2].split("-")[-1]+" "+path_A_[-2].split("-")[-1]
    print(i," : ",key)
    ind = []
    freeRIGHT,freeLEFT,freeRIGHT_k,freeLEFT_k,freeRIGHT_nk,freeLEFT_nk = [],[],[],[],[],[]
    head_points_without_k,shoulder_points_without_k,head_points_with_k,shoulder_points_with_k,head_points,shoulder_points = [],[],[],[],[],[]
    decistion_body_without_k,decistion_body_with_k,decistion_body,decistion_hand_without_k,decistion_hand_with_k,decistion_hand = "","","","","",""
    HandFrameLable_kalman['without_kalman'][key],HandFrameLable_kalman['with_kalman'][key],HandFrameLable_kalman['Hybrid'][key] = [],[],[]
    BodyFrameLable_kalman['without_kalman'][key],BodyFrameLable_kalman['with_kalman'][key],BodyFrameLable_kalman['Hybrid'][key] = [],[],[]
    points_in_frames,point_list,index,indexs = [],[],0,0
    mp_pose = mp.solutions.pose
    cap = cv2.VideoCapture(path_A)
    if (cap.isOpened() == False): 
        print("Error reading video file")
    frame_width,frame_height = int(cap.get(3)),int(cap.get(4))
    size = (frame_width, frame_height)
    result = cv2.VideoWriter(path_A[:-4]+" Producted.avi",cv2.VideoWriter_fourcc(*'MJPG'),10, size)
    frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    seconds = round(frames / fps)
    video_time,delta_t = datetime.timedelta(seconds=seconds),1/fps
    
    A,P,H,R,Q,X = creat_kalman_parameters(delta_t)
    temp_1,temp_2,temp_3 = 0,0,0
    with mp_pose.Pose(min_detection_confidence=0.5,min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ind.append(indexs)
            ret, frame = cap.read()
            if (not ret) or (cv2.waitKey(10)==ord('q')):break
            image = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
            results = pose.process(image)
            image = cv2.cvtColor(image,cv2.COLOR_RGB2BGR)
            try:
                landmarks = results.pose_landmarks.landmark
                points_in_frames.append(landmarks)
                point_list = filtter_points(points_in_frames[index])
                if index==1:X = point_list
                for i in range(len(point_list)):
                    Z = point_list[i]
                    X[i] , P = Prediction_update(X[i],A,P,Q)
                    X[i] , P , K = Measurement_update(Z,X[i],P,H,R)
                
                decistion_body_without_k,Ndecistion_body_without_k = body(point_list,head_points_without_k,shoulder_points_without_k,temp_1,index)
                decistion_body_with_k,Ndecistion_body_with_k = body(X,head_points_with_k,shoulder_points_with_k,temp_2,index)
                if (len(point_list)==7):decistion_body,Ndecistion_body = body(point_list,head_points,shoulder_points,temp_3,index)
                else:decistion_body,Ndecistion_body = body(X,head_points,shoulder_points)
                
                BodyFrameLable_kalman["without_kalman"][key].append(Ndecistion_body_without_k)
                BodyFrameLable_kalman["with_kalman"][key].append(Ndecistion_body_with_k)
                BodyFrameLable_kalman["Hybrid"][key].append(Ndecistion_body)
                
                decistion_hand_without_k,Ndecistion_hand_without_k = hand(point_list,freeLEFT_nk,freeRIGHT_nk)
                decistion_hand_with_k,Ndecistion_hand_with_k = hand(X,freeLEFT_k,freeRIGHT_k)
                
                if (len(point_list)==7):decistion_hand,Ndecistion_hand = hand(point_list,freeLEFT,freeRIGHT)
                else:decistion_hand,Ndecistion_hand = hand(X,freeLEFT,freeRIGHT)
                    
                HandFrameLable_kalman['without_kalman'][key].append(Ndecistion_hand_without_k)
                HandFrameLable_kalman['with_kalman'][key].append(Ndecistion_hand_with_k)
                HandFrameLable_kalman['Hybrid'][key].append(Ndecistion_hand)
                index+=1
            except:pass
            
            cv2.putText(image,decistion_hand_without_k+": without_kalman",(0,25),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
            cv2.putText(image,decistion_body_without_k+" : without_kalman",(0,60),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
            
            cv2.putText(image,decistion_hand_with_k+" : with_kalman",(0,95),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
            cv2.putText(image,decistion_body_with_k+" : with_kalman",(0,130),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
            
            cv2.putText(image,decistion_hand+" : Hybrid",(0,165),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
            cv2.putText(image,decistion_body+" : Hybrid",(0,200),cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2)
                        
            decistion_hand_with_k,decistion_body_with_k,decistion_hand,decistion_body,decistion_hand_without_k,decistion_body_without_k  ='','','','','',''
            
            Drawing_utils(image,results,mp_pose)
            result.write(image)
            indexs+=1
            cv2.imshow(key,image)
            if cv2.waitKey(10)==ord('q'):break
    indall.append(ind)
    cap.release()
    result.release()
    cv2.destroyAllWindows()

In [47]:
path_folder = "D:\\DataSet\\representatiion_scoring_data_set\\Presenter No11-Rafeef" ########
folders ,videos = get_videos(path_folder)
HandFrameLable_kalman = {}
HandFrameLable_kalman['without_kalman'] = {}
HandFrameLable_kalman['with_kalman'] = {}
HandFrameLable_kalman['Hybrid'] = {}


BodyFrameLable_kalman = {}
BodyFrameLable_kalman['without_kalman'] = {}
BodyFrameLable_kalman['with_kalman'] = {}
BodyFrameLable_kalman['Hybrid'] = {}
indall = []
folders , all_video_paths = get_videos(path_folder)
for j,i in enumerate([all_video_paths[-1]]):
    try:pipeline_with_kalman(j,i)
    except:print("exception")

D:\DataSet\representatiion_scoring_data_set\Presenter No11-Rafeef\C_Presenter No11-GroupC.mp4
0  :  GroupC Rafeef


In [48]:
save_object(HandFrameLable_kalman,"HandFrameLable_kalman")
save_object(BodyFrameLable_kalman,"BodyFrameLable_kalman")
save_object(indall,"indall")
# HandFrameLable_kalman = load_object('HandFrameLable_kalman')
# BodyFrameLable_kalman = load_object('BodyFrameLable_kalman')

In [73]:
print("______________________________Hand Moveing______________________________")
name_video,hand_grouping_frames,body_grouping_frames = [],[],[]
dic_hand_grouping_frames,dic_body_grouping_frames = {},{}
Max = 0
for Type in HandFrameLable_kalman:
    print(Type,".......\n")
    dic_hand_grouping_frames[Type]={}
    for name in HandFrameLable_kalman[Type]:
        print(name,":")
        Max = max(Max,len(HandFrameLable_kalman[Type][name]))
        temp = grouping_frames(HandFrameLable_kalman[Type][name],'hand')
        hand_grouping_frames.append(temp)
        dic_hand_grouping_frames[Type][name] = temp
        name_video.append(name)
        print("number of frame : ",len(HandFrameLable_kalman[Type][name]))
        print("number of positive : ",sum(HandFrameLable_kalman[Type][name]))
        print("number of negative : ",len(HandFrameLable_kalman[Type][name])-sum(HandFrameLable_kalman[Type][name]))
        print("average of positive : ",sum(HandFrameLable_kalman[Type][name])/len(HandFrameLable_kalman[Type][name]))
        print("average of negative : ",(len(HandFrameLable_kalman[Type][name])-sum(HandFrameLable_kalman[Type][name]))/len(HandFrameLable_kalman[Type][name]))
        print("============================================================================================")
print("\n############################################################################################\n")
print("______________________________Body Moveing______________________________")        
for Type in BodyFrameLable_kalman:
    print(Type,".......\n")
    dic_body_grouping_frames[Type]={}
    for name in BodyFrameLable_kalman[Type]:
        print(name,":\n")
        Max = max(Max,len(BodyFrameLable_kalman[Type][name]))
        temp = grouping_frames(BodyFrameLable_kalman[Type][name],'body')
        body_grouping_frames.append(temp)
        dic_body_grouping_frames[Type][name] = temp
        
        print("number of frame : ",len(BodyFrameLable_kalman[Type][name]))
        print("number of positive : ",sum(BodyFrameLable_kalman[Type][name]))
        print("number of negative : ",len(BodyFrameLable_kalman[Type][name])-sum(BodyFrameLable_kalman[Type][name]))
        print("average of positive : ",sum(BodyFrameLable_kalman[Type][name])/len(BodyFrameLable_kalman[Type][name]))
        print("average of negative : ",(len(BodyFrameLable_kalman[Type][name])-sum(BodyFrameLable_kalman[Type][name]))/len(BodyFrameLable_kalman[Type][name]))
        print("============================================================================================")
save_object(hand_grouping_frames,"hand_grouping_frames") 
save_object(body_grouping_frames,"body_grouping_frames")
save_object(name_video,"name_video")

# load_object('hand')
# load_object('body')

______________________________Hand Moveing______________________________
without_kalman .......

GroupC Rafeef :
number of frame :  2997
number of positive :  0
number of negative :  2997
average of positive :  0.0
average of negative :  1.0
with_kalman .......

GroupC Rafeef :
number of frame :  2997
number of positive :  0
number of negative :  2997
average of positive :  0.0
average of negative :  1.0
Hybrid .......

GroupC Rafeef :
number of frame :  2997
number of positive :  0
number of negative :  2997
average of positive :  0.0
average of negative :  1.0

############################################################################################

______________________________Body Moveing______________________________
without_kalman .......

GroupC Rafeef :

number of frame :  2997
number of positive :  1566
number of negative :  1431
average of positive :  0.5225225225225225
average of negative :  0.4774774774774775
with_kalman .......

GroupC Rafeef :

number of frame :  299

In [79]:
print("Final evaluation of the video : ")
for i in range(len(hand_grouping_frames)):
    print(name_video[i])
    print(video_calss(hand_grouping_frames[i],body_grouping_frames[i]))

Final evaluation of the video : 
GroupC Rafeef
0
GroupC Rafeef
0
GroupC Rafeef
0


In [76]:
convert_to_csv(BodyFrameLable_kalman,'results_for_all_frame_body')
convert_to_csv(HandFrameLable_kalman,'results_for_all_frame_hand')

In [77]:
convert_to_csv(dic_body_grouping_frames,'results_after_grouping_body')
convert_to_csv(dic_hand_grouping_frames,'results_after_grouping_hand')