In [None]:
import tensorflow as tf
import numpy as np
from matplotlib import pyplot as plt
import cv2
import os

In [None]:
# Load the TFLite pose model from the working directory; the module-level
# `interpreter` is shared by the inference code further down the notebook.
interpreter = tf.lite.Interpreter(model_path='lite-model_movenet_singlepose_lightning_3.tflite')
# Tensors must be allocated once before set_tensor()/invoke() can be used.
interpreter.allocate_tensors()

In [None]:
def calculate_angle(a, b, c):
    """Return the interior angle, in degrees, formed at point ``b`` by a-b-c.

    Args:
        a: First end point; a sequence whose first two items are coordinates.
        b: Vertex point at which the angle is measured.
        c: Second end point.

    Returns:
        The angle between the rays b->a and b->c, folded into [0, 180].
    """
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)  # previously np.array(b), which reduced the ray b->c to a zero vector

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)

    # The difference of two arctan2 results can reach almost 360 degrees; fold
    # the reflex half back so one joint pose always maps to one value.
    if angle > 180.0:
        angle = 360.0 - angle

    return angle

In [None]:
def which_keys(boolean_list, kps):
    """Select the keypoints and limb edges belonging to the flagged joint angles.

    Args:
        boolean_list: Six flags, one per joint angle, in this order:
            0 left wrist/elbow/shoulder, 1 right wrist/elbow/shoulder,
            2 left hip/shoulder/elbow,   3 right hip/shoulder/elbow,
            4 left hip/knee/ankle,       5 right hip/knee/ankle.
        kps: The 12 body keypoints (left shoulder .. right ankle), one row of
            three values each, with the confidence in the last column.

    Returns:
        ``(selected, edges_new)``. ``selected`` is a copy of ``kps`` with the
        same shape in which every keypoint that is not part of a flagged angle
        has its confidence set to 0, so confidence-thresholded drawing skips
        it while index-based lookups keep working. ``edges_new`` maps the
        ``(start, end)`` keypoint index pairs of the flagged limbs to their
        colour code.
    """
    # One entry per flag, in flag order: the three keypoint indices forming the
    # angle (vertex in the middle) and the colour code of that body side.
    joint_angles = (
        ((9, 7, 5), 'm'),
        ((10, 8, 6), 'c'),
        ((11, 5, 7), 'm'),
        ((12, 6, 8), 'c'),
        ((11, 13, 15), 'm'),
        ((12, 14, 16), 'c'),
    )

    needed = np.zeros(12, dtype=bool)
    edges_new = {}
    for flagged, ((first, vertex, last), colour) in zip(boolean_list, joint_angles):
        if not flagged:
            continue
        for joint in (first, vertex, last):
            needed[joint - 5] = True  # rows of kps start at keypoint index 5
        # Edge keys are stored as (lower index, higher index).
        edges_new[tuple(sorted((first, vertex)))] = colour
        edges_new[tuple(sorted((vertex, last)))] = colour

    # Work on a copy so the caller's keypoints are left untouched.
    selected = np.array(kps, copy=True)
    selected[~needed, 2] = 0
    return selected, edges_new

In [None]:
def angle_comparison(scores_1, scores_2, threshold=10.0):
    """Flag the joint angles that differ between two poses.

    Args:
        scores_1: 12 keypoint rows of the first pose, ordered left/right
            shoulder, elbow, wrist, hip, knee, ankle. Only the first two
            values of each row (the coordinates) are used.
        scores_2: The same 12 rows for the second pose.
        threshold: Minimum absolute difference, in degrees, for an angle to
            count as different.

    Returns:
        A list of six booleans, left and right interleaved:
        ``[left elbow, right elbow, left shoulder, right shoulder,
        left knee, right knee]``, where each name is the vertex of the angle.
        This is the flag order that ``which_keys`` documents for its
        ``boolean_list`` argument; the previous left-then-right grouping made
        ``which_keys`` highlight the wrong limbs.
    """
    # (end point, vertex, end point) as row indices into the 12-row inputs.
    joint_triples = (
        (0, 2, 4),    # left shoulder, elbow, wrist
        (1, 3, 5),    # right shoulder, elbow, wrist
        (2, 0, 6),    # left elbow, shoulder, hip
        (3, 1, 7),    # right elbow, shoulder, hip
        (6, 8, 10),   # left hip, knee, ankle
        (7, 9, 11),   # right hip, knee, ankle
    )

    differs = []
    for first, vertex, last in joint_triples:
        angle_1 = calculate_angle(scores_1[first][:2], scores_1[vertex][:2], scores_1[last][:2])
        angle_2 = calculate_angle(scores_2[first][:2], scores_2[vertex][:2], scores_2[last][:2])
        differs.append(bool(abs(angle_1 - angle_2) > threshold))

    return differs

In [None]:
def frame_array(video, info):
    """Decode every frame of ``videos/<video>`` into memory.

    Args:
        video: File name of the clip inside the ``videos`` directory.
        info: When truthy, also return the clip's frame rate and frame size.

    Returns:
        The list of decoded frames when ``info`` is falsy, otherwise
        ``(all_frames, fps, (width, height))``. Reading stops at the first
        failed read, so the list never contains ``None``.
    """
    cap = cv2.VideoCapture(os.path.join('videos', video))
    all_frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        dimensions = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                      int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        # Read until the decoder reports failure rather than trusting the
        # container's frame count. No window is opened in this function, so
        # there is nothing for cv2.waitKey() to poll.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            all_frames.append(frame)
    finally:
        # Release the capture on every path, including info=False and errors.
        cap.release()

    if info:
        return all_frames, fps, dimensions
    return all_frames

In [None]:
def draw_keypoints(frame, keypoints, confidence_threshold):
    """Draw a filled green dot on ``frame`` for each confident keypoint.

    Args:
        frame: Three-dimensional image array; it is drawn on in place.
        keypoints: Rows of three values. The first two are multiplied by the
            frame's first and second dimension respectively; the third is the
            confidence and is left unscaled.
        confidence_threshold: A keypoint is drawn only when its confidence is
            strictly greater than this value.
    """
    height, width, _ = frame.shape
    pixel_kps = np.squeeze(np.multiply(keypoints, [height, width, 1]))

    for row, col, confidence in pixel_kps:
        if not confidence > confidence_threshold:
            continue
        center = (int(col), int(row))
        # radius 4, colour (0, 255, 0), thickness -1 fills the circle
        cv2.circle(frame, center, 4, (0, 255, 0), -1)

In [None]:
# Limb connections between keypoints, keyed by (start index, end index).
# Keypoint index order, starting at 0:
# nose, left eye, right eye, left ear, right ear,
# left shoulder, right shoulder, left elbow, right elbow,
# left wrist, right wrist, left hip, right hip, left knee, right knee, left ankle, right ankle
# Each value is a one-letter colour code: 'm' is used on left-side limbs,
# 'c' on right-side limbs and 'y' on the two links that join left and right.

EDGES = {
    (5, 7): 'm',    # left shoulder - left elbow
    (7, 9): 'm',    # left elbow - left wrist
    (6, 8): 'c',    # right shoulder - right elbow
    (8, 10): 'c',   # right elbow - right wrist
    (5, 6): 'y',    # left shoulder - right shoulder
    (5, 11): 'm',   # left shoulder - left hip
    (6, 12): 'c',   # right shoulder - right hip
    (11, 12): 'y',  # left hip - right hip
    (11, 13): 'm',  # left hip - left knee
    (13, 15): 'm',  # left knee - left ankle
    (12, 14): 'c',  # right hip - right knee
    (14, 16): 'c'   # right knee - right ankle
}

In [None]:
def draw_connections(frame, keypoints, edges, confidence_threshold):
    """Draw a line on ``frame`` for every edge whose two ends are confident.

    Args:
        frame: Three-dimensional image array; it is drawn on in place.
        keypoints: Rows of three values, scaled like in ``draw_keypoints``.
            Edge indices are shifted down by 5 before they are looked up here.
        edges: Mapping of ``(start, end)`` keypoint index pairs to a colour
            code. The colour code is not used; every line is drawn with the
            colour (0, 0, 255).
        confidence_threshold: Both end points must have a confidence strictly
            greater than this value for the line to be drawn.
    """
    height, width, _ = frame.shape
    pixel_kps = np.squeeze(np.multiply(keypoints, [height, width, 1]))

    for (start, end), _colour in edges.items():
        start_y, start_x, start_conf = pixel_kps[start - 5]
        end_y, end_x, end_conf = pixel_kps[end - 5]

        if start_conf > confidence_threshold and end_conf > confidence_threshold:
            start_point = (int(start_x), int(start_y))
            end_point = (int(end_x), int(end_y))
            cv2.line(frame, start_point, end_point, (0, 0, 255), 2)

In [None]:
# workout
# play 10 second timer
# play workout video
# follow along
# show similarity score in a popup next to it
# save similarity score and live video
# point out mistakes and differences

def workout(instruct_vid, speed, save_indices=(50, 100)):
    """Play the countdown clip, then the instruction clip while sampling the webcam.

    Args:
        instruct_vid: File name of the instruction clip inside ``videos``.
        speed: Delay in milliseconds passed to ``cv2.waitKey`` between
            instruction frames.
        save_indices: Instruction frame indices at which the displayed frame
            and the webcam frame grabbed alongside it are kept.

    Returns:
        ``(save_frame1, save_frame2)``: the kept instruction frames and the
        webcam frames captured at the same indices, in matching order.
    """
    countdown_frames = frame_array('10secCountdown.mp4', False)
    instruct_frames, _fps, _dimensions = frame_array(instruct_vid, True)

    # show 10 sec countdown
    for countdown_frame in countdown_frames:
        if countdown_frame is None:  # a failed read; cv2.imshow cannot display it
            break
        cv2.imshow("Video Player", countdown_frame)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    # show the workout video while grabbing one webcam frame per displayed frame
    save_frame1 = []
    save_frame2 = []
    cap_live = cv2.VideoCapture(0)
    try:
        for frame_idx, frame2 in enumerate(instruct_frames):
            if frame2 is None:
                break
            if cv2.waitKey(speed) & 0xFF == ord('q'):
                break
            cv2.imshow('Video Player', frame2)
            ret_live, frame_live = cap_live.read()
            # Keep a pair only when the webcam delivered a frame, so the two
            # lists stay aligned and never contain None.
            if ret_live and frame_idx in save_indices:
                save_frame1.append(frame2)
                save_frame2.append(frame_live)
    finally:
        # Free the camera and close the player window even if playback fails.
        cap_live.release()
        cv2.destroyAllWindows()

    return save_frame1, save_frame2

In [None]:
def _detect_keypoints(frame):
    """Run the module-level ``interpreter`` on one frame and return its first output tensor."""
    batch = tf.image.resize_with_pad(tf.expand_dims(frame, axis=0), 192, 192)
    input_image = tf.cast(batch, dtype=tf.float32)

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    interpreter.set_tensor(input_details[0]['index'], np.array(input_image))
    interpreter.invoke()
    return interpreter.get_tensor(output_details[0]['index'])


def two_frames_comparison(all_frames1, all_frames2, fps1, fps2, output1, output2, dimensions1, dimensions2):
    """Annotate two frame sequences pairwise and write each to its own video file.

    For every frame pair the pose is detected in both frames, the joint angles
    are compared, and the keypoints and limb edges selected by ``which_keys``
    are drawn onto both frames in place before they are written out.

    Args:
        all_frames1: Frames of the first clip.
        all_frames2: Frames of the second clip; pairs are formed by position.
        fps1: Frame rate for the first output file.
        fps2: Frame rate for the second output file.
        output1: File name of the first output inside ``videos/outputs``.
        output2: File name of the second output inside ``videos/outputs``.
        dimensions1: Frame size passed to the first writer.
        dimensions2: Frame size passed to the second writer.

    Returns:
        The annotated frames (first clip, second clip, alternating) of the
        100th to 105th processed pairs.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer1 = cv2.VideoWriter(os.path.join('videos', 'outputs', output1),
                                    fourcc, fps1, dimensions1)
    video_writer2 = cv2.VideoWriter(os.path.join('videos', 'outputs', output2),
                                    fourcc, fps2, dimensions2)
    angles = []
    frames = []
    try:
        # zip() stops at the shorter sequence, like the original index check.
        for frame1, frame2 in zip(all_frames1, all_frames2):
            if (frame1 is None) or (frame2 is None):
                break

            keypoints_with_scores1 = _detect_keypoints(frame1)
            keypoints_with_scores2 = _detect_keypoints(frame2)

            # Drop the first five keypoints; only index 5 onwards is compared.
            wanted_scores1 = keypoints_with_scores1[0][0][5:]
            wanted_scores2 = keypoints_with_scores2[0][0][5:]

            angle_comp = angle_comparison(wanted_scores1, wanted_scores2)
            angles.append(angle_comp)

            new_wanted_scores1, new_edges = which_keys(angle_comp, wanted_scores1)
            new_wanted_scores2, new_edges = which_keys(angle_comp, wanted_scores2)

            draw_keypoints(frame1, new_wanted_scores1, 0.6)
            draw_keypoints(frame2, new_wanted_scores2, 0.6)
            draw_connections(frame1, new_wanted_scores1, new_edges, 0.6)
            draw_connections(frame2, new_wanted_scores2, new_edges, 0.6)

            video_writer1.write(frame1)
            video_writer2.write(frame2)

            if 100 <= len(angles) <= 105:
                frames.append(frame1)
                frames.append(frame2)
    finally:
        # Release both writers on every path so the output files get closed.
        # No window is opened in this function, so no cv2.waitKey() polling
        # or window teardown is needed.
        video_writer1.release()
        video_writer2.release()
    return frames

In [None]:
# NOTE(review): workout() returns two lists (save_frame1, save_frame2), so this
# six-way unpacking raises ValueError as written. Either workout() must return
# the full frame lists with fps and dimensions, or this cell is stale.
all_frames1, all_frames2, fps1, fps2, dimensions1, dimensions2 = workout("5 sec jumping jacks.mp4", 20)

In [None]:
# Annotate both clips and write them to videos/outputs.
# NOTE(review): fps1 is passed for both the fps1 and fps2 parameters and fps2
# is not used here. Confirm that writing both outputs at the same rate is intended.
frames = two_frames_comparison(all_frames1, all_frames2, fps1, fps1, "output1.mp4", "output2.mp4", dimensions1, dimensions2)

In [None]:
# Show the second saved instruction frame, converted from BGR to RGB for matplotlib.
# NOTE(review): frame1 is assigned by the workout(...) call in a later cell, so
# on a fresh kernel this cell fails unless that cell has been run first.
plt.imshow(cv2.cvtColor(frame1[1],cv2.COLOR_BGR2RGB))

In [None]:
# Show the second saved webcam frame, converted from BGR to RGB for matplotlib.
# NOTE(review): frame2 is assigned by the workout(...) call in a later cell, so
# on a fresh kernel this cell fails unless that cell has been run first.
plt.imshow(cv2.cvtColor(frame2[1],cv2.COLOR_BGR2RGB))

In [None]:
# Run one session: frame1 receives the saved instruction frames and frame2 the
# webcam frames captured alongside them.
frame1, frame2 = workout("5 sec jumping jacks.mp4", 20)

In [None]:
# Pre-load the instruction clip (frames, fps, dimensions) for the threaded player below.
all_frames_vid2, fps2, dimensions2 = frame_array("5 sec jumping jacks.mp4", True)

In [None]:
import threading

def display_video2(all_frames_vid2, speed):
    """Play pre-loaded frames in a resizable window named 'Video2'.

    Playback ends at the end of the list, at the first ``None`` frame, or when
    'q' is pressed.

    Args:
        all_frames_vid2: Frames to show, in order.
        speed: Delay in milliseconds passed to ``cv2.waitKey`` after each frame.
    """
    cv2.namedWindow('Video2', cv2.WINDOW_NORMAL)

    for frame2 in all_frames_vid2:
        if frame2 is None:
            break
        cv2.imshow('Video2', frame2)
        if cv2.waitKey(speed) & 0xFF == ord('q'):
            break


def display_live_stream(cap_live, live_frames, stop_event=None):
    """Append frames read from ``cap_live`` to ``live_frames`` until stopped.

    Args:
        cap_live: Capture object whose ``read()`` returns ``(ok, frame)``.
        live_frames: List that receives every successfully read frame.
        stop_event: Optional ``threading.Event``. When given, the loop ends as
            soon as the event is set and no GUI call is made from this
            function. When omitted, the loop polls ``cv2.waitKey`` for 'q'
            after every frame instead.

    The loop also ends when a read fails.
    """
    while stop_event is None or not stop_event.is_set():
        ret_live, frame_live = cap_live.read()
        if not ret_live:
            break

        live_frames.append(frame_live)

        # Without a stop_event the 'q' key is the only other way out.
        if stop_event is None and cv2.waitKey(1) & 0xFF == ord('q'):
            break


if __name__ == '__main__':
    # Initialize live video capture
    cap_live = cv2.VideoCapture(0)  # Replace 0 with video source ID if needed

    # Frames grabbed from the webcam while the instruction video plays
    live_frames = []

    # Capture runs in the background; the event tells it when playback is over.
    # Previously the capture loop had no such signal, so joining its thread
    # blocked after the video had finished.
    stop_event = threading.Event()
    live_stream_thread = threading.Thread(target=display_live_stream,
                                          args=(cap_live, live_frames, stop_event))
    live_stream_thread.start()

    try:
        # Play the video on the main thread so every window call stays there.
        display_video2(all_frames_vid2, 1)
    finally:
        # Stop the capture thread first, then release resources.
        stop_event.set()
        live_stream_thread.join()
        cap_live.release()
        cv2.destroyAllWindows()