In [3]:
!pip install ruptures

Collecting pandas
  Downloading pandas-1.4.2-cp39-cp39-win_amd64.whl (10.5 MB)
Collecting pytz>=2020.1
  Using cached pytz-2022.1-py2.py3-none-any.whl (503 kB)
Installing collected packages: pytz, pandas
Successfully installed pandas-1.4.2 pytz-2022.1
Collecting ruptures
  Using cached ruptures-1.1.6-cp39-cp39-win_amd64.whl (378 kB)
Installing collected packages: ruptures
Successfully installed ruptures-1.1.6


In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
from matplotlib import pyplot as plt
import numpy as np
import math
import os
import pandas as pd
import ruptures as rpt
import csv
import glob

In [2]:
gpus = tf.config.experimental.list_physical_devices('GPU')   #for gpu usage
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
model = hub.load('https://tfhub.dev/google/movenet/multipose/lightning/1')
movenet = model.signatures['serving_default']

In [4]:
# Function to loop through each person detected and render
def loop_through_people(frame, keypoints_with_scores, edges, confidence_threshold):
    for person in keypoints_with_scores:
        draw_connections(frame, person, edges, confidence_threshold)
        draw_keypoints(frame, person, confidence_threshold)

def draw_keypoints(frame, keypoints, confidence_threshold):
    y, x, c = frame.shape
    shaped = np.squeeze(np.multiply(keypoints, [y,x,1]))
    
    for kp in shaped:
        ky, kx, kp_conf = kp
        if kp_conf > confidence_threshold:
            cv2.circle(frame, (int(kx), int(ky)), 6, (0,255,0), -1)

EDGES = {
    (0, 1): 'm',
    (0, 2): 'c',
    (1, 3): 'm',
    (2, 4): 'c',
    (0, 5): 'm',
    (0, 6): 'c',
    (5, 7): 'm',
    (7, 9): 'm',
    (6, 8): 'c',
    (8, 10): 'c',
    (5, 6): 'y',
    (5, 11): 'm',
    (6, 12): 'c',
    (11, 12): 'y',
    (11, 13): 'm',
    (13, 15): 'm',
    (12, 14): 'c',
    (14, 16): 'c'
}

def draw_connections(frame, keypoints, edges, confidence_threshold):
    y, x, c = frame.shape
    shaped = np.squeeze(np.multiply(keypoints, [y,x,1]))
    
    for edge, color in edges.items():
        p1, p2 = edge
        y1, x1, c1 = shaped[p1]
        y2, x2, c2 = shaped[p2]
        
        if (c1 > confidence_threshold) & (c2 > confidence_threshold):      
            cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0,0,255), 4)

In [5]:
def find_closest_to_centre(b_box,centre, h,w):
    # data in b_box: x_min, Y_min, x_max, y_max, confidence/score
    conf = np.array(b_box[:,4])
    n_people = (conf>0.2).sum()
    if (n_people<=1):
        res = np.argmax(conf) #most prominent
        centroid = [((b_box[res][0]+b_box[res][2])/2)*w,((b_box[res][1]+b_box[res][3])/2)*h]
    else:
        #to find closest to center the euclidean dist. of the centroid of the bounding must be closest to centre pixel
        res = -1
        dis = 1000000
        for i in range (6):
            if (b_box[i][4]<0.2):
                continue
            c = [((b_box[i][0]+b_box[i][2])/2)*w,((b_box[i][1]+b_box[i][3])/2)*h]
            new = math.dist(centre,c)
            if (new<dis):
                dis = new
                res = i
                centroid = c
    return res, centroid
#Major assumption - subject starts off closest to centre of the frame, irrespective of who is the most prominent in the frame

def calculate_angle(y1,x1,y2,x2,y3,x3):

    # Calculate the angle between the three points
    angle = math.degrees(math.atan2(y3 - y2, x3 - x2) - math.atan2(y1 - y2, x1 - x2))
    
    # Check if the angle is less than zero.
    if angle < 0:
        
        # Add 360 to the found angle.
        angle += 360
   
    # Return the calculated angle.
    return angle

def classify_pose(lka,rka):
    label = 'none'
    if ((lka<120 or lka > 240) or (rka<120 or rka>240)):
        label = 'sitting'
    else:
        label = 'standing'
    
    return label


In [6]:
dir = 'D:\LifeSpark Technology\videos'  
files = glob.glob(os.path.join(dir, '*.mp4'))

In [7]:
files[0][:-4]

'D:\\LifeSpark Technology\\check\\DJI_0230_conv'

In [9]:
for file in files:
    cap = cv2.VideoCapture(file)
    final = []
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    # #fourcc = cv2.VideoWriter_fourcc(*'XVID')  #for saving video
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    centroid = [w/2,h/2]
    video_writer = cv2.VideoWriter(file[:-4]+'.avi', cv2.VideoWriter_fourcc('P','I','M','1'), fps, (w, h), isColor=True) 
    f=1
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Resize image
        img = frame.copy()
        img = tf.image.resize_with_pad(tf.expand_dims(img, axis=0), 192,288)  #movenet requires multiples of 32
        input_img = tf.cast(img, dtype=tf.int32)

        # Detection section
        results = movenet(input_img)
        b_box = results['output_0'].numpy()[:,:,51:].reshape((6,5))
        keypoints_with_scores = results['output_0'].numpy()[:,:,:51].reshape((6,17,3))

        #to find subject among others, considering in initial frame, subject is the person nearest centre
        data = []
        
        
        key, centroid = find_closest_to_centre(b_box,centroid,h,w)   
        #centroid keeps changing, so that same person is being tracked, Some videos may give better results when centroid is fixed
        
        
        lka = calculate_angle(keypoints_with_scores[key][11][0]*h,keypoints_with_scores[key][11][1]*w,keypoints_with_scores[key][13][0]*h,keypoints_with_scores[key][13][1]*w,keypoints_with_scores[key][15][0]*h,keypoints_with_scores[key][15][1]*w)
        rka = calculate_angle(keypoints_with_scores[key][12][0]*h,keypoints_with_scores[key][12][1]*w,keypoints_with_scores[key][14][0]*h,keypoints_with_scores[key][14][1]*w,keypoints_with_scores[key][16][0]*h,keypoints_with_scores[key][16][1]*w)
        data.append(lka)
        data.append(rka)
        label = classify_pose(lka,rka)
        data.append(label)
        time = int(f/fps)
        f+=1
        data.append(time)
        final.append(data)
        # Render keypoints 

        #keypoints_with_scores = keypoints_with_scores[key].reshape((1,17,3))  #comment this line to view all keypoints // uncomment to view only extracted keypoint

        loop_through_people(frame, keypoints_with_scores, EDGES, 0.3)
        video_writer.write(frame)
        #frame = cv2.resize(frame,(960,540))
        cv2.putText(frame, label, (10, 30),cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 0), 2)
        cv2.imshow('Movenet Multipose', frame)

        if cv2.waitKey(10) & 0xFF==ord('q'):
            break
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()
    
    headers = ['left_knee_angle','right_knee_angle','label','timestamp']
    with open(file[:-4]+'.csv',"w+") as my_csv:     #saving data at similar path 
        w = csv.writer(my_csv,delimiter=',', lineterminator = '\n')
        w.writerow(headers)
        w.writerows(final)