In [1]:
import cv2
import mediapipe as mp
import time
import pandas as pd
import numpy as np

In [2]:
def landmmark_work(results, mpPose, mpDraw,mpHol, frame):
    if (results.pose_landmarks):
        mpDraw.draw_landmarks(frame, results.face_landmarks, mpHol.FACEMESH_TESSELATION, 
                                 mpDraw.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                 mpDraw.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                                 )
        
        # 2. Right hand
        mpDraw.draw_landmarks(frame, results.right_hand_landmarks, mpHol.HAND_CONNECTIONS, 
                                 mpDraw.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                                 mpDraw.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                                 )

        # 3. Left Hand
        mpDraw.draw_landmarks(frame, results.left_hand_landmarks, mpHol.HAND_CONNECTIONS, 
                                 mpDraw.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                                 mpDraw.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                 )

        # 4. Pose Detections
        mpDraw.draw_landmarks(frame, results.pose_landmarks, mpHol.POSE_CONNECTIONS, 
                                 mpDraw.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                 mpDraw.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                 )

def request_capture(type=0, path=None):

    if(type):
        capture = cv2.VideoCapture(path)
    else:
        capture = cv2.VideoCapture(0)
    return capture

In [3]:
mpPose = mp.solutions.pose
mpDraw = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [4]:


holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    
#0 for webcam
#1 for default video
path_to_file = 'gestures/1.mp4'
capture = request_capture(1, path_to_file)
pTime = 0

while(1):
    ret , frame = capture.read()
    try:
        #mpPose processa cada frame no formato RGB
        frameRGB = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR)
        results = holistic.process(frameRGB)

        #R_landmark = results.pose_landmarks

        cTime = time.time()
        
        landmmark_work(results, mpPose, mpDraw, mp_holistic, frame)
        
        fps = 1/(cTime - pTime)
        pTime = cTime

        cv2.putText(frame, str(int(fps)), (60,60), cv2.FONT_HERSHEY_PLAIN, 3, (255,100,120),3)
        cv2.imshow("Video", frame)
    except:
        break

    k = cv2.waitKey(5) & 0xff
    if k == 27:
        break

capture.release()
cv2.destroyAllWindows()

Export Landmarks

In [5]:
import csv
import os
import numpy as np

In [6]:
num_coords = len(results.pose_landmarks.landmark) + len(results.face_landmarks.landmark)
num_coords

501

In [7]:
colunas = ['x', 'y', 'z', 'v']

In [8]:
'''df = pd.DataFrame()
for i in range(1,num_coords):
    data = {"x": results.pose_landmarks.landmark[i].x, "y":results.pose_landmarks.landmark[i].y, "z":results.pose_landmarks.landmark[i].z, "v":results.pose_landmarks.landmark[i].visibility}
    df = df.append(data, ignore_index=True)'''

'df = pd.DataFrame()\nfor i in range(1,num_coords):\n    data = {"x": results.pose_landmarks.landmark[i].x, "y":results.pose_landmarks.landmark[i].y, "z":results.pose_landmarks.landmark[i].z, "v":results.pose_landmarks.landmark[i].visibility}\n    df = df.append(data, ignore_index=True)'

In [9]:
filename = 'coord.csv'

In [11]:
landmarks_class = ['class']
for i in range (1, num_coords+1):
    landmarks_class += ['x{}'.format(i), 'y{}'.format(i) , 'z{}'.format(i), 'v{}'.format(i)]

In [12]:
with open(filename, mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks_class)

In [37]:
class_name = "airsquat"

In [20]:
path = './gestures'

    
dir = path
sub_dir = [name for name in os.listdir(dir) if os.path.isdir(os.path.join(dir, name))]

In [21]:
sub_dir

['physical', 'testing']

In [23]:
for i in sub_dir:
        class_name = i
        path_to_file = dir+'/'+i

In [24]:
path_to_file

'./gestures/testing'

In [40]:
holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    
#0 for webcam
#1 for default video
path_to_file = 'gestures/physical/airsquat.mp4'
capture = request_capture(1, path_to_file)
pTime = 0
while(1):
    ret , frame = capture.read()

    #mpPose processa cada frame no formato RGB

    try:
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False


        results = holistic.process(image)


        image.flags.writeable = True   
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        #R_landmark = results.pose_landmarks

        cTime = time.time()
    
        landmmark_work(results, mpPose, mpDraw, mp_holistic, frame)
    
        fps = 1/(cTime - pTime)
        pTime = cTime
    
        cv2.putText(frame, str(int(fps)), (60,60), cv2.FONT_HERSHEY_PLAIN, 3, (255,100,120),3)
        cv2.imshow("Video", frame)


        try:
            # Extract Pose landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())

            if(results.face_landmarks):
              face = results.face_landmarks.landmark
              face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
            else:
              face_row = list(np.zeros(468*4).flatten())
        
            # Concate rows
            row = pose_row+face_row
        
            # Append class name 
            row.insert(0, class_name)
        
            # Export to CSV
            with open(filename, mode='a', newline='') as f:
                csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                csv_writer.writerow(row) 
        
        except:
            pass
    except:
        pass


    k = cv2.waitKey(5) & 0xff
    if k == 27:
        break

capture.release()
cv2.destroyAllWindows()

Model

In [42]:
data = pd.read_csv(filename)

In [43]:
data

Unnamed: 0,class,x1,y1,z1,v1,x2,y2,z2,v2,x3,...,z499,v499,x500,y500,z500,v500,x501,y501,z501,v501
0,fighting,0.317064,0.297323,-0.198524,0.997053,0.315736,0.266954,-0.166135,0.996997,0.316682,...,0.001840,0.0,0.320197,0.274789,0.017987,0.0,0.320573,0.271809,0.019195,0.0
1,fighting,0.314465,0.297350,-0.259700,0.997178,0.312838,0.267185,-0.229077,0.997111,0.314278,...,0.000467,0.0,0.317291,0.276397,0.015993,0.0,0.317641,0.273520,0.017142,0.0
2,fighting,0.370492,0.324895,-0.288756,0.990647,0.371475,0.292176,-0.254000,0.990152,0.373409,...,0.000990,0.0,0.375294,0.298615,0.016953,0.0,0.375780,0.295943,0.018104,0.0
3,fighting,0.343145,0.327347,-0.119479,0.998767,0.340826,0.297875,-0.095216,0.998881,0.341833,...,-0.000186,0.0,0.318080,0.324639,-0.000060,0.0,0.316504,0.328040,-0.000128,0.0
4,fighting,0.349382,0.307495,-0.224579,0.997990,0.347115,0.274173,-0.191923,0.997874,0.347782,...,0.001140,0.0,0.346726,0.284535,0.017149,0.0,0.347021,0.281538,0.018390,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2741,airsquat,0.680632,0.402722,-0.179848,0.999963,0.686719,0.385743,-0.157169,0.999964,0.690781,...,-0.000568,0.0,0.692350,0.380450,0.003409,0.0,0.693318,0.378014,0.003532,0.0
2742,airsquat,0.680397,0.387492,-0.184402,0.999966,0.686644,0.369842,-0.162563,0.999967,0.690736,...,-0.000680,0.0,0.692603,0.363168,0.003342,0.0,0.693555,0.360508,0.003481,0.0
2743,airsquat,0.680354,0.368732,-0.194770,0.999969,0.686593,0.351571,-0.172129,0.999970,0.690677,...,-0.000434,0.0,0.692380,0.344320,0.003549,0.0,0.693324,0.341614,0.003698,0.0
2744,airsquat,0.680336,0.349574,-0.194855,0.999972,0.686582,0.332461,-0.171762,0.999973,0.690663,...,-0.000496,0.0,0.692341,0.325188,0.003433,0.0,0.693321,0.322493,0.003564,0.0


In [44]:
data['class'].value_counts()

airsquat       1974
jumpingjack     575
fighting        116
pushup           81
Name: class, dtype: int64

In [45]:
data.iloc[:, :1]

Unnamed: 0,class
0,fighting
1,fighting
2,fighting
3,fighting
4,fighting
...,...
2741,airsquat
2742,airsquat
2743,airsquat
2744,airsquat


In [46]:
classes = data["class"].unique()

In [47]:
data["class"].isna().sum()

0

In [48]:
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

In [49]:
le = preprocessing.LabelEncoder()

In [50]:
data['class'] = le.fit_transform(data['class'])

In [51]:
le.classes_

array(['airsquat', 'fighting', 'jumpingjack', 'pushup'], dtype=object)

In [52]:
X = data.iloc[:, 1:]
y = data.iloc[:, :1]

X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.1, random_state=40)

In [53]:
from xgboost import XGBClassifier


xgb = XGBClassifier(n_estimators = 1000)

In [54]:
xgb.fit(X_train, y_train)

In [55]:
predicted = xgb.predict(X_test)

In [56]:
from sklearn.metrics import f1_score


f1_score(predicted,y_test, average='macro')

0.9362851605112147

from vid

In [25]:
from sklearn.pipeline import make_pipeline 
from sklearn.preprocessing import StandardScaler 

from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [26]:
pipelines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
}

In [27]:
fit_models = {}
for algo, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

  y = column_or_1d(y, warn=True)
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
  y = column_or_1d(y, warn=True)
  self._final_estimator.fit(Xt, y, **fit_params_last_step)
  y = column_or_1d(y, warn=True)


In [28]:
out = fit_models['rc'].predict(X_test)

for i in fit_models:
    out = fit_models[i].predict(X_test)
    print(f1_score(out,y_test, average='macro'))

0.9835738792260814
0.9958046648654854
0.9841166205735289
0.979246917698053


Predict and Show

In [59]:
holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    
#0 for webcam
#1 for default video
path_to_file = 'gestures/testing/airsquat-wild.mp4'
capture = request_capture(1, path_to_file)
pTime = 0
while(1):
    ret , frame = capture.read()

    try:
        #mpPose processa cada frame no formato RGB
        frameRGB = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR)
        results = holistic.process(frameRGB)

        #R_landmark = results.pose_landmarks

        cTime = time.time()
        
        landmmark_work(results, mpPose, mpDraw, mp_holistic, frame)
        
        fps = 1/(cTime - pTime)
        pTime = cTime

        cv2.putText(frame, str(int(fps)), (60,60), cv2.FONT_HERSHEY_PLAIN, 3, (250,0,90),3)
        

        try:
            # Extract Pose landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            
            # Extract Face landmarks
            if(results.face_landmarks):
                face = results.face_landmarks.landmark
                face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
            else:
                face_row = list(np.zeros(468*4).flatten())
            # Concate rows
            
            row = pose_row+face_row


            
            X = np.array(row).reshape(1,-1)
            body_language_class = le.inverse_transform(xgb.predict(X))
            body_language_prob = xgb.predict_proba(X)
            #print(body_language_class, body_language_prob)

            
            # Get status box
            #cv2.rectangle(frame, (100,100), (250, 60), (245, 117, 16), -1)
            
            # Display Class
            cv2.putText(frame, 'CLASS'
                        , (120,140), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            cv2.putText(frame, body_language_class[0]
                        , (130,168), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 233), 2, cv2.LINE_AA)
            
            # Display Probability
            cv2.putText(frame, 'PROB'
                        , (300,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            cv2.putText(frame, str(round(body_language_prob.max(),2))
                        , (300,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 233), 2, cv2.LINE_AA)
        except:
            pass

    except:
        pass

    cv2.imshow("Video", frame)

    k = cv2.waitKey(5) & 0xff
    if k == 27:
        break

capture.release()
cv2.destroyAllWindows()

In [40]:
np.array(row).reshape(1,-1)[0]

array([ 0.51385081,  0.44113106, -1.17197299, ...,  0.37191319,
        0.03729007,  0.        ])

In [56]:
body_language_class[0]

'cross'

In [53]:
body_language_prob.max()

0.43646085

In [None]:
X_test[:1]

Unnamed: 0,x1,y1,z1,v1,x2,y2,z2,v2,x3,y3,...,z499,v499,x500,y500,z500,v500,x501,y501,z501,v501
431,0.613151,0.323376,-0.863807,0.999949,0.638241,0.280118,-0.788851,0.999919,0.652348,0.282709,...,0.006362,0.0,0.649286,0.283245,0.022695,0.0,0.653343,0.280082,0.023166,0.0


In [None]:
np.array(row).shape

(2004,)

In [217]:
np.array(row).reshape(1,-1)

array([[ 0.57147515,  0.3097128 , -1.16323829, ...,  0.24264657,
         0.0232226 ,  0.        ]])

In [5]:
dir = './gestures'
sub_dir = [name for name in os.listdir(dir) if os.path.isdir(os.path.join(dir, name))]

In [6]:
sub_dir

['physical', 'testing']

In [15]:
new_path = dir+'/'+sub_dir[0]
new_path

'./gestures/physical'

In [18]:
files = [name for name in os.listdir(new_path) if os.path.isfile((os.path.join(new_path, name)))]
files

['air-squat.mp4',
 'airsquat.mp4',
 'fighting1.mp4',
 'fighting2.mp4',
 'jumping-jack.mp4',
 'push-up.mp4']

In [19]:
new_path+'/'+files[0]

'./gestures/physical/air-squat.mp4'