In [8]:
!pip install mediapipe opencv-python pandas scikit-learn

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [9]:
!pip install emoji

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [10]:
!pip install pillow

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [11]:
import mediapipe as mp # Import mediapipe
import cv2 # Import opencv
import csv
import os
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline 
from sklearn.preprocessing import StandardScaler 
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.decomposition import PCA

from sklearn.metrics import accuracy_score # Accuracy metrics 
import pickle 

import emoji
from PIL import ImageFont, ImageDraw, Image

In [12]:
mp_drawing = mp.solutions.drawing_utils # Drawing helpers
mp_holistic = mp.solutions.holistic # Mediapipe Solutions

# Get Data from files and combine to one dataframe

In [13]:
maria_df = pd.read_csv('data/coords_maria.csv')
konna_df = pd.read_csv('data/konna_data.csv')
elena_df = pd.read_csv('data/elena_data.csv')

In [14]:
konna_df['class'] = konna_df['class'].replace({'neutral': 'neutral_face'})
elena_df['class'] = elena_df['class'].replace({'neutral': 'neutral_face'})

In [15]:
maria_df['class'].value_counts()

face_screaming_in_fear     786
neutral_face               714
smiling_face_with_halo     632
smiling_face_with_horns    595
slightly_smiling_face      593
grinning_face              570
hugging_face               512
shushing_face              484
white_frowning_face        403
Name: class, dtype: int64

In [16]:
konna_df['class'].value_counts()

grinning_face              731
smiling_face_with_halo     659
slightly_smiling_face      622
neutral_face               617
hugging_face               586
white_frowning_face        581
face_screaming_in_fear     546
smiling_face_with_horns    450
shushing_face              450
Name: class, dtype: int64

In [17]:
elena_df['class'].value_counts()

smiling_face_with_halo     750
grinning_face              705
slightly_smiling_face      631
neutral_face               629
smiling_face_with_horns    608
face_screaming_in_fear     605
hugging_face               541
shushing_face              500
white_frowning_face        479
Name: class, dtype: int64

In [18]:
df_final = maria_df.append(konna_df, ignore_index=True).append(elena_df, ignore_index=True)

In [19]:
df_final['class'].value_counts()

smiling_face_with_halo     2041
grinning_face              2006
neutral_face               1960
face_screaming_in_fear     1937
slightly_smiling_face      1846
smiling_face_with_horns    1653
hugging_face               1639
white_frowning_face        1463
shushing_face              1434
Name: class, dtype: int64

## Split into train and test set

In [20]:
X = df_final.drop('class', axis=1) # features
y = df_final['class'] # target value

In [21]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1234)

# Train Classification Models

In [23]:
pipelines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression(max_iter=10000)),
    'svm':make_pipeline(StandardScaler(), SVC(probability=True)),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier(max_depth=10)),
    'lr_pca':make_pipeline(StandardScaler(), PCA(n_components = 50), LogisticRegression(max_iter=10000)),
    'svm_pca':make_pipeline(StandardScaler(), PCA(n_components = 50), SVC(probability=True)),
    'rf_pca':make_pipeline(StandardScaler(), PCA(n_components = 50), RandomForestClassifier(max_depth=10)),
}

In [24]:
fit_models = {}
for algo, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

In [25]:
fit_models

{'lr': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('logisticregression', LogisticRegression(max_iter=10000))]),
 'svm': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('svc', SVC(probability=True))]),
 'rf': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('randomforestclassifier',
                  RandomForestClassifier(max_depth=10))]),
 'lr_pca': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('pca', PCA(n_components=50)),
                 ('logisticregression', LogisticRegression(max_iter=10000))]),
 'svm_pca': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('pca', PCA(n_components=50)), ('svc', SVC(probability=True))]),
 'rf_pca': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('pca', PCA(n_components=50)),
                 ('randomforestclassifier',
                  RandomForestClassifier(max_depth=10))])}

# Test Classification Models

In [26]:
for algo, model in fit_models.items():
    yhat = model.predict(X_test)
    print(algo, accuracy_score(y_test, yhat))

lr 0.9987484355444305
svm 0.9874843554443054
rf 0.9899874843554443
lr_pca 0.9974968710888611
svm_pca 0.9870671672924489
rf_pca 0.9941593658740092


# Save models in pkl files

In [27]:
for algo, model in fit_models.items():
    with open(algo+'.pkl', 'wb') as f:
        pickle.dump(model, f)

# Make Detections

Select Model

In [67]:
file_path = 'svm.pkl'

In [68]:
with open(file_path, 'rb') as f:
    model = pickle.load(f)

In [69]:
model

Pipeline(steps=[('standardscaler', StandardScaler()),
                ('svc', SVC(probability=True))])

Detections every frame

In [70]:
cap = cv2.VideoCapture(0)
# Initiate holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

    while cap.isOpened():
        # change video fps
        cap.set(cv2.CAP_PROP_FPS, 60)
        ret, frame = cap.read()
        
        # Recolor Feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False        
        
        # Make Detections
        results = holistic.process(image)
        # print(results.face_landmarks)
        
        # face_landmarks, pose_landmarks, left_hand_landmarks, right_hand_landmarks
        
        # Recolor image back to BGR for rendering
        image.flags.writeable = True   
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # Export coordinates
        try:
            # Extract Pose landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            
            # Extract Face landmarks
            face = results.face_landmarks.landmark
            face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
            
            # Concate rows
            row = pose_row+face_row
            
            # Make Detections
            X = pd.DataFrame([row])
            body_language_class = model.predict(X)[0]
            body_language_prob = model.predict_proba(X)[0]
            #print(body_language_class, np.round(body_language_prob, decimals=1))
                               
            # Get status box
            cv2.rectangle(image, (0,0), (460, 60), (245, 117, 16), -1)

            # Display Class
            cv2.putText(image, 'CLASS'
                        , (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            cv2.putText(image, body_language_class.split(' ')[0]
                        , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Display Probability
            cv2.putText(image, 'PROB'
                        , (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            cv2.putText(image, str(round(body_language_prob[np.argmax(body_language_prob)],2))
                        , (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                
            # add emoji to frame
            # load with alpha channel -1
            emoticon = cv2.imread(body_language_class+'.png', -1)
            img_height, img_width, _ = emoticon.shape
            x_offset = 0
            y_offset = 60
            y1, y2 = y_offset, y_offset + img_height
            x1, x2 = x_offset, x_offset + img_width
            # image[ y1:y2 , x1:x2 ] = emoticon
            alpha_s = emoticon[:, :, 3] / 255.0
            alpha_l = 1.0 - alpha_s

            for c in range(0, 3):
                image[y1:y2, x1:x2, c] = (alpha_s * emoticon[:, :, c] +
                                          alpha_l * image[y1:y2, x1:x2, c])
            
        except Exception as e:
            pass
                        
        cv2.imshow('Emoji Recognition', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

Detections with sampling (class is updated every time the blue box flashes)

In [62]:
cap = cv2.VideoCapture(0)
# initiate holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    time = 0
    predictions = []
    agg_pred = ' '
    while cap.isOpened():
        # change video fps
        cap.set(cv2.CAP_PROP_FPS, 60)
        # get frame
        ret, frame = cap.read()
        # recolor feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        
        # make detections
        results = holistic.process(image)
        
        # recolor image back to BGR for rendering
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # export coordinates
        try:
            # Extract Pose landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            
            # Extract Face landmarks
            face = results.face_landmarks.landmark
            face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
            
            # Concate rows
            row = pose_row+face_row
            
            # Make Detections
            X = pd.DataFrame([row])
            body_language_class = model.predict(X)[0]
            body_Savelanguage_prob = model.predict_proba(X)[0]
            
            predictions.append(body_language_class)
            
            
            if time % 10 == 0:
                agg_pred = max(set(predictions), key = predictions.count)
                predictions = []
                time = 0
                # Get status box
                cv2.rectangle(image, (0,0), (460, 60), (245, 117, 16), -1)
            # Display Class
            cv2.putText(image, 'CLASS', (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            cv2.putText(image, agg_pred.split(' ')[0]
                            , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            # add emoji to frame
            # load with alpha channel -1
            emoticon = cv2.imread(agg_pred+'.png', -1)
            img_height, img_width, _ = emoticon.shape
            x_offset = 0
            y_offset = 60
            y1, y2 = y_offset, y_offset + img_height
            x1, x2 = x_offset, x_offset + img_width
            # image[ y1:y2 , x1:x2 ] = emoticon
            alpha_s = emoticon[:, :, 3] / 255.0
            alpha_l = 1.0 - alpha_s

            for c in range(0, 3):
                image[y1:y2, x1:x2, c] = (alpha_s * emoticon[:, :, c] +
                                          alpha_l * image[y1:y2, x1:x2, c])
            time += 1
                
        except Exception as e:
            pass
        
        cv2.imshow('Emoji Recognition', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()