In [None]:
# %pip install opencv-python mediapipe scikit-learn matplotlib

In [None]:
# !pip install tensorflow-gpu

# I. Import libraries and define functions

In [None]:
import cv2
import numpy as np
import os
import time
import mediapipe as mp
import time
import tensorflow as tf

from matplotlib import pyplot as plt
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM

In [None]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on one BGR frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)``.

    Returns:
        ``(image, results)``: a BGR copy of the frame produced by the
        round-trip colour conversion, and whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The buffer is flagged read-only only for the duration of the inference call.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [None]:
def get_scaled_landmarks(landmarks, dimenson, width=1280, height=720):
    """Convert normalized landmarks to integer pixel coordinates.

    Args:
        landmarks: Iterable of objects with ``x`` and ``y`` attributes (and
            ``z`` when ``dimenson == 'both'``). It is consumed in a single pass.
        dimenson: ``'2d'`` to get only ``[x, y]`` pairs, ``'both'`` to also get
            ``[x, y, z]`` triples (``z`` is passed through unscaled).
        width: Pixel width that ``x`` is multiplied by. Defaults to 1280.
        height: Pixel height that ``y`` is multiplied by. Defaults to 720.

    Returns:
        For ``'2d'``: a list of ``[x, y]``.
        For ``'both'``: a tuple ``(list of [x, y], list of [x, y, z])``.

    Raises:
        ValueError: If ``dimenson`` is neither ``'2d'`` nor ``'both'``
            (previously this case silently returned ``None``).
    """
    if dimenson not in ('2d', 'both'):
        raise ValueError(f"dimenson must be '2d' or 'both', got {dimenson!r}")

    want_3d = dimenson == 'both'
    landmarks_2d = []
    landmarks_3d = []
    for landmark in landmarks:
        x, y = int(landmark.x * width), int(landmark.y * height)
        landmarks_2d.append([x, y])
        if want_3d:
            landmarks_3d.append([x, y, landmark.z])

    if want_3d:
        return landmarks_2d, landmarks_3d
    return landmarks_2d

In [None]:
def draw_landmarks(image, results):
    """Draw an upper-body skeleton on ``image`` in place.

    Landmark coordinates are scaled by the actual size of ``image`` (taken
    from ``image.shape``) instead of a fixed 1280x720, so the overlay lines up
    on frames of any resolution.

    Args:
        image: BGR frame to draw on; modified in place.
        results: Pose result whose ``pose_landmarks.landmark`` sequence holds
            normalized ``x``/``y`` coordinates. Nothing is drawn when
            ``pose_landmarks`` is empty.
    """
    if not results.pose_landmarks:
        return

    lmks = results.pose_landmarks.landmark
    frame_height, frame_width = image.shape[:2]

    # Landmark indices to draw, in the order the skeleton table below refers to.
    # NOTE(review): presumably MediaPipe Pose numbering (0 nose, 11/12 shoulders,
    # 13/14 elbows, 15/16 wrists, 23/24 hips, 19/20 index fingers) - confirm.
    joint_ids = (0, 11, 12, 13, 14, 15, 16, 23, 24, 19, 20)
    points = [
        (int(lmks[idx].x * frame_width), int(lmks[idx].y * frame_height))
        for idx in joint_ids
    ]

    # Pairs of positions in `points` that are joined by a line segment.
    bones = (
        (1, 2), (1, 3), (3, 5), (2, 4), (4, 6),
        (1, 7), (2, 8), (7, 8), (5, 9), (6, 10),
    )
    for start, end in bones:
        cv2.line(image, points[start], points[end], (255, 255, 255), 2)

    # Joints are drawn after the lines so the filled dots sit on top.
    for point in points:
        cv2.circle(image, point, 4, (0, 0, 255), -1)

In [None]:
def show_fps(image, prev_frame_time):
    """Overlay the instantaneous frame rate on ``image`` and return the new timestamp.

    Args:
        image: BGR frame to annotate; modified in place.
        prev_frame_time: ``time.time()`` value returned by the previous call
            (or 0 for the first frame).

    Returns:
        The timestamp taken in this call; pass it back in on the next frame.
    """
    new_frame_time = time.time()
    elapsed = new_frame_time - prev_frame_time
    # Two calls can land on the same clock tick; report 0 instead of dividing by zero.
    fps = int(1 / elapsed) if elapsed > 0 else 0

    # Anchor the text near the bottom-right corner of whatever frame we were given;
    # for a 1280x720 frame this is the original (1000, 700).
    frame_height, frame_width = image.shape[:2]
    origin = (max(0, frame_width - 280), max(0, frame_height - 20))

    cv2.putText(image, f"fps: {fps}", origin, cv2.FONT_HERSHEY_SIMPLEX, 2, (100, 255, 0), 2, cv2.LINE_AA)
    return new_frame_time


In [None]:
def get_joint_angle(a, b, c):
    """Return the unsigned 2-D angle at vertex ``b`` between rays b->a and b->c.

    Only the ``x`` and ``y`` attributes of the three points are used. The
    result is in radians and lies in ``[0, pi]``.
    """
    heading_to_c = np.arctan2(c.y - b.y, c.x - b.x)
    heading_to_a = np.arctan2(a.y - b.y, a.x - b.x)
    spread = np.abs(heading_to_c - heading_to_a)

    # A difference above half a turn means the shorter way round is the reflex complement.
    return 2 * np.pi - spread if spread > np.pi else spread

In [None]:
def get_all_angles(landmarks):
    """Compute the seven upper-body joint angles used as model features.

    Args:
        landmarks: Sequence indexable by ``mp_pose.PoseLandmark.<NAME>.value``.

    Returns:
        A list of seven angles from ``get_joint_angle``, in this order:
        right elbow, right shoulder, left elbow, left shoulder, nose,
        left ear, right ear. The last three are measured at the head point
        between the two shoulders.
    """
    part = mp_pose.PoseLandmark

    nose = landmarks[part.NOSE.value]
    r_shoulder = landmarks[part.RIGHT_SHOULDER.value]
    r_elbow = landmarks[part.RIGHT_ELBOW.value]
    r_wrist = landmarks[part.RIGHT_WRIST.value]
    r_ear = landmarks[part.RIGHT_EAR.value]
    l_shoulder = landmarks[part.LEFT_SHOULDER.value]
    l_elbow = landmarks[part.LEFT_ELBOW.value]
    l_wrist = landmarks[part.LEFT_WRIST.value]
    l_ear = landmarks[part.LEFT_EAR.value]

    # (first ray end, vertex, second ray end) for each feature, in output order.
    triples = [
        (r_shoulder, r_elbow, r_wrist),
        (r_elbow, r_shoulder, l_shoulder),
        (l_shoulder, l_elbow, l_wrist),
        (l_elbow, l_shoulder, r_shoulder),
        (l_shoulder, nose, r_shoulder),
        (l_shoulder, l_ear, r_shoulder),
        (l_shoulder, r_ear, r_shoulder),
    ]
    return [get_joint_angle(first, vertex, second) for first, vertex, second in triples]

In [None]:
def get_frame_landmarks(results):
    """Build the fixed-length feature vector for one frame.

    The vector is the concatenation of:
      * the first 23 image landmarks as ``x, y, z, visibility`` (92 values),
      * the first 23 world landmarks as ``x, y, z, visibility`` (92 values),
      * the 7 joint angles from ``get_all_angles``.

    That is 191 values, matching the ``(30, 191)`` model input used in this
    notebook. When a landmark set is missing, its part is zero-filled with the
    same length so every frame has an identical shape. The angle fallback was
    previously 4 zeros, which produced 188-long vectors for frames without a
    detected pose.

    Args:
        results: Pose result with ``pose_landmarks`` and ``pose_world_landmarks``.

    Returns:
        1-D ``numpy.ndarray`` of length 191.
    """
    num_landmarks = 23
    values_per_landmark = 4
    num_angles = 7  # must equal len(get_all_angles(...))

    def flatten(landmark_list):
        # Zero-fill when the detector returned nothing for this landmark set.
        if not landmark_list:
            return np.zeros(values_per_landmark * num_landmarks)
        return np.array(
            [[res.x, res.y, res.z, res.visibility] for res in landmark_list.landmark[:num_landmarks]]
        ).flatten()

    size_landmarks = flatten(results.pose_landmarks)
    world_landmarks = flatten(results.pose_world_landmarks)
    if results.pose_landmarks:
        angles = np.array(get_all_angles(results.pose_landmarks.landmark))
    else:
        angles = np.zeros(num_angles)

    return np.concatenate([size_landmarks, world_landmarks, angles])

# II. Data processing

#### Create folders to save data

In [None]:
# Build the paths from components so they work on every OS and contain no
# backslash escape sequences ('\D' and '\L' are invalid escapes in a normal
# string literal). On Windows the resulting strings are unchanged.
DATA_PATH = os.path.join('..', 'Data', 'Landmark Data')
VIDEO_PATH = os.path.join('..', 'Data', 'Video Data')

# Class names; the position in this array is used as the integer label.
actions = np.array(['non_cheating', 'cheating'])

# for action in actions:
#     for sequence in range(1, 721):
#         try:
#             os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
#         except:
#             pass

#### Extract landmarks into the created folders

In [None]:
# Module-level aliases for the MediaPipe solutions used by this notebook.
# `mp_pose` is read by get_all_angles() and by the capture loops below;
# `mp_drawing` is not called by any active code visible in this notebook.
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [None]:
# Walk every video of every action, run pose detection frame by frame and save
# one feature vector per frame to DATA_PATH/<action>/<sequence>/<frame>.npy.
prev_frame_time = 0

for action in actions:
    action_path = os.path.join(VIDEO_PATH, action)
    # os.listdir() returns names in arbitrary order; sorting makes the
    # video -> sequence-number mapping reproducible between runs.
    for id_sequence, video_name in enumerate(sorted(os.listdir(action_path))):
        video_path = os.path.join(action_path, video_name)

        # np.save does not create missing directories, so make sure the target exists.
        sequence_dir = os.path.join(DATA_PATH, action, str(id_sequence + 1))
        os.makedirs(sequence_dir, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        frame_num = 0
        try:
            with mp_pose.Pose() as pose:
                while cap.isOpened():
                    # Read feed
                    ret, frame = cap.read()
                    if not ret:
                        print("Video end")
                        break

                    frame_num += 1
                    # Make detections
                    image, results = mediapipe_detection(frame, pose)

                    # Draw landmarks
                    if results.pose_landmarks:
                        draw_landmarks(image, results)

                    # Persist this frame's features (np.save appends ".npy").
                    frame_landmarks = get_frame_landmarks(results)
                    np.save(os.path.join(sequence_dir, str(frame_num)), frame_landmarks)

                    # Show fps
                    prev_frame_time = show_fps(image, prev_frame_time)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)

                    # 'q' skips the rest of the current video only.
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
        finally:
            # Release the capture and the preview window even if a frame fails.
            cap.release()
            cv2.destroyAllWindows()


#### Load landmark data from folders into NumPy arrays

In [None]:
# Map each action name to its integer class id (its position in `actions`).
label_map = dict(zip(actions, range(len(actions))))

# Read 720 sequences per action, each made of frames 1..30 saved as
# DATA_PATH/<action>/<sequence>/<frame>.npy.
sequences = []
labels = []
for action in actions:
    for sequence in range(1, 721):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        window = [
            np.load(os.path.join(sequence_dir, str(frame_num)) + ".npy")
            for frame_num in range(1, 31)
        ]
        sequences.append(window)
        labels.append(label_map[action])

# np.save('final_sequence', sequences)
# np.save('final_labels', labels)

#### Split into train and test sets

In [None]:
# Load the cached dataset from disk rather than using `sequences`/`labels`
# built above. NOTE(review): these files are only written by the np.save calls
# that are commented out in the previous cell - they must already exist in the
# working directory for this cell to run.
X = np.load('final_sequence.npy')
y = np.load('final_labels.npy')

In [None]:
# Inspect the feature array's dimensions.
X.shape

In [None]:
# One-hot encode the integer labels. The ndim guard keeps this cell idempotent:
# re-running it on an already encoded `y` would otherwise encode it a second
# time and add an extra axis. `num_classes` pins the width to the number of
# actions, which is also the size of the model's output layer.
if y.ndim == 1:
    y = to_categorical(y, num_classes=len(actions)).astype(int)

In [None]:
# Inspect the label array's dimensions after one-hot encoding.
y.shape

In [None]:
# Hold out 10% of the sequences for evaluation; the fixed seed keeps the split
# identical between runs.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=42,
)

# III. Train model

#### Model architecture

In [None]:
# Three stacked LSTMs followed by a small dense classifier head.
# Input is declared as (30, 191); the output has one softmax unit per action.
model = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(30, 191)),
    LSTM(128, return_sequences=True, activation='relu'),
    # Last recurrent layer returns only its final step for the dense head.
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])

In [None]:
# Disabled: deleting `model` here makes the following `model.compile(...)` cell
# fail with NameError when the notebook is run top to bottom. To start from
# fresh weights, re-run the model definition cell instead.
# del model

#### Train the model

In [None]:
# Configure training: Adam optimizer, cross-entropy over the one-hot labels,
# and categorical accuracy as the reported metric.
model.compile(
    optimizer='Adam',
    loss='categorical_crossentropy',
    metrics=['categorical_accuracy'],
)

In [None]:
# Train for 50 epochs on the training split; `history.history` is read by the
# plotting cells below. No validation data is passed, so only training metrics
# are recorded.
history = model.fit(X_train, y_train, epochs=50)

In [None]:
from matplotlib.ticker import MaxNLocator

# Plot the training accuracy per epoch. The x-axis is derived from the number
# of recorded epochs, so the cell keeps working if `epochs` is changed above.
train_accuracy = history.history['categorical_accuracy']
epochs_ran = np.arange(1, len(train_accuracy) + 1)

fig, ax = plt.subplots()
ax.plot(epochs_ran, train_accuracy)
ax.set_title('model accuracy')
ax.set_ylabel('train_accuracy')
ax.set_xlabel('epoch')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # whole-number epoch ticks

plt.show()

In [None]:
# Plot the training loss per epoch, numbering epochs from 1 so the axis matches
# the accuracy plot. `MaxNLocator` is imported in the accuracy-plot cell.
train_loss = history.history['loss']
epochs_ran = np.arange(1, len(train_loss) + 1)

fig, ax = plt.subplots()
ax.plot(epochs_ran, train_loss)
ax.set_title('model loss')
ax.set_ylabel('train_loss')
ax.set_xlabel('epoch')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # whole-number epoch ticks

plt.show()

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

In [None]:
# model.save('saved_pose.h5')

#### Evaluation on test set

In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score, ConfusionMatrixDisplay, confusion_matrix

In [None]:
# Collapse the one-hot test labels back to integer class ids.
true_class_ids = np.argmax(y_test, axis=1)
ytrue = true_class_ids.tolist()

In [None]:
# Replace the in-memory weights with those stored in '720-data.h5'; everything
# below evaluates that checkpoint, not the training run above.
# NOTE(review): confirm the checkpoint was trained on the same train/test split,
# otherwise the test samples may have been seen during its training.
model.load_weights('720-data.h5')

In [None]:
# Model outputs for the held-out test sequences; the next cell takes argmax
# over axis 1 to get one predicted class id per sequence.
y_res = model.predict(X_test)

In [None]:
# Predicted class id per test sequence: index of the largest output along axis 1.
predicted_class_ids = np.argmax(y_res, axis=1)
yhat = predicted_class_ids.tolist()

In [None]:
# Fix the label set to every class id so the matrix keeps its full shape even
# if a class is absent from the test split, and label the axes with the action
# names instead of bare integers.
class_ids = list(range(len(actions)))
test_confusion = confusion_matrix(ytrue, yhat, labels=class_ids)
ConfusionMatrixDisplay(test_confusion, display_labels=actions).plot();


In [None]:
# Fraction of test sequences whose predicted class matches the true class.
accuracy_score(ytrue, yhat)

# IV. Test in real time

In [None]:
# Run the trained model on a video (or webcam) and overlay the cheating
# probability computed over a sliding window of the most recent frames.
prev_frame_time = 0
input_sequence = []
predictions = []
cap = cv2.VideoCapture("cheat_2022-04-11 113657.mp4")
# cap = cv2.VideoCapture(1)
mp_drawing_styles = mp.solutions.drawing_styles
# Named property ids instead of the magic numbers 3 and 4.
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
count = 0
frame_num = 0
# result = cv2.VideoWriter('side4.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 15, (1280, 720))

# Number of consecutive frames the model expects per prediction.
window_size = 30

if not cap.isOpened():
    # Without this the loop below is skipped silently when the source is missing.
    print("Can't open video source!")

try:
    with mp_pose.Pose() as pose:
        while cap.isOpened():

            # Read feed
            ret, frame = cap.read()
            if not ret:
                print("Can't get frame!")
                break

            frame_num += 1
            # Make detections
            image, results = mediapipe_detection(frame, pose)

            # Draw landmarks
            if results.pose_landmarks:
                draw_landmarks(image, results)

            # Keep only the most recent `window_size` feature vectors.
            frame_landmarks = get_frame_landmarks(results)
            input_sequence.append(frame_landmarks)
            input_sequence = input_sequence[-window_size:]
            if len(input_sequence) == window_size:
                # verbose=0: do not print a progress bar for every frame.
                res = model.predict(np.expand_dims(input_sequence, axis=0), verbose=0)[0]
                # Index 1 is the 'cheating' class (second entry of `actions`).
                cheating_prob = round(float(res[1]), 2)
                cv2.putText(image, "Cheating probs: "+str(cheating_prob), (0, 200), cv2.FONT_HERSHEY_SIMPLEX, 1.5, ((255, 0, 0)), 2, cv2.LINE_AA)
                if cheating_prob > 0.8:
                    cv2.putText(image, "Warning: suspicious behavior", (7, 60), cv2.FONT_HERSHEY_SIMPLEX, 2, (45, 255, 255), 2, cv2.LINE_AA)

            # Show fps
            prev_frame_time = show_fps(image, prev_frame_time)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Always free the capture device and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()