In [1]:
import collections
import os
import time
from typing import Tuple, List
import pandas as pd

from pathlib import Path

import cv2
import numpy as np
from IPython import display
import openvino as ov
from openvino.runtime.ie_api import CompiledModel

import mediapipe as mp
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Fetch `notebook_utils` module
import urllib.request
urllib.request.urlretrieve(
    url='https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/main/notebooks/utils/notebook_utils.py',
    filename='notebook_utils.py'
)
import notebook_utils as utils




In [2]:
# A directory where the model will be downloaded.
base_model_dir = "model"
# The name of the model from Open Model Zoo.
model_name = "action-recognition-0001"
# Selected precision (FP32, FP16, FP16-INT8).
precision = "FP16"
model_path_decoder = (
    f"model/intel/{model_name}/{model_name}-decoder/{precision}/{model_name}-decoder.xml"
)
model_path_encoder = (
    f"model/intel/{model_name}/{model_name}-encoder/{precision}/{model_name}-encoder.xml"
)
encoder_url = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/temp/{model_name}/{model_name}-encoder/{precision}/{model_name}-encoder.xml"
decoder_url = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/temp/{model_name}/{model_name}-decoder/{precision}/{model_name}-decoder.xml"

if not os.path.exists(model_path_decoder):
    utils.download_ir_model(decoder_url, Path(model_path_decoder).parent)
if not os.path.exists(model_path_encoder):
    utils.download_ir_model(encoder_url, Path(model_path_encoder).parent)

In [3]:
# Download the text from the openvino_notebooks storage
vocab_file_path = utils.download_file(
    "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/text/kinetics.txt",
    directory="data"
)

with vocab_file_path.open(mode='r') as f:
    labels = [line.strip() for line in f]

print(labels[0:9], np.shape(labels))

'data\kinetics.txt' already exists.
['abseiling', 'air drumming', 'answering questions', 'applauding', 'applying cream', 'archery', 'arm wrestling', 'arranging flowers', 'assembling computer'] (400,)


In [4]:
specified_list = ['clean and jerk','throwing ball','swinging legs','stretching leg','squat','situp','side kick',
                  'push up','pull ups','snatch weight lifting','lunge','exercising with an exercise ball',
                  'exercising arm','deadlifting','yoga','stretching arm']
labels = ['no exercise' if  all(spec not in label.lower() for spec in specified_list) else label for label in labels]
labels[255] 

'pull ups'

In [5]:
import ipywidgets as widgets

core = ov.Core()
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)

device

Dropdown(description='Device:', index=2, options=('CPU', 'GPU', 'AUTO'), value='AUTO')

In [7]:
# Initialize OpenVINO Runtime.
core = ov.Core()


def model_init(model_path: str, device: str) -> Tuple:
    """
    Read the network and weights from a file, load the
    model on CPU and get input and output names of nodes

    :param:
            model: model architecture path *.xml
            device: inference device
    :retuns:
            compiled_model: Compiled model
            input_key: Input node for model
            output_key: Output node for model
    """

    # Read the network and corresponding weights from a file.
    model = core.read_model(model=model_path)
    # Compile the model for specified device.
    compiled_model = core.compile_model(model=model, device_name=device)
    # Get input and output names of nodes.
    input_keys = compiled_model.input(0)
    output_keys = compiled_model.output(0)
    return input_keys, output_keys, compiled_model

In [8]:
#Calculate Angles of Joints
def calculate_angle(a,b,c):
    a = np.array(a) # First
    b = np.array(b) # Mid
    c = np.array(c) # End
    
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians*180.0/np.pi)
    
    if angle >180.0:
        angle = 360-angle
        
    return angle 

In [9]:
joints_dictionary = {'push up': [['LEFT_SHOULDER','LEFT_ELBOW','LEFT_WRIST'], ['RIGHT_SHOULDER','RIGHT_ELBOW','RIGHT_WRIST'],[155, 90]],
                     'pull ups': [['LEFT_SHOULDER','LEFT_ELBOW','LEFT_WRIST'], ['RIGHT_SHOULDER','RIGHT_ELBOW','RIGHT_WRIST'],[155, 80]],
                     'situp': [['LEFT_SHOULDER','LEFT_HIP','LEFT_ANKLE'], ['RIGHT_SHOULDER','RIGHT_HIP','RIGHT_ANKLE'],[160, 130]],
                     'squat': [['LEFT_SHOULDER','LEFT_HIP','LEFT_KNEE'], ['RIGHT_SHOULDER','RIGHT_HIP','RIGHT_KNEE'],[155, 110]],
                     'snatch weight lifting': [['LEFT_WRIST','LEFT_SHOULDER','LEFT_ANKLE'], ['RIGHT_WRIST','RIGHT_SHOULDER', 'RIGHT_ANKLE'],[155, 60]],
                     'burpee': [['LEFT_HIP','LEFT_KNEE', 'LEFT_ANKLE'], ['RIGHT_HIP','RIGHT_KNEE', 'RIGHT_ANKLE'],[155, 90]]
                     }
#Returns the 3 angles that constitute a side of the body
def initialize_joints(side_list, landmarks):
    joint_coordinates = []

    for joint_name in side_list:
    # Get the index of the joint name in mp_pose.PoseLandmark enum
        joint_index = getattr(mp_pose.PoseLandmark, joint_name).value
        # Extract the x and y coordinates of the joint and append them to joint_coordinates
        joint_x = landmarks[joint_index].x
        joint_y = landmarks[joint_index].y
        joint_coordinates.append([joint_x, joint_y])

    return joint_coordinates[0], joint_coordinates[1], joint_coordinates[2]

# Manual finder of exercise
def print_actual_exercise(exercie_results_df):
    label_probabilities = dict(zip(exercie_results_df['label'], exercie_results_df['probability']))

    situp = label_probabilities.get('situp', None)
    exercising_with_an_exercise_ball = label_probabilities.get('exercising with an exercise ball', None)
    throwing_ball = label_probabilities.get('throwing ball', None)
    stretching_leg = label_probabilities.get('stretching leg', None)

    squat = label_probabilities.get('squat', None)
    lunge = label_probabilities.get('lunge', None)
    snatch_weight_lifting = label_probabilities.get('snatch weight lifting', None)
    clean_and_jerk = label_probabilities.get('clean and jerk', None)

    deadlifting = label_probabilities.get('deadlifting', None)
    push_up = label_probabilities.get('push up', None)
    exercising_arm = label_probabilities.get('exercising arm', None)
    swinging_legs = label_probabilities.get('swinging legs', None)

    stretching_arm = label_probabilities.get('stretching arm', None)
    side_kick = label_probabilities.get('side kick', None)
    pull_ups = label_probabilities.get('pull ups', None)
    yoga = label_probabilities.get('yoga', None)

    current_exercise = 'no exercise'
    base_situp = situp + exercising_with_an_exercise_ball + throwing_ball + stretching_leg + yoga
    base_squad = squat + lunge + snatch_weight_lifting + situp
    base_snatch = snatch_weight_lifting + lunge + clean_and_jerk + deadlifting + squat
    base_pushup = push_up + exercising_arm + stretching_leg + swinging_legs + stretching_arm + side_kick + exercising_with_an_exercise_ball
    base_burpee = push_up + squat + exercising_arm + situp + lunge + throwing_ball

    if pull_ups >= 0.8:
        current_exercise = 'pull ups'
    #    print(f'Recognized exercie: {current_exercise} with {pull_ups}')
    elif situp >= 0.22:
        if base_situp >= 0.6:
            current_exercise = 'situp'
    #        print(f'Recognized exercie: {current_exercise} with {base_situp}')
    elif squat >= 0.4:
        if base_squad >= 0.7 and snatch_weight_lifting < 0.1:
            current_exercise = 'squat'
    #        print(f'Recognized exercie: {current_exercise} with {base_squad}')
    elif snatch_weight_lifting + clean_and_jerk + deadlifting > 0.15:
        if  base_snatch > 0.5:
            current_exercise = 'snatch weight lifting'
    #        print(f'Recognized exercie: {current_exercise} with {base_snatch}')
    elif push_up > 0.35 or stretching_leg > 0.6:
        if base_pushup > 0.7:
            current_exercise = 'push up'
    #        print(f'Recognized exercie: {current_exercise} with {base_pushup}')
    elif push_up < 0.25 and squat < 0.25 and situp < 0.25 and  base_burpee > 0.6:
        current_exercise = 'burpee'
        print(f'Recognized exercie: {current_exercise} with {base_pushup}')
    #print(f'Recognized exercie: {current_exercise}')
    return current_exercise

In [10]:
# Encoder initialization
input_key_en, output_keys_en, compiled_model_en = model_init(model_path_encoder, device.value)
# Decoder initialization
input_key_de, output_keys_de, compiled_model_de = model_init(model_path_decoder, device.value)

# Get input size - Encoder.
height_en, width_en = list(input_key_en.shape)[2:]
# Get input size - Decoder.
frames2decode = list(input_key_de.shape)[0:][1]

In [11]:
def center_crop(frame: np.ndarray) -> np.ndarray:
    """
    Center crop squared the original frame to standardize the input image to the encoder model

    :param frame: input frame
    :returns: center-crop-squared frame
    """    
    img_h, img_w, _ = frame.shape
    min_dim = min(img_h, img_w)
    start_x = int((img_w - min_dim) / 2.0)
    start_y = int((img_h - min_dim) / 2.0)
    roi = [start_y, (start_y + min_dim), start_x, (start_x + min_dim)]
    return frame[start_y : (start_y + min_dim), start_x : (start_x + min_dim), ...], roi


#Will crop and center the image on the identified body.
def center_body (frame, thres = 0.05):
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)
    landmarks = results.pose_landmarks.landmark
    landmarks = [[lm.x, lm.y] for lm in landmarks]
    landmarks = np.array(landmarks)

    #Get Landmark X and Y
    x_coords = landmarks[:, 0]
    y_coords = landmarks[:, 1]

    # Find min and max x and y coordinates
    min_x = round(max(np.amin(x_coords) - thres, 0) * frame.shape[1])
    min_y = round(max(np.amin(y_coords) - thres, 0) * frame.shape[0])
    max_x = round(min(np.amax(x_coords) + thres, 1) * frame.shape[1])
    max_y = round(min(np.amax(y_coords) + thres, 1) * frame.shape[0])

    # Create width and height.
    w = max_x - min_x
    h = max_y - min_y

    # Determine size of the square frame
    size = max(w, h)

    # Calculate center of the bounding box
    center_x = min_x + w // 2
    center_y = min_y + h // 2

    # Calculate coordinates for cropping
    start_x = max(0, center_x - size // 2)
    start_y = max(0, center_y - size // 2)
    end_x = min(frame.shape[1], start_x + size)
    end_y = min(frame.shape[0], start_y + size)

    roi = [start_y, (start_y + end_y), start_x, (start_x + end_x)]
    cropped_image = frame[start_y:end_y, start_x:end_x]
    return cropped_image, roi


def adaptive_resize(frame: np.ndarray, size: int) -> np.ndarray:
    """
     The frame going to be resized to have a height of size or a width of size

    :param frame: input frame
    :param size: input size to encoder model
    :returns: resized frame, np.array type
    """
    h, w, _ = frame.shape
    scale = size / min(h, w)
    w_scaled, h_scaled = int(w * scale), int(h * scale)
    if w_scaled == w and h_scaled == h:
        return frame
    #return cv2.resize(frame, (w_scaled, h_scaled))
    return cv2.resize(frame, (size, size))


def decode_output(probs: np.ndarray, labels: np.ndarray, top_k: int = 3) -> np.ndarray:
    """
    Decodes top probabilities into corresponding label names

    :param probs: confidence vector for 400 actions
    :param labels: list of actions
    :param top_k: The k most probable positions in the list of labels
    :returns: decoded_labels: The k most probable actions from the labels list
              decoded_top_probs: confidence for the k most probable actions

    top_ind = np.argsort(-1 * probs)[:top_k]
    out_label = np.array(labels)[top_ind.astype(int)]
    decoded_labels = [out_label[0][0], out_label[0][1], out_label[0][2]]
    top_probs = np.array(probs)[0][top_ind.astype(int)]
    decoded_top_probs = [top_probs[0][0], top_probs[0][1], top_probs[0][2]]

    """
    # Step 1: Create a DataFrame with columns 'label' and 'probability'
    df = pd.DataFrame({'label': labels, 'probability': probs[0]})
    # Step 3: Group by 'label' with the sum of probabilities
    grouped_df = df.groupby('label')['probability'].sum().reset_index()
    current_exercise = print_actual_exercise(grouped_df)

    # Step 4: Get the top k results
    sorted_df = grouped_df.sort_values(by='probability', ascending=False).head(top_k)
    
    # Get decoded labels and probabilities
    decoded_labels = sorted_df['label'].tolist()
    decoded_top_probs = sorted_df['probability'].tolist()

    return decoded_labels, decoded_top_probs, current_exercise


def rec_frame_display(frame: np.ndarray, roi) -> np.ndarray:
    """
    Draw a rec frame over actual frame

    :param frame: input frame
    :param roi: Region of interest, image section processed by the Encoder
    :returns: frame with drawed shape

    """

    cv2.line(frame, (roi[2] + 3, roi[0] + 3), (roi[2] + 3, roi[0] + 100), (0, 200, 0), 2)
    cv2.line(frame, (roi[2] + 3, roi[0] + 3), (roi[2] + 100, roi[0] + 3), (0, 200, 0), 2)
    cv2.line(frame, (roi[3] - 3, roi[1] - 3), (roi[3] - 3, roi[1] - 100), (0, 200, 0), 2)
    cv2.line(frame, (roi[3] - 3, roi[1] - 3), (roi[3] - 100, roi[1] - 3), (0, 200, 0), 2)
    cv2.line(frame, (roi[3] - 3, roi[0] + 3), (roi[3] - 3, roi[0] + 100), (0, 200, 0), 2)
    cv2.line(frame, (roi[3] - 3, roi[0] + 3), (roi[3] - 100, roi[0] + 3), (0, 200, 0), 2)
    cv2.line(frame, (roi[2] + 3, roi[1] - 3), (roi[2] + 3, roi[1] - 100), (0, 200, 0), 2)
    cv2.line(frame, (roi[2] + 3, roi[1] - 3), (roi[2] + 100, roi[1] - 3), (0, 200, 0), 2)
    # Write ROI over actual frame
    FONT_STYLE = cv2.FONT_HERSHEY_SIMPLEX
    org = (roi[2] + 3, roi[1] - 3)
    org2 = (roi[2] + 2, roi[1] - 2)
    FONT_SIZE = 0.5
    FONT_COLOR = (0, 200, 0)
    FONT_COLOR2 = (0, 0, 0)
    cv2.putText(frame, "ROI", org2, FONT_STYLE, FONT_SIZE, FONT_COLOR2)
    cv2.putText(frame, "ROI", org, FONT_STYLE, FONT_SIZE, FONT_COLOR)
    return frame


def display_text_fnc(frame: np.ndarray, display_text: str, index: int):
    """
    Include a text on the analyzed frame

    :param frame: input frame
    :param display_text: text to add on the frame
    :param index: index line dor adding text

    """
    # Configuration for displaying images with text.
    FONT_COLOR = (0, 255, 0)
    FONT_COLOR2 = (0, 0, 0)
    FONT_STYLE = cv2.FONT_HERSHEY_DUPLEX
    FONT_SIZE = 1
    TEXT_VERTICAL_INTERVAL = 25
    TEXT_LEFT_MARGIN = 15
    # ROI over actual frame
    #(processed, roi) = center_crop(frame)
    # Draw a ROI over actual frame.
    #frame = rec_frame_display(frame, roi)
    # Put a text over actual frame.
    text_loc = (TEXT_LEFT_MARGIN, TEXT_VERTICAL_INTERVAL * (index + 1))
    text_loc2 = (TEXT_LEFT_MARGIN + 1, TEXT_VERTICAL_INTERVAL * (index + 1) + 1)
    cv2.putText(frame, display_text, text_loc2, FONT_STYLE, FONT_SIZE, FONT_COLOR2, 2)
    cv2.putText(frame, display_text, text_loc, FONT_STYLE, FONT_SIZE, FONT_COLOR, 2)

In [12]:
def preprocessing(frame: np.ndarray, size: int) -> np.ndarray:
    """
    Preparing frame before Encoder.
    The image should be scaled to its shortest dimension at "size"
    and cropped, centered, and squared so that both width and
    height have lengths "size". The frame must be transposed from
    Height-Width-Channels (HWC) to Channels-Height-Width (CHW).

    :param frame: input frame
    :param size: input size to encoder model
    :returns: resized and cropped frame
    """
    try:
        (preprocessed, roi) = center_body(frame)
        preprocessed = adaptive_resize(preprocessed, size)
    #print('prepro_body: ', preprocessed.shape)
    except:
        # Adaptative resize
        preprocessed = adaptive_resize(frame, size)
        # Center_crop
        (preprocessed, roi) = center_crop(preprocessed)
        
    # Transpose frame HWC -> CHW
    preprocessed = preprocessed.transpose((2, 0, 1))[None,]  # HWC -> CHW
    return preprocessed, roi


def encoder(
    preprocessed: np.ndarray,
    compiled_model: CompiledModel
) -> List:
    """
    Encoder Inference per frame. This function calls the network previously
    configured for the encoder model (compiled_model), extracts the data
    from the output node, and appends it in an array to be used by the decoder.

    :param: preprocessed: preprocessing frame
    :param: compiled_model: Encoder model network
    :returns: encoder_output: embedding layer that is appended with each arriving frame
    """
    output_key_en = compiled_model.output(0)

    # Get results on action-recognition-0001-encoder model
    infer_result_encoder = compiled_model([preprocessed])[output_key_en]
    return infer_result_encoder


def decoder(encoder_output: List, compiled_model_de: CompiledModel) -> List:
    """
    Decoder inference per set of frames. This function concatenates the embedding layer
    froms the encoder output, transpose the array to match with the decoder input size.
    Calls the network previously configured for the decoder model (compiled_model_de), extracts
    the logits and normalize those to get confidence values along specified axis.
    Decodes top probabilities into corresponding label names

    :param: encoder_output: embedding layer for 16 frames
    :param: compiled_model_de: Decoder model network
    :returns: decoded_labels: The k most probable actions from the labels list
              decoded_top_probs: confidence for the k most probable actions
    """
    # Concatenate sample_duration frames in just one array
    decoder_input = np.concatenate(encoder_output, axis=0)
    # Organize input shape vector to the Decoder (shape: [1x16x512]]
    decoder_input = decoder_input.transpose((2, 0, 1, 3))
    decoder_input = np.squeeze(decoder_input, axis=3)
    output_key_de = compiled_model_de.output(0)
    # Get results on action-recognition-0001-decoder model
    result_de = compiled_model_de([decoder_input])[output_key_de]
    # Normalize logits to get confidence values along specified axis
    probs = softmax(result_de - np.max(result_de))
    df = pd.DataFrame({'label': labels, 'probability': probs[0]})
    grouped_df = df.groupby('label')['probability'].sum().reset_index()
    sorted_df = grouped_df.sort_values(by='probability', ascending=False)
    #print(sorted_df.head(10))
    # Decodes top probabilities into corresponding label names
    decoded_labels, decoded_top_probs, current_exercise = decode_output(probs, labels, top_k=3)
    return decoded_labels, decoded_top_probs, current_exercise


def softmax(x: np.ndarray) -> np.ndarray:
    """
    Normalizes logits to get confidence values along specified axis
    x: np.array, axis=None
    """
    exp = np.exp(x)
    return exp / np.sum(exp, axis=None)
    

In [30]:
def run_action_recognition(
    source: str = "0",
    flip: bool = True,
    use_popup: bool = False,
    compiled_model_en: CompiledModel = compiled_model_en,
    compiled_model_de: CompiledModel = compiled_model_de,
    skip_first_frames: int = 0,
):
    size = height_en  # Encoder requiered size
    sample_duration = frames2decode  # Number of frames that decoder needs
    # Select FPS source.
    fps = 20
    player = None
    exercise_dict = {} #Store repetitions per exercise on the video
    record_video = {'no exercise':[]}

    #Mediapipe Pose Detection
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    
        try:
            # Create a video player.
            player = utils.VideoPlayer(source, flip=flip, fps=fps, skip_first_frames=skip_first_frames)
            # Start capturing.
            player.start()
            if use_popup:
                title = "Press ESC to Exit"
                cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

            processing_times = collections.deque()
            processing_time = 0
            encoder_output = []
            decoded_labels = ['no exercise', 0, 0]
            actual_exercise = 'no exercise'
            decoded_top_probs = [0, 0, 0]
            counter = 0
            stage = None    #Stage inside a Cycle. Extension or Contraction


            # Create a text template to show inference results over video.
            text_inference_template = "Infer Time:{Time:.1f}ms,{fps:.1f}FPS"
            text_template = "{label},{conf:.2f}%"

            while True:
                counter = counter + 1

                # Read a frame from the video stream.
                frame = player.next()
                if frame is None:
                    print("Source ended")
                    break

                scale = 1280 / max(frame.shape)


                ####### Define Current Exercise Probabilities
                if counter % 2 == 0:
                    # Preprocess frame before Encoder.
                    (preprocessed, _) = preprocessing(frame, size)
                    #record_video['no exercise'].append(preprocessed)

                    # Measure processing time.
                    start_time = time.time()

                    # Encoder Inference per frame
                    encoder_output.append(encoder(preprocessed, compiled_model_en))

                    # Decoder inference per set of frames
                    # Wait for sample duration to work with decoder model.
                    if len(encoder_output) == sample_duration:
                        decoded_labels, decoded_top_probs, actual_exercise = decoder(encoder_output, compiled_model_de)
                        encoder_output = []

                        if actual_exercise != 'no exercise':
                            print(actual_exercise)

                    # Inference has finished. Display the results.
                    stop_time = time.time()

                    # Calculate processing time.
                    processing_times.append(stop_time - start_time)

                    # Use processing times from last 200 frames.
                    if len(processing_times) > 200:
                        processing_times.popleft()

                    # Mean processing time [ms]
                    processing_time = np.mean(processing_times) * 1000
                    fps = 1000 / processing_time

                ####### Define exercise repetitions 
                if actual_exercise == 'no exercise':
                        count_ext = 0
                        count_con = 0

                elif actual_exercise != 'no exercise':
                    # Recolor image to RGB
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    image.flags.writeable = False
                
                    # Make detection
                    results = pose.process(image)
                
                    # Recolor back to BGR
                    image.flags.writeable = True
                    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                    try:
                        landmarks = results.pose_landmarks.landmark
                        
                        # Get coordinates
                        A,B,C = initialize_joints(joints_dictionary[actual_exercise][0],landmarks)
                        D,E,F = initialize_joints(joints_dictionary[actual_exercise][1],landmarks)
                        
                        # Calculate angle
                        angle_l = calculate_angle(A, B, C)
                        angle_r = calculate_angle(D, E, F)
                        #print(f'angle izq: {angle_l}  -  angle der: {angle_r}')

                        AD = [(A[0] + D[0]) / 2, (A[1] + D[1]) / 2]
                        BE = [(B[0] + E[0]) / 2, (B[1] + E[1]) / 2]
                        CF = [(C[0] + F[0]) / 2, (C[1] + F[1]) / 2]
                        angle_mid = calculate_angle(AD, BE, CF)
                        #print(f'angle izq: {angle_l}  -  angle der: {angle_r}  -  angle_mid: {angle_mid}')

                        
                        #Special Case for Snatch (One Arm snatch)
                        if actual_exercise == 'snatch weight lifting':
                        
                            #Extension
                            if angle_mid > joints_dictionary[actual_exercise][2][0] and stage != "extension":
                                stage = "extension"
                                count_ext = 1
                                #print(f'stage: {stage}  -  angle mid: {angle_mid}')
                            
                            #Contraction
                            if angle_mid < joints_dictionary[actual_exercise][2][1] and stage !='contraction':
                                stage="contraction"
                                count_con = 1
                                #print(f'stage: {stage}  -  angle mid: {angle_mid}')

                        #Special Case for Situp (MiddlePoint)
                        elif actual_exercise == 'situp':
                        
                            #Extension
                            if angle_mid > joints_dictionary[actual_exercise][2][0] and stage != "extension":
                                stage = "extension"
                                count_ext = 1
                                #print(f'stage: {stage}  -  angle mid: {angle_mid}')
                            
                            #Contraction
                            if angle_mid < joints_dictionary[actual_exercise][2][1] and stage !='contraction':
                                stage="contraction"
                                count_con = 1
                                #print(f'stage: {stage}  -  angle mid: {angle_mid}')
                        
                        else:

                            #Extension
                            if angle_l > joints_dictionary[actual_exercise][2][0] and angle_r > joints_dictionary[actual_exercise][2][0] and stage != "extension":
                                stage = "extension"
                                count_ext = 1
                                #print(f'stage: {stage}  -  angulo izq: {angle_l}  -  angulo der: {angle_r}')
                            
                            #Contraction
                            if angle_l < joints_dictionary[actual_exercise][2][1] and angle_r < joints_dictionary[actual_exercise][2][1] and stage !='contraction':
                                stage="contraction"
                                count_con = 1
                                #print(f'stage: {stage}  -  angulo izq: {angle_l}  -  angulo der: {angle_r}')

                        #Complete Cycle, Add 1 To counter
                        if count_ext + count_con == 2:
                            count_ext = 0 
                            count_con = 0
                            if actual_exercise not in exercise_dict:
                                exercise_dict[actual_exercise] = 1
                            else:
                                exercise_dict[actual_exercise] += 1
                            print(exercise_dict)

                    except:
                        pass

                # Adaptative resize for visualization.
                if scale < 1:
                    frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)


                '''
                # Visualize the results of the 400 activities
                for i in range(0, 3):
                    display_text = text_template.format(
                        label=decoded_labels[i],
                        conf=decoded_top_probs[i] * 100,
                    )
                    display_text_fnc(frame, display_text, i)
                
                display_text = text_inference_template.format(Time=processing_time, fps=fps)
                display_text_fnc(frame, display_text, 3)

                '''

                # Results for the customized results
                display_text_fnc(frame, f'Current Exercise: {actual_exercise}', 0)
                display_text_fnc(frame, 'REPETITIONS:', 1)
                
                print_count = 0
                for exer , reps in exercise_dict.items():
                    display_text_fnc(frame, f'{exer} : {reps}', print_count + 2)
                    print_count += 1
                
                record_video['no exercise'].append(frame)
                

                if use_popup:
                    cv2.imshow(title, frame)
                    key = cv2.waitKey(1)
                    # escape = 27
                    if key == 27:
                        break
                else:
                    _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                    i = display.Image(data=encoded_img)
                    display.clear_output(wait=True)
                    display.display(i)

        except KeyboardInterrupt:
            print("Interrupted")
        except RuntimeError as e:
            print(e)
        finally:
            if player is not None:
                # Stop capturing.
                player.stop()
            if use_popup:
                cv2.destroyAllWindows()
    return exercise_dict, record_video

In [29]:
USE_WEBCAM = True

cam_id = 0
#video_file = "video_data/pull_up/pull_up_1.mp4"
video_file = "video_data/sit_up/sit_up_2.mp4"

source = cam_id if USE_WEBCAM else video_file
additional_options = {"skip_first_frames": 0, "flip": False} if not USE_WEBCAM else {"flip": True}
exercise_dict, record = run_action_recognition(source=source, use_popup=True, **additional_options)

In [27]:
for index, row in exercie_results.iterrows():
    # Replace space with underscore (_) in label to create valid variable name
    label_name = row['label'].replace(' ', '_')
    # Create variables dynamically using the modified label as the variable name
    vars()[label_name] = row['probability']
    print(label_name)

NameError: name 'exercie_results' is not defined

## Record video

In [19]:
record['no exercise'][0].shape

(480, 640, 3)

In [25]:
import cv2
import numpy as np
fps = 16  # Adjust the frame rate as needed
output_video_path = "detect/try_2.mp4"
#wat, channels, height, width = record['no exercise'][0].shape
height, width, channels = record['no exercise'][0].shape
video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

for frame in record['no exercise']:
    # Ensure the frame data type is uint8
    #frame = frame.reshape(3, 224, 224).transpose(1, 2, 0).astype('uint8')
    # Write the frame to the video
    video_writer.write(frame)

video_writer.release()

print("Video created successfully:", output_video_path)

Video created successfully: detect/try_1.mp4


In [116]:
wat, channels, height, width = record['no exercise'][0].shape
print(height)
print(width)
print(channels)


224
224
3


# Create a database of vectors

In [397]:
def create_vector_pack(
    source: str = "0",
    flip: bool = True,
    use_popup: bool = False,
    compiled_model_en: CompiledModel = compiled_model_en,
    skip_first_frames: int = 0,
):
    """
    Use the "source" webcam or video file to run the complete pipeline for action-recognition problem
    1. Create a video player to play with target fps
    2. Prepare a set of frames to be encoded-decoded
    3. Preprocess frame before Encoder
    4. Encoder Inference per frame
    5. Decoder inference per set of frames
    6. Visualize the results

    :param: source: webcam "0" or video path
    :param: flip: to be used by VideoPlayer function for flipping capture image
    :param: use_popup: False for showing encoded frames over this notebook, True for creating a popup window.
    :param: skip_first_frames: Number of frames to skip at the beginning of the video.
    :returns: display video over the notebook or in a popup window

    """
    size = height_en  # Endoder input size - From Cell 5_9
    sample_duration = frames2decode  # Decoder input size - From Cell 5_7

    # Select frames per second of your source.
    fps = 30
    player = None

    #List of 16-pack vectors
    vector_pack = []

    #Mediapipe Pose Detection
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    
        try:
            # Create a video player.
            player = utils.VideoPlayer(source, flip=flip, fps=fps, skip_first_frames=skip_first_frames)
            # Start capturing.
            player.start()
            if use_popup:
                title = "Press ESC to Exit"
                cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

            encoder_output = []
            counter = 0


            while True:
                counter = counter + 1

                # Read a frame from the video stream.
                frame = player.next()
                if frame is None:
                    print("Source ended")
                    break

                if counter % 2 == 0:
                    # Preprocess frame before Encoder.
                    (preprocessed, _) = preprocessing(frame, size)

                    # Encoder Inference per frame
                    encoder_output.append(encoder(preprocessed, compiled_model_en))
                    

                    if len(encoder_output) == sample_duration:
                        vector_pack.append(encoder_output)
                        encoder_output = []

        # ctrl-c
        except KeyboardInterrupt:
            print("Interrupted")
        # Any different error
        except RuntimeError as e:
            print(e)
        finally:
            if player is not None:
                # Stop capturing.
                player.stop()
            if use_popup:
                cv2.destroyAllWindows()
    return vector_pack

In [15]:
USE_WEBCAM = False

cam_id = 0
video_file = "video_data/pull_up/pull_up_1.mp4"

source = cam_id if USE_WEBCAM else video_file
additional_options = {"skip_first_frames": 0, "flip": False} if not USE_WEBCAM else {"flip": True}
vector_pack = create_vector_pack(source=source, use_popup=True, **additional_options)

NameError: name 'create_vector_pack' is not defined

In [253]:
def unpack_vector(vector_pack):
    data_list = []

    for pack in vector_pack:
        pack_input = np.concatenate(pack, axis=0)
        pack_input = pack_input.transpose((2, 0, 1, 3))
        pack_input = np.squeeze(pack_input, axis=3)
        data_list.append(pack_input[0])

    return data_list

In [None]:
# Define the directory containing the videos
root_dir = 'video_data'

# Initialize dictionary to store lists of big arrays
data_dict = {}

# Traverse through the directory structure
for subdir, dirs, files in os.walk(root_dir):
    # Check if the current sub-directory contains videos
    video_files = [file for file in files if file.endswith(('.mp4', '.avi', '.mov'))]
    if video_files:
        # Process each video and generate lists of big arrays
        for video_file in video_files:
            video_path = os.path.join(subdir, video_file)
            video_name = os.path.splitext(video_file)[0]  # Extract video name without extension
            print(f'Loading video {video_path}...')
            vector_pack = create_vector_pack(source=source, use_popup=False, **additional_options)
            print('Vector pack created! Concatenating Vectors...')
            big_arrays = unpack_vector(vector_pack)
            print(f'Succeed. Saving with label {subdir}')
            # Store lists of big arrays in dictionary with folder name as key
            label = os.path.basename(subdir)
            data_dict[(label, video_name)] = big_arrays
# Save all lists of big arrays into a single .npz file
npz_file_path = "vecto_database_video.npz"
np.savez(npz_file_path, **{str(key): value for key, value in data_dict.items()})
print(f"Saved big arrays to {npz_file_path}")

## Try without video name

In [251]:
# Load data from .npz file
npz_file_path = "vecto_database.npz"
loaded_data = np.load(npz_file_path)

# Initialize lists to store label and big array data
labels = []
big_arrays = []

# Iterate through loaded data and extract label and big array
for label, data_array in loaded_data.items():
    labels.extend([label] * len(data_array))
    big_arrays.extend(data_array)

# Create DataFrame
df = pd.DataFrame({'label': labels, 'big_array': big_arrays})

# Display DataFrame
df

Unnamed: 0,label,big_array
0,burpee,"[[0.181176, 0.08002925, 0.16622949, 0.01320751..."
1,burpee,"[[0.3144227, 0.028594825, 0.1010986, 0.0578447..."
2,burpee,"[[0.2852033, 0.26921174, 0.17491572, 0.0464213..."
3,burpee,"[[0.17331505, 0.037380286, 0.08150157, 0.02385..."
4,burpee,"[[0.1418101, 0.14380236, 0.030094568, 0.019460..."
...,...,...
161,sit_up,"[[0.17680885, 0.06596319, 0.040142085, 0.05749..."
162,sit_up,"[[0.11307282, 0.26461148, 0.044432614, 0.01717..."
163,sit_up,"[[0.22903247, 0.4397676, 0.04068598, 0.0053695..."
164,sit_up,"[[0.20696819, 0.071437016, 0.027337344, 0.0001..."


## Try with video name

In [342]:
npz_file_path = "vecto_database_video.npz"
loaded_data = np.load(npz_file_path, allow_pickle=True)

# Initialize lists to store folder names, video names, and big arrays
folder_names = []
#video_names = []
big_arrays = []

# Iterate through loaded data and extract folder names, video names, and big arrays
for label, data in loaded_data.items():
    for video_info in data:
        folder_names.append(label)
        #video_names.append(video_info[0])  # Extracting video name from the first element of video_info
        big_arrays.append(video_info)   # Extracting big array from the second element of video_info

# Create DataFrame
df_v = pd.DataFrame({'label': folder_names, 'big_array': big_arrays})

# Display DataFrame
df_v

Unnamed: 0,label,big_array
0,"('burpee', 'burpee_1')","[[0.25409243, 0.042802602, 0.16231276, 0.02132..."
1,"('burpee', 'burpee_1')","[[0.34191197, 0.008533829, 0.076149285, 0.0106..."
2,"('burpee', 'burpee_1')","[[0.31739214, 0.12699544, 0.16563262, 0.048489..."
3,"('burpee', 'burpee_1')","[[0.20254678, 0.05345901, 0.0992722, 0.0130285..."
4,"('burpee', 'burpee_1')","[[0.124611795, 0.20127398, 0.05477627, 0.0, 0...."
...,...,...
151,"('sit_up', 'sit_up_5')","[[0.15490694, 0.12911242, 0.06561741, 0.003459..."
152,"('sit_up', 'sit_up_5')","[[0.16544935, 0.045581326, 0.031468082, 0.0125..."
153,"('sit_up', 'sit_up_5')","[[0.28516218, 0.15715006, 0.11490148, 0.029659..."
154,"('sit_up', 'sit_up_5')","[[0.1288017, 0.19007759, 0.087020025, 0.008638..."


## Test this thing

In [401]:
test_vector_pack = create_vector_pack(source='video_data/burpee/burpee_1.mp4', use_popup=False, **additional_options)
test_vector_pack_1 = create_vector_pack(source='video_data/burpee/burpee_1.mp4', use_popup=False, **additional_options)

Source ended
Source ended


In [None]:
test_vector_unpack = unpack_vector(test_vector_pack)
test_vector_unpack_1 = unpack_vector(test_vector_pack_1)
print(len(test_vector_unpack),len(test_vector_unpack_1))
test_vector_unpack

In [None]:
test_vector_unpack_1

In [404]:
for i in range(len(test_vector_unpack)):
    list = []
    for j in range(len(test_vector_unpack[0])):
        value = cosine_similarity(test_vector_unpack_1[i][j],test_vector_unpack[i][j])
        #print(f'pack_vector {i+1}: {value}')
        list.append(value)
    total = sum(list)/16
    #print(list)
    print(f'total_pack_vector {i+1}: {total}')

total_pack_vector 1: 0.8925144523382187
total_pack_vector 2: 0.9208042696118355
total_pack_vector 3: 0.9636142700910568
total_pack_vector 4: 0.9319534674286842
total_pack_vector 5: 0.9395167827606201
total_pack_vector 6: 0.8922161310911179
total_pack_vector 7: 0.8287025056779385
total_pack_vector 8: 0.8615151457488537
total_pack_vector 9: 0.805098831653595
total_pack_vector 10: 0.8617491126060486
total_pack_vector 11: 0.8282929845154285
total_pack_vector 12: 0.7571482695639133
total_pack_vector 13: 0.7663170322775841
total_pack_vector 14: 0.7874786108732224
total_pack_vector 15: 0.7756083495914936
total_pack_vector 16: 0.7936922423541546
total_pack_vector 17: 0.8218375742435455
total_pack_vector 18: 0.7935625314712524
total_pack_vector 19: 0.76043919660151
total_pack_vector 20: 0.7768196668475866
total_pack_vector 21: 0.7550389133393764


IndexError: list index out of range

In [405]:
# Function to compute Euclidean distance between two vectors
def cosine_similarity(vec1, vec2):
    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    return dot_product / (norm_vec1 * norm_vec2)

# Function to compute lowest Euclidean distance for each row
def compute_greatest_similarity(row):
    dataframe_big_array = row['big_array']
    num_vectors = len(dataframe_big_array)
    greatest_similarity = -np.inf  # Initialize with a very small value
    for i in range(num_vectors):
        similarity_sum = 0
        for j in range(num_vectors):
            similarity = cosine_similarity(dataframe_big_array[j], test_vector_unpack[6][(i + j) % num_vectors])
            similarity_sum += (similarity)
        greatest_similarity = max(greatest_similarity, similarity_sum / num_vectors)
    return greatest_similarity

In [406]:
# Apply the function to each row in the DataFrame
results_df = df_v.copy()
results_df['lowest_distance'] = results_df.apply(compute_greatest_similarity, axis=1)
#sorted_results = results_df.sort_values(by='lowest_distance', ascending=False)
#sorted_results
averages_df = results_df.groupby('label')['lowest_distance'].mean().reset_index()
averages_df_sorted = averages_df.sort_values(by='lowest_distance', ascending=False)
averages_df_sorted

Unnamed: 0,label,lowest_distance
8,"('push_up', 'push_up_5')",0.711297
6,"('push_up', 'push_up_3')",0.710972
1,"('burpee', 'burpee_2')",0.70839
9,"('sit_up', 'sit_up_1')",0.70753
4,"('push_up', 'push_up_1')",0.707335
10,"('sit_up', 'sit_up_2')",0.707318
5,"('push_up', 'push_up_2')",0.707069
11,"('sit_up', 'sit_up_3')",0.706944
3,"('pull_up', 'pull_up_2')",0.706341
13,"('sit_up', 'sit_up_5')",0.706209
