# Local Inference

In [1]:
%cd PFD

/tf/PFD


In [6]:
%matplotlib inline
import cv2, numpy as np, matplotlib.pyplot as plt
import torch
import torchvision
from torchvision.models import ResNet50_Weights
from torchvision.models.detection import KeypointRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F
import time
import tensorflow as tf
import torchvision.transforms as T
import onnx
import onnxruntime as onnxrt
from onnx_tf.backend import prepare
from torch.utils.mobile_optimizer import optimize_for_mobile

%matplotlib inline

In [9]:
def get_model(num_keypoints, weights_path=None):
    """Build the Keypoint R-CNN (ResNet-50 FPN) detector used by this notebook.

    Args:
        num_keypoints: Number of keypoints predicted per detected object.
        weights_path: Optional path to a saved ``state_dict``. When given, the
            weights are loaded into the freshly built model.

    Returns:
        The constructed model. It is left on the CPU; the caller moves it to
        the target device.
    """
    anchor_generator = AnchorGenerator(
        sizes=(32, 64, 128, 256, 512),
        aspect_ratios=(0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0),
    )
    model = torchvision.models.detection.keypointrcnn_resnet50_fpn(
        weights=None,
        weights_backbone=ResNet50_Weights.DEFAULT,
        num_keypoints=num_keypoints,
        num_classes=2,  # Background is the first class, object is the second class
        rpn_anchor_generator=anchor_generator,
        min_size=512,
    )

    if weights_path:
        # map_location="cpu" lets a checkpoint that was saved from a GPU load
        # on a machine without CUDA; the caller still decides the final device.
        state_dict = torch.load(weights_path, map_location="cpu")
        model.load_state_dict(state_dict)

    return model

In [10]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Build the 4-keypoint detector from the saved weights and place it on that device.
model = get_model(weights_path='./keypoint_model/weights/keypointsrcnn_weights.pth', num_keypoints=4)
model.to(device)
print('done')

done


### Live Frame Filter Algorithm

In [57]:
def calculate_bbox_center(bbox):
    """Return the center, width and height of a bounding box.

    Args:
        bbox: Sequence ``[x1, y1, x2, y2]`` holding two opposite corners.

    Returns:
        A tuple ``([center_x, center_y], width, height)`` where width and
        height are the absolute coordinate differences.
    """
    x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
    bbox_width = abs(x1 - x2)
    bbox_height = abs(y1 - y2)

    # Use the midpoint of the two corners: it is correct whichever corner comes
    # first, whereas ``x1 + width / 2`` is only right when x1 is the smaller x.
    return [(x1 + x2) / 2, (y1 + y2) / 2], bbox_width, bbox_height


In [58]:
def _segment_slope(start, end):
    """Return dy/dx between two points, or 0 when both points share the same x."""
    dx = start[0] - end[0]
    if dx == 0:
        return 0
    return (start[1] - end[1]) / dx


def live_frame_filter(keypoint, bbox, x_threshold=0.5, y_threshold=6):
    """Decide whether four detected keypoints form an acceptable quadrilateral.

    The keypoints are split around the bounding-box center into a pair on each
    horizontal side and a pair on each vertical side. The frame is accepted
    when the slopes of opposite edges are close enough to each other.

    Args:
        keypoint: Iterable of points indexable as ``point[0]`` (x) and
            ``point[1]`` (y).
        bbox: Bounding box passed to ``calculate_bbox_center``.
        x_threshold: Maximum allowed absolute slope difference between the two
            edges split by the vertical (y) position of their points.
        y_threshold: Maximum allowed absolute slope difference between the two
            edges split by the horizontal (x) position of their points.

    Returns:
        True when exactly two points lie strictly on each side of the center
        on both axes and both slope differences are below their thresholds;
        False otherwise.
    """
    bbox_center, _, _ = calculate_bbox_center(bbox)
    center_x, center_y = bbox_center[0], bbox_center[1]

    # Split by x. Points lying exactly on the center line belong to neither side.
    higher_x_points = [point for point in keypoint if center_x < point[0]]
    lower_x_points = [point for point in keypoint if center_x > point[0]]
    if len(higher_x_points) != 2 or len(lower_x_points) != 2:
        return False

    # Split by y with the same strict comparison.
    higher_y_points = [point for point in keypoint if center_y < point[1]]
    lower_y_points = [point for point in keypoint if center_y > point[1]]
    if len(higher_y_points) != 2 or len(lower_y_points) != 2:
        return False

    # NOTE(review): a perfectly vertical edge (dx == 0) is reported as slope 0,
    # so it compares as "flat" against a steep neighbour — confirm this is the
    # intended treatment for the edges split by x.
    higher_x_slope = _segment_slope(higher_x_points[0], higher_x_points[1])
    lower_x_slope = _segment_slope(lower_x_points[0], lower_x_points[1])
    higher_y_slope = _segment_slope(higher_y_points[1], higher_y_points[0])
    lower_y_slope = _segment_slope(lower_y_points[1], lower_y_points[0])

    diff_y_split = abs(higher_y_slope - lower_y_slope)
    diff_x_split = abs(higher_x_slope - lower_x_slope)

    return bool(diff_y_split < x_threshold and diff_x_split < y_threshold)

In [59]:
import mediapipe as mp
from mediapipe.python.solutions import drawing_utils as mp_drawing
from mediapipe.python.solutions import pose as mp_pose

# Convenience aliases into the MediaPipe solutions namespace.
# NOTE(review): `mp_drawing` was already bound by the `from ... import
# drawing_utils as mp_drawing` line above, so this rebinds the name —
# presumably to the same module; confirm and drop one of the two.
mp_drawing = mp.solutions.drawing_utils
# NOTE(review): `mp_drawing_styles` and `mp_hands` are not referenced anywhere
# in the visible part of this notebook.
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands

# Default margin by which a rectangle is grown on every side before the
# containment test in is_point_inside_rect.
bbox_buffer = 5

def is_point_inside_rect(point, rect, buffer=None):
    """Return whether ``point`` lies inside ``rect`` grown by a margin.

    Args:
        point: ``(x, y)`` pair.
        rect: ``(top_left_x, top_left_y, bottom_right_x, bottom_right_y)``.
        buffer: Margin added on all four sides. Defaults to the module-level
            ``bbox_buffer``.

    Returns:
        Truthy when the point is within the enlarged rectangle (edges
        inclusive), falsy otherwise.
    """
    if buffer is None:
        buffer = bbox_buffer

    x, y = point
    top_left_x, top_left_y, bottom_right_x, bottom_right_y = rect

    # Apply the margin to both the near and the far edges. The previous
    # width/height arithmetic cancelled the margin on the bottom-right side.
    return (top_left_x - buffer <= x <= bottom_right_x + buffer
            and top_left_y - buffer <= y <= bottom_right_y + buffer)

def isHandInFrame(input_frame, bbox):
    """Report whether the detected pose lies entirely inside ``bbox``.

    Runs a fresh MediaPipe Pose tracker on the frame, keeps the first 25 of the
    33 predicted landmarks, scales them to pixel coordinates and checks each
    one against the bounding box with ``is_point_inside_rect``.

    NOTE(review): the name suggests "is a hand in the frame", but the code
    returns True only when *all* 25 kept landmarks are inside the box —
    confirm whether "any landmark inside" was intended.

    Args:
        input_frame: Image array of shape ``(height, width, ...)``; the callers
            in this notebook pass an RGB frame.
        bbox: ``(x1, y1, x2, y2)`` rectangle in pixel coordinates.

    Returns:
        True when a pose was found and every kept landmark is inside the
        rectangle; False when no pose was found or any landmark is outside.
    """
    # Initialize fresh pose tracker and run it.
    with mp_pose.Pose() as pose_tracker:
        result = pose_tracker.process(image=input_frame)
        pose_landmarks = result.pose_landmarks

    if pose_landmarks is None:
        return False

    # Check the number of landmarks and take pose landmarks.
    assert len(pose_landmarks.landmark) == 33, 'Unexpected number of predicted pose landmarks: {}'.format(len(pose_landmarks.landmark))

    # Convert to an array explicitly instead of relying on ``list *= ndarray``
    # falling back to NumPy's reflected multiplication.
    landmarks = np.array([[lmk.x, lmk.y, lmk.z] for lmk in pose_landmarks.landmark])

    # only extract upper body
    landmarks = landmarks[:25]

    # Map pose landmarks from [0, 1] range to absolute coordinates to get
    # correct aspect ratio.
    frame_height, frame_width = input_frame.shape[:2]
    landmarks = landmarks * np.array([frame_width, frame_height, frame_width])

    # Only x and y take part in the containment test.
    return all(is_point_inside_rect(landmark[:2], bbox) for landmark in landmarks)

## Run model on Live Camera or Video Recording

In [60]:
# Sampling interval in seconds: the video cells multiply it by the FPS to get
# the number of frames between samples, and the live-camera cell uses it to
# time its captures.
capture_rate = 10

# Path of the recorded video that the video cells below open.
video_file = "./assets/pfd_video_dataset/demo_0227.mp4"

### Test on video recording 
Choose from:
* Display video and draw over bounding box
* Extract frames for further processing (saves more time)

In [61]:
# Shared accumulators: the processing cells below append one entry per stored
# frame, and the timelapse cell reads them back.
frame_list = []
keypoint_list = []
bbox_list = []

# Open the video file
video = cv2.VideoCapture(video_file)

# Fail fast: continuing with an unopened capture reports a width of 0 and
# would only surface later as a ZeroDivisionError in the aspect-ratio step.
if not video.isOpened():
    raise IOError(f"Error opening video file: {video_file}")

# Get the width and height of the video
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Set the desired width
desired_width = 500

# Calculate the aspect ratio
aspect_ratio = height / width

# Calculate the new height
desired_height = int(desired_width * aspect_ratio)

# Get the frames per second of the video
fps = video.get(cv2.CAP_PROP_FPS)

# Number of frames between two sampled frames. Clamp to at least 1 so that
# `index % skip_frames` cannot divide by zero when the FPS is reported as 0.
skip_frames = max(1, int(fps * capture_rate))

#### Display video and draw keypoints and bounding box

In [None]:
# Initialize the counter for frames
frame_count = 0

# Detections accepted on the most recent sampled frame; they are re-drawn on
# every following frame until the next sample replaces them.
keypoints_main = []
bboxes_main = []

# Create a VideoWriter object to write the output video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # codec
output_video = cv2.VideoWriter('output_video.mp4', fourcc, fps, (desired_width, desired_height))

# Device placement and eval mode do not change between frames, so set them once.
model.to(device)
model.eval()

try:
    # Read until video is completed
    while video.isOpened():
        # Capture frame-by-frame
        ret, frame = video.read()

        if not ret:
            break

        if frame_count % skip_frames == 0:
            bboxes_main = []
            keypoints_main = []

            frame_np = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_tensor = F.to_tensor(frame_np).to(device)

            with torch.no_grad():
                output = model([frame_tensor])

            scores = output[0]['scores'].detach().cpu().numpy()

            high_scores_idxs = np.where(scores > 0.8)[0].tolist()  # Indexes of boxes with scores > 0.8
            post_nms_idxs = torchvision.ops.nms(output[0]['boxes'][high_scores_idxs], output[0]['scores'][high_scores_idxs], 0.3).cpu().numpy()  # Indexes of boxes left after applying NMS (iou_threshold=0.3)

            # Keep only the integer (x, y) of every keypoint of every surviving detection.
            keypoints = []
            for kps in output[0]['keypoints'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
                keypoints.append([list(map(int, kp[:2])) for kp in kps])

            bboxes = []
            for bbox in output[0]['boxes'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
                bboxes.append(list(map(int, bbox.tolist())))

            # Accept the detection only if the first detection's keypoints pass
            # the shape filter and isHandInFrame reports False for its box.
            # keypoints and bboxes come from the same indices, so a non-empty
            # keypoints list implies a non-empty bboxes list.
            if len(keypoints) != 0 and live_frame_filter(keypoints[0], bboxes[0]) and not isHandInFrame(frame_np, bboxes[0]):
                keypoints_main = keypoints
                bboxes_main = bboxes

            # Store the RGB frame. frame_np is a separate array from `frame`,
            # so the overlays drawn below never end up in the stored copy.
            frame_list.append(frame_np)
            keypoint_list.append(keypoints_main)
            bbox_list.append(bboxes_main)

        if len(keypoints_main) != 0:
            # Draw the keypoints of the first detection on the frame
            for kp_x, kp_y in keypoints_main[0]:
                cv2.circle(frame, (int(kp_x), int(kp_y)), 15, (0, 0, 255), -1)

            if len(bboxes_main) != 0:
                # draw a bbox on the frame
                cv2.rectangle(frame, (bboxes_main[0][0], bboxes_main[0][1]), (bboxes_main[0][2], bboxes_main[0][3]), color=(0, 255, 0), thickness=8)

        # Resize to the output resolution and write the annotated frame
        frame = cv2.resize(frame, (desired_width, desired_height), interpolation=cv2.INTER_AREA)
        # cv2.imshow('Frame', frame)
        output_video.write(frame)

        # Press Q on keyboard to exit
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break

        frame_count = frame_count + 1
finally:
    # Release the capture and the writer even if processing failed part-way,
    # so the output file is finalized and the input handle is freed.
    video.release()
    output_video.release()

    # Close all windows
    cv2.destroyAllWindows()

#### Extract frames and keypoints for further processing

In [59]:
# The capture may already have been consumed and released by the display cell;
# reopen it so this cell does not silently process zero frames.
if not video.isOpened():
    video = cv2.VideoCapture(video_file)

# Get the total number of frames in the video
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
fps = int(video.get(cv2.CAP_PROP_FPS))

# Detections accepted on the current sampled frame
keypoints_main = []
bboxes_main = []

# Device placement and eval mode do not change between frames, so set them once.
model.to(device)
model.eval()

try:
    # Loop through the frames of the video
    for i in range(total_frames):
        # Capture frame-by-frame
        ret, frame = video.read()

        # A read can fail before the reported frame count is reached; stop
        # instead of passing None to cvtColor.
        if not ret:
            break

        if i % skip_frames != 0:
            continue

        bboxes_main = []
        keypoints_main = []

        frame_np = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_tensor = F.to_tensor(frame_np).to(device)

        with torch.no_grad():
            output = model([frame_tensor])

        scores = output[0]['scores'].detach().cpu().numpy()

        high_scores_idxs = np.where(scores > 0.8)[0].tolist()  # Indexes of boxes with scores > 0.8
        post_nms_idxs = torchvision.ops.nms(output[0]['boxes'][high_scores_idxs], output[0]['scores'][high_scores_idxs], 0.3).cpu().numpy()  # Indexes of boxes left after applying NMS (iou_threshold=0.3)

        # Keep only the integer (x, y) of every keypoint of every surviving detection.
        keypoints = []
        for kps in output[0]['keypoints'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
            keypoints.append([list(map(int, kp[:2])) for kp in kps])

        bboxes = []
        for bbox in output[0]['boxes'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
            bboxes.append(list(map(int, bbox.tolist())))

        # Accept the detection only if the first detection's keypoints pass the
        # shape filter and isHandInFrame reports False for its box.
        if len(keypoints) != 0 and live_frame_filter(keypoints[0], bboxes[0]) and not isHandInFrame(frame_np, bboxes[0]):
            keypoints_main = keypoints
            bboxes_main = bboxes

        # Store only frames that carry an accepted detection. Keep the original
        # uint8 RGB frame rather than the float tensor scaled back by 255,
        # which can truncate some pixel values by one.
        if len(keypoints_main) > 0:
            frame_list.append(frame_np)
            keypoint_list.append([keypoints_main[0]])
            bbox_list.append(bboxes)
finally:
    # Release the video even if processing failed part-way
    video.release()

### Test on live camera

In [4]:
# Initialize the counter for frames
frame_count = 0

# Last non-empty detections; they stay on screen until a later capture replaces them.
keypoints_main = []
bboxes_main = []

# Create a VideoCapture object
cap = cv2.VideoCapture(0) # 0 means the default camera

# Device placement and eval mode do not change between frames, so set them once.
model.to(device)
model.eval()

# Monotonic timestamp of the most recent inference. Comparing elapsed time runs
# exactly one inference per `capture_rate` seconds; testing
# `int(time.time()) % capture_rate == 0` instead fires on every frame that
# arrives during the matching second.
last_capture_time = None

try:
    while True:
        # Read a frame from the video
        ret, frame = cap.read()

        # Stop cleanly when the camera is unavailable or stops delivering frames.
        if not ret:
            print("Camera frame could not be read; stopping capture")
            break

        now = time.monotonic()
        if last_capture_time is None or now - last_capture_time >= capture_rate:
            last_capture_time = now

            frame_np = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_tensor = F.to_tensor(frame_np).to(device)

            with torch.no_grad():
                output = model([frame_tensor])

            scores = output[0]['scores'].detach().cpu().numpy()

            high_scores_idxs = np.where(scores > 0.7)[0].tolist() # Indexes of boxes with scores > 0.7
            post_nms_idxs = torchvision.ops.nms(output[0]['boxes'][high_scores_idxs], output[0]['scores'][high_scores_idxs], 0.3).cpu().numpy() # Indexes of boxes left after applying NMS (iou_threshold=0.3)

            # Keep only the integer (x, y) of every keypoint of every surviving detection.
            keypoints = []
            for kps in output[0]['keypoints'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
                keypoints.append([list(map(int, kp[:2])) for kp in kps])

            bboxes = []
            for bbox in output[0]['boxes'][high_scores_idxs][post_nms_idxs].detach().cpu().numpy():
                bboxes.append(list(map(int, bbox.tolist())))

            if len(keypoints) != 0:
                print(keypoints)
                keypoints_main = keypoints

            if len(bboxes) != 0:
                print(bboxes)
                bboxes_main = bboxes

            # Store the capture only when there is a detection to pair it with,
            # so the timelapse cell never indexes an empty list. Store the RGB
            # copy, matching what the video cells store; `frame` itself is
            # drawn on below and would carry the overlays.
            if len(keypoints_main) != 0 and len(bboxes_main) != 0:
                frame_list.append(frame_np)
                keypoint_list.append(keypoints_main)
                bbox_list.append(bboxes_main)

        # Draw the keypoints of every stored detection on the frame
        for detection_keypoints in keypoints_main:
            for kp_x, kp_y in detection_keypoints:
                cv2.circle(frame, (int(kp_x), int(kp_y)), 15, (255, 0, 0), -1)

        if len(bboxes_main) != 0:
            # draw a bbox on the frame
            cv2.rectangle(frame, (bboxes_main[0][0], bboxes_main[0][1]), (bboxes_main[0][2], bboxes_main[0][3]), color=(0, 255, 0), thickness=8)

        # Display the frame with the bounding box
        cv2.imshow("Live Video with Bounding Box", frame)

        # Exit the loop if the 'q' key is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        frame_count = frame_count + 1
finally:
    # Release the video capture object even if the loop failed part-way
    cap.release()

    # Close all windows
    cv2.destroyAllWindows()

## Perspective Transformation and Timelapse

#### Perspective Transformation and Timelapse Creation

In [None]:
# Pair every stored frame with its first detection and drop captures that have
# none; indexing [0] on an empty detection list would raise IndexError.
timelapse_samples = [
    (stored_frame, stored_keypoints[0], stored_bboxes[0])
    for stored_frame, stored_keypoints, stored_bboxes in zip(frame_list, keypoint_list, bbox_list)
    if len(stored_keypoints) != 0 and len(stored_bboxes) != 0
]
if not timelapse_samples:
    raise ValueError("No frames with detected keypoints are available for the timelapse")

# Output frame rate. The earlier formula num_frames / (num_frames / 2) always
# evaluated to 2 and divided by zero when no frames were stored.
num_frames = len(timelapse_samples)
frame_rate = 2

# The writer needs one fixed frame size: take it from the first detection
# instead of relying on a `bbox` variable left over from an earlier cell.
_, bbox_width, bbox_height = calculate_bbox_center(timelapse_samples[0][2])
bbox_width, bbox_height = int(bbox_width), int(bbox_height)

#  Define the codec and create a video writer
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter("timelapse.mp4", fourcc, frame_rate, (bbox_width, bbox_height))

# Corners of the output image, in the order top-left, top-right, bottom-left, bottom-right
pt2 = np.float32([[0, 0], [bbox_width, 0], [0, bbox_height], [bbox_width, bbox_height]])

# previous frame for blending
prev_frame = None

try:
    for frame, keypoint, bbox in timelapse_samples:
        # Classify each keypoint against the center of this frame's own box.
        bbox_center, _, _ = calculate_bbox_center(bbox)
        center_x, center_y = bbox_center[0], bbox_center[1]

        # Image coordinates: x grows to the right, y grows downward.
        top_left = top_right = bottom_left = bottom_right = None
        for kp in keypoint:
            if kp[0] < center_x and kp[1] < center_y:
                top_left = kp
            elif kp[0] > center_x and kp[1] < center_y:
                top_right = kp
            elif kp[0] < center_x and kp[1] > center_y:
                bottom_left = kp
            elif kp[0] > center_x and kp[1] > center_y:
                bottom_right = kp

        corners = [top_left, top_right, bottom_left, bottom_right]
        # A keypoint on a center line, or two keypoints in one quadrant, leaves
        # a corner unset; skip the frame rather than build an invalid transform.
        if any(corner is None for corner in corners):
            continue

        pt1 = np.float32(corners)
        M = cv2.getPerspectiveTransform(pt1, pt2)

        # Perform the transformation
        warped_image = cv2.warpPerspective(frame, M, (bbox_width, bbox_height))
        # Swap the red and blue channels (the same swap BGR2RGB performs): the
        # video cells store RGB frames and VideoWriter expects BGR.
        warped_image = cv2.cvtColor(warped_image, cv2.COLOR_RGB2BGR)

        # blend frames together for better transition
        if prev_frame is None:
            blended = warped_image
        else:
            blended = cv2.addWeighted(prev_frame, 0.1, warped_image, 0.9, 0)
        prev_frame = warped_image

        out.write(blended)
finally:
    # Release the video writer even if a frame failed, so the file is finalized
    out.release()