In [19]:
import cv2
import traceback
import numpy as np
import csv
import mediapipe as mp
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, RidgeClassifier, SGDClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score, confusion_matrix
import pickle

# Collect Data

The following function processes each frame of a given video, labels it with the supplied classification, and appends the extracted pose data to an output file.

In [9]:
def processAndClassifyFrames(videoPath, outputFile, classification):
    """
    Extract MediaPipe Pose landmarks from every frame of a video and append them,
    together with a class label, to a CSV file.

    One row is written per frame in which a pose is detected: the classification
    followed by the x, y, z and visibility values of each landmark, flattened in
    landmark order. Frames without a detected pose are skipped.

    Parameters:
    - videoPath (str): The path to the video file.
    - outputFile (str): The path to the CSV file where pose data will be appended.
    - classification (str): The label written as the first column of every row.

    Returns:
    - None. If the video cannot be opened a warning is printed and no rows are
      written. Errors raised while processing frames are printed via
      traceback rather than propagated.

    Note:
    - Ensure that the video frames are aligned with the given classification.

    Example:
    >>> processAndClassifyFrames('video.mp4', 'output.csv', 'walking')

    """
    with open(outputFile, mode='a', newline='') as file:
        writer = csv.writer(file)
        cap = cv2.VideoCapture(videoPath)

        try:
            if not cap.isOpened():
                # Previously this case fell through silently and produced no rows.
                print(f"Warning: could not open video '{videoPath}'; no rows written.")
                return

            with mp_pose.Pose(min_detection_confidence=.5, min_tracking_confidence=.5) as pose:
                try:
                    # Loop through the video frames
                    while cap.isOpened():
                        success, frame = cap.read()  # Read a frame from the video
                        if not success:
                            break

                        # OpenCV decodes frames as BGR, while MediaPipe expects RGB input.
                        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        results = pose.process(rgb_frame)
                        if not results.pose_landmarks:
                            continue

                        # Row layout: label, then x, y, z, visibility per landmark.
                        row = [classification]
                        for landmark in results.pose_landmarks.landmark:
                            row.extend((landmark.x, landmark.y, landmark.z, landmark.visibility))
                        writer.writerow(row)

                except Exception:
                    traceback.print_exc()
        finally:
            # Release the capture even if pose setup fails. No GUI window is
            # created here, so there is nothing to destroy.
            cap.release()

* The variable outputFile is the destination file where the processed data and classification will be stored.
* model_path is the destination where the model weights will be saved after training is complete.


In [17]:
# Pickle file the trained classifier pipeline is saved to and loaded from.
model_path = "action_classifier_weights.pkl"
# CSV file that collects the labelled pose rows used for training.
outputFile = "training-data.csv"

The following code writes the required header row to the output file, overwriting any data the file previously contained.

In [None]:
# Header row: the label column followed by x/y/z/visibility columns for each
# of the 33 keypoints (x1, y1, z1, v1, ..., x33, y33, z33, v33).
landmarks = ['class'] + [f"{axis}{idx}" for idx in range(1, 33 + 1) for axis in 'xyzv']

# mode='w' truncates the file, so any previously collected rows are discarded.
with open(outputFile, mode='w', newline='') as header_file:
    csv.writer(header_file).writerow(landmarks)

### Process And Classify Video Frames
The provided code snippet serves the purpose of processing and classifying video frames, with the ability to customize the video source and the classification category. It appends the resulting data to an output file specified within the code. This should be run for every video/classification in the dataset.

* The variable videoPath is set to a specific file path which points to the video to be processed.
* The variable classification indicates the specific category or class by which each frame in the video will be classified.

In [10]:
# Label applied to every frame of the clip, and the clip to read frames from.
classification = "pushup"
videoPath = "C:\\Users\\Will\\Downloads\\pushup.MOV"

# Appends one labelled row per detected pose to the training CSV.
processAndClassifyFrames(
    videoPath=videoPath,
    outputFile=outputFile,
    classification=classification,
)

# Train Classification Model
Here, we set up the train-test split from the data collected previously. We define multiple machine learning pipelines, each combining data scaling using StandardScaler and a different classification algorithm (Logistic Regression, Ridge Classifier, Random Forest, Gradient Boosting, K-Nearest Neighbors, Linear Support Vector Classifier, and Stochastic Gradient Descent Classifier) and then fit the models.

In [36]:
# Load the labelled pose data written by the collection step.
df = pd.read_csv(outputFile)

X = df.drop('class', axis=1)  # features
y = df['class']  # classifications

# One seed for the split and every stochastic estimator, so that re-running
# the notebook reproduces the same models and scores.
SEED = 101

# NOTE(review): rows presumably come from consecutive video frames, so a random
# split may place near-identical frames in both train and test - confirm that
# the reported accuracy is not inflated by this.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.35, random_state=SEED)

# Each candidate standardises the features before classifying.
pipelines = {
    'lr': make_pipeline(StandardScaler(), LogisticRegression()),
    'rc': make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf': make_pipeline(StandardScaler(), RandomForestClassifier(random_state=SEED)),
    'gb': make_pipeline(StandardScaler(), GradientBoostingClassifier(random_state=SEED)),
    'nn': make_pipeline(StandardScaler(), KNeighborsClassifier()),
    'svc': make_pipeline(StandardScaler(), LinearSVC(random_state=SEED)),
    'sgd': make_pipeline(StandardScaler(), SGDClassifier(random_state=SEED)),
}

# Pipeline.fit returns the fitted pipeline itself, keyed here by its short name.
fit_models = {name: pipeline.fit(X_train, y_train) for name, pipeline in pipelines.items()}




# Evaluate Models

Here we evaluate each model's accuracy and visualize its corresponding confusion matrix.

In [None]:
# Score every fitted pipeline on the held-out split, keeping each confusion
# matrix (keyed by model name) for the plotting step.
confusion_matrices = dict()

for algo, model in fit_models.items():
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test.values, y_pred)
    confusion_matrices[algo] = confusion_matrix(y_test, y_pred)
    print(algo, accuracy)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Size the grid from the number of models instead of assuming it fits in 2x4:
# a fixed grid leaves blank axes when there are fewer models than cells and
# raises IndexError when there are more.
n_cols = 4
n_matrices = len(confusion_matrices)
n_rows = max(1, -(-n_matrices // n_cols))  # ceiling division, at least one row

# squeeze=False keeps `axes` two-dimensional even for a single row.
fig, axes = plt.subplots(n_rows, n_cols, figsize=(4 * n_cols, 4 * n_rows), squeeze=False)
fig.suptitle("Confusion Matrices", fontsize=16)

flat_axes = axes.ravel()
for ax, (name, cm) in zip(flat_axes, confusion_matrices.items()):
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False, ax=ax)
    ax.set_title(f"Confusion Matrix for {name}")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")

# Hide grid cells that have no model to show.
for ax in flat_axes[n_matrices:]:
    ax.set_visible(False)

plt.tight_layout()
plt.show()

# Save/Export Model

Now we save the weights for our best performing model.

In [14]:
# Persist the fitted 'svc' pipeline (scaler + classifier) to model_path.
selected_pipeline = fit_models['svc']
with open(model_path, 'wb') as model_file:
    pickle.dump(selected_pipeline, model_file)

# Run Model

Here we can test the model in real time.

In [15]:
# Restore the saved pipeline. Unpickling executes code from the file, so only
# load pickles that this notebook (or another trusted source) produced.
with open(model_path, 'rb') as model_file:
    model = pickle.load(model_file)

In [18]:
# Feature column names matching the training CSV header
# (x1, y1, z1, v1, ..., x33, y33, z33, v33). They are built here so this cell
# does not depend on the header-writing cell, which truncates the training data.
feature_names = [f"{axis}{idx}" for idx in range(1, 33 + 1) for axis in 'xyzv']

# Start video capture from the default camera
cap = cv2.VideoCapture(0)

try:
    with mp_pose.Pose(min_detection_confidence=.5, min_tracking_confidence=.5) as pose:
        # Loop through the video frames
        while cap.isOpened():
            # Read a frame from the video
            success, frame = cap.read()
            if not success:
                break

            # OpenCV delivers BGR frames, while MediaPipe expects RGB input.
            # The original BGR frame is kept for drawing and display.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(rgb_frame)

            if results.pose_landmarks:
                mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                keypoints = [
                    value
                    for landmark in results.pose_landmarks.landmark
                    for value in (landmark.x, landmark.y, landmark.z, landmark.visibility)
                ]
                # A separate name avoids overwriting the training feature matrix `X`.
                live_features = pd.DataFrame([keypoints], columns=feature_names)
                # putText needs a str; coerce in case the labels were parsed as numbers.
                class_ = str(model.predict(live_features)[0])

                # NOTE(review): the two labels share an origin but use different
                # font scales, so they overlap - confirm this is intended.
                cv2.putText(frame, class_, (95, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
                cv2.putText(frame, class_, (95, 40), cv2.FONT_HERSHEY_SIMPLEX, .5, (0,0,0), 2, cv2.LINE_AA)

            cv2.imshow("", frame)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

except Exception:
    traceback.print_exc()
finally:
    # Release the video capture object and close the display window, even when
    # pose setup or frame processing fails.
    cap.release()
    cv2.destroyAllWindows()