# 0. Install and Import Dependencies

In [None]:
# !pip install mediapipe opencv-python pandas scikit-learn

In [None]:
import mediapipe as mp
import cv2
import numpy as np

In [None]:
# 2. Capture Landmarks & Export to CSV
import csv
import os
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# 3.1 Read in Collected Data and Process
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
# 3.2 Train Machine Learning Classification Model
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [None]:
# 3.3 Evaluate and Serialize Model
from sklearn.metrics import accuracy_score, precision_score, recall_score
import pickle

In [None]:
# Short aliases for the MediaPipe pieces used throughout the notebook:
# the pose solution itself and the helpers that draw its landmarks.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [None]:
# File locations for this exercise.
# VIDEO_PATH is the recording opened by the landmark-capture cell.
VIDEO_PATH = 'bicep-curl.mp4'
# NOTE(review): the cells below read/write the literal 'coords.csv' rather than
# EXPORT_PATH — confirm which file name is intended.
EXPORT_PATH = 'bicep-curl.csv'
# Same value as the literal 'bicep-curl.pkl' used by the pickle cells below.
MODEL_PATH = 'bicep-curl.pkl'

# 1. Save Video

VIDEO_PATH = `bicep-curl.mp4`


EXPORT_PATH = `bicep-curl.csv`


MODEL_PATH = `bicep-curl.pkl`

In [None]:
# Record the default webcam to 'bicep-curl.avi' while previewing it, until
# 'q' is pressed or the camera stops delivering frames.
# NOTE(review): the landmark-capture cell opens VIDEO_PATH ('bicep-curl.mp4'),
# not this .avi file — confirm whether a conversion step is expected.
cap = cv2.VideoCapture(0)

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# cap.get() returns 0 when the backend cannot report a property; a writer
# opened with 0 FPS does not produce a usable file, so fall back to a
# nominal rate in that case.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
videoWriter = cv2.VideoWriter('bicep-curl.avi', cv2.VideoWriter_fourcc('P','I','M','1'), fps, (width, height))

try:
    while cap.isOpened():
        ret, frame = cap.read()
        # A failed read returns ret == False and no frame: stop cleanly
        # instead of relying on imshow() raising on a None frame.
        if not ret:
            break

        cv2.imshow('Bicep-Curl', frame)
        videoWriter.write(frame)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera, finalize the file and close the preview window,
    # even if the loop body raised.
    cap.release()
    videoWriter.release()
    cv2.destroyAllWindows()

# 2. Capture Landmarks & Export to CSV

In [None]:
# CSV column names: the label column first, then x/y/z/visibility columns
# for landmarks numbered 1 through 33.
landmarks = ['class'] + [
    f'{component}{index}'
    for index in range(1, 34)
    for component in ('x', 'y', 'z', 'v')
]

In [None]:
landmarks[1:]

In [None]:
# Start a fresh coords.csv that contains only the header row.
with open('coords.csv', mode='w', newline='') as header_file:
    # csv.writer already defaults to ',' as delimiter, '"' as quote character
    # and QUOTE_MINIMAL quoting, so no dialect arguments are needed.
    csv.writer(header_file).writerow(landmarks)

In [None]:
def export_landmark(results, action):
    """Append one labelled row of pose landmarks to ``coords.csv``.

    Args:
        results: Pose-detection result whose ``pose_landmarks.landmark``
            sequence provides ``x``, ``y``, ``z`` and ``visibility`` for each
            landmark.
        action: Class label written as the first column of the row.

    Returns:
        None. Nothing is written when ``results.pose_landmarks`` is ``None``.
        Failures to write the file are reported on stdout rather than raised,
        so the capture loop calling this keeps running.
    """
    pose_landmarks = getattr(results, 'pose_landmarks', None)
    if pose_landmarks is None:
        return

    # Build the row as a plain list: the previous ndarray had no insert()
    # method, so every call raised and was silently swallowed.
    row = [action]
    for res in pose_landmarks.landmark:
        row.extend((res.x, res.y, res.z, res.visibility))

    try:
        # Append ('a'): opening with 'w' truncated the file on every call,
        # wiping the header row and all previously exported samples.
        with open('coords.csv', mode='a', newline='') as f:
            csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(row)
    except OSError as e:
        print(f'Could not write landmarks to coords.csv: {e}')

In [None]:
# Play back the recorded video, overlay the detected pose, and let the user
# label the current frame from the keyboard:
#   'b' -> export as 'backwards', 'n' -> export as 'neutral', 'q' -> quit.
cap = cv2.VideoCapture(VIDEO_PATH)

# Key code -> class label passed to export_landmark().
label_keys = {ord('b'): 'backwards', ord('n'): 'neutral'}

try:
    # Initiate pose model
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:

        while cap.isOpened():
            ret, image = cap.read()
            # At end of video (or on a read error) there is no frame; stop
            # instead of passing None to cvtColor.
            if not ret:
                break

            # Recolor Feed
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make Detections
            results = pose.process(image)

            # Recolor image back to BGR for rendering
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66),
                                                             thickness=2,
                                                             circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(245, 66, 230),
                                                             thickness=2,
                                                             circle_radius=2)
                                      )

            cv2.imshow('Bicep Lean Training', image)

            # Poll the keyboard exactly once per frame. Two separate waitKey()
            # calls each consume the pending key, so a press could be seen by
            # the wrong check and lost.
            k = cv2.waitKey(1) & 0xFF
            if k == ord('q'):
                break
            if k in label_keys:
                export_landmark(results, label_keys[k])
finally:
    # Release the video and close the window even if a frame failed.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Release the capture handle and close any OpenCV windows.
# NOTE(review): repeats the cleanup at the end of the previous cell; presumably
# kept as a standalone cell to run after interrupting that loop — confirm.
cap.release()
cv2.destroyAllWindows()

# 3. Train Custom Model Using Scikit Learn

## 3.1 Read in Collected Data and Process

In [None]:
# Load the collected landmark rows from coords.csv into a DataFrame.
df = pd.read_csv('coords.csv')

In [None]:
df.head()

In [None]:
df.tail()

In [None]:
# Show only the rows labelled 'up'. `class` is a reserved word in Python, so
# the column has to be selected with brackets; `df.class` is a SyntaxError.
# NOTE(review): the capture cell exports the labels 'backwards'/'neutral',
# while this filter expects 'up' — confirm which data set coords.csv holds.
df[df['class'] == 'up']

In [None]:
# Features: every column except the label. Target: the label column.
X = df.drop('class', axis=1)
# Bracket access is required because `class` is a Python keyword
# (`df.class` does not parse).
y = df['class']

In [None]:
# Hold out 30% of the rows for evaluation; random_state is fixed so the split
# is the same on every run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size= 0.3, random_state= 7)

In [None]:
y_test

## 3.2 Train Machine Learning Classification Model

In [None]:
# One candidate pipeline per classifier, keyed by a short name. Each pipeline
# standardizes the features with its own StandardScaler before the estimator.
pipelines = {
    short_name: make_pipeline(StandardScaler(), estimator_cls())
    for short_name, estimator_cls in (
        ('lr', LogisticRegression),
        ('rc', RidgeClassifier),
        ('rf', RandomForestClassifier),
        ('gb', GradientBoostingClassifier),
    )
}

In [None]:
# Fit every candidate pipeline on the training split and keep the fitted
# models under the same short keys.
fit_models = {}

# Iterate the `pipelines` dict; the previous `pipeline.items()` referred to a
# name that does not exist yet and raised NameError.
for algo, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

In [None]:
fit_models

In [None]:
fit_models['rc'].predict(X_test)

## 3.3 Evaluate and Serialize Model

In [None]:
# Print accuracy, precision and recall on the held-out split for each fitted
# model; precision and recall treat 'up' as the positive class.
for algo, model in fit_models.items():
    yhat = model.predict(X_test)
    y_true = y_test.values
    binary_up = {'average': "binary", 'pos_label': "up"}
    accuracy = accuracy_score(y_true, yhat)
    precision = precision_score(y_true, yhat, **binary_up)
    recall = recall_score(y_true, yhat, **binary_up)
    print(algo, accuracy, precision, recall)

In [None]:
# Predictions of the 'rf' pipeline on the held-out split.
yhat = fit_models['rf'].predict(X_test)

In [None]:
yhat[ :10]

In [None]:
# Persist the fitted 'rf' pipeline to disk so it can be reloaded for detection.
with open('bicep-curl.pkl', 'wb') as model_file:
    serialized_model = pickle.dumps(fit_models['rf'])
    model_file.write(serialized_model)

# 4. Make Detections with Model

In [None]:
# Reload the serialized pipeline and bind it to `model`, the name the
# detection loop below calls predict()/predict_proba() on. Previously the
# loaded object was discarded.
# NOTE: pickle.load can execute arbitrary code from the file; only load
# model files you created yourself or otherwise trust.
with open('bicep-curl.pkl', 'rb') as f:
    model = pickle.load(f)

In [None]:
# X = pd.DataFrame([row], columns= landmarks[1:])

In [None]:
# Live rep counter: classify each webcam frame as 'up'/'down' with `model`,
# count a repetition on every confident down -> up transition, and draw the
# class, probability and count in a status box. Press 'q' to quit.
cap = cv2.VideoCapture(0)
counter = 0
current_stage = ''

# Minimum top-class probability for a prediction to change the stage.
min_confidence = .7

try:
    # Initiate pose model
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:

        while cap.isOpened():
            ret, image = cap.read()
            # No frame was delivered (camera closed or read error): stop
            # instead of passing None to cvtColor.
            if not ret:
                break

            # Recolor Feed
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make Detections
            results = pose.process(image)

            # Recolor image back to BGR for rendering
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66),
                                                             thickness=2,
                                                             circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(245, 66, 230),
                                                             thickness=2,
                                                             circle_radius=2)
                                      )

            # Frames without landmarks are shown without the status box.
            if results.pose_landmarks is not None:
                try:
                    # Flatten to one value per feature column so the row lines
                    # up with the column names in landmarks[1:].
                    row = np.array([[res.x, res.y, res.z, res.visibility]
                                    for res in results.pose_landmarks.landmark]).flatten()
                    X = pd.DataFrame([row], columns=landmarks[1:])
                    body_language_class = model.predict(X)[0]
                    body_language_prob = model.predict_proba(X)[0]
                    top_prob = body_language_prob.max()

                    # A rep is counted only on a confident 'down' followed by
                    # a confident 'up'.
                    if body_language_class == 'down' and top_prob >= min_confidence:
                        current_stage = 'down'
                    elif current_stage == 'down' and body_language_class == 'up' and top_prob >= min_confidence:
                        current_stage = 'up'
                        counter += 1

                    # Get status box
                    cv2.rectangle(image, (0, 0), (250, 60), (245, 117, 16), -1)

                    # (text, origin, font scale, colour, thickness): a small
                    # black caption at y=12 above each large white value at
                    # y=40. The 'CLASS' caption used to be placed at (0, 0),
                    # which puts its baseline on the top edge of the image.
                    status_texts = (
                        ('CLASS', (95, 12), 0.5, (0, 0, 0), 1),
                        (body_language_class.split(' ')[0], (90, 40), 1, (255, 255, 255), 2),
                        ('PROB', (15, 12), 0.5, (0, 0, 0), 1),
                        (str(round(top_prob, 2)), (10, 40), 1, (255, 255, 255), 2),
                        ('COUNT', (180, 12), 0.5, (0, 0, 0), 1),
                        (str(counter), (175, 40), 1, (255, 255, 255), 2),
                    )
                    for text, origin, scale, color, thickness in status_texts:
                        cv2.putText(image, text, origin, cv2.FONT_HERSHEY_SIMPLEX,
                                    scale, color, thickness, cv2.LINE_AA)
                except Exception as e:
                    # Keep the video running on a bad frame, but report the
                    # error instead of hiding it.
                    print(f'Prediction skipped for this frame: {e!r}')

            cv2.imshow('Bicep Prediction', image)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Free the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()