# Real-time 2D Multi-Person Pose Estimation on CPU: Lightweight OpenPose


Pose estimation test bed. I have used the following GitHub repository for reference: https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch

Link to the paper: https://arxiv.org/pdf/1811.12004.pdf



_____________________________________________________________
## **Part 1 :** 
* Cloning the repository
* Downloading the pretrained model mentioned in the repository.
* Downloading an image to test

In [0]:
# Fetch the reference implementation into ./lightweight-human-pose-estimation.pytorch
!git clone https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch.git

Cloning into 'lightweight-human-pose-estimation.pytorch'...
remote: Enumerating objects: 9, done.
remote: Counting objects: 100% (9/9), done.
remote: Compressing objects: 100% (8/8), done.
remote: Total 92 (delta 3), reused 3 (delta 1), pack-reused 83
Unpacking objects: 100% (92/92), done.


In [0]:
# Download the checkpoint file (checkpoint_iter_370000.pth) into the working directory
!wget "https://download.01.org/opencv/openvino_training_extensions/models/human_pose_estimation/checkpoint_iter_370000.pth"

--2020-03-29 05:28:07--  https://download.01.org/opencv/openvino_training_extensions/models/human_pose_estimation/checkpoint_iter_370000.pth
Resolving download.01.org (download.01.org)... 104.124.235.58, 2600:1417:76:480::4b21, 2600:1417:76:483::4b21
Connecting to download.01.org (download.01.org)|104.124.235.58|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 87959810 (84M)
Saving to: ‘checkpoint_iter_370000.pth’


2020-03-29 05:28:08 (269 MB/s) - ‘checkpoint_iter_370000.pth’ saved [87959810/87959810]



In [0]:
# List the working directory to check that the checkpoint and the cloned repository are present
!ls

checkpoint_iter_370000.pth  lightweight-human-pose-estimation.pytorch
data			    output.jpg
img1.jpg		    sample_data


In [0]:
# Duplicate demo.py as demo2.py (presumably so the copy can be modified while the original stays intact)
!cp lightweight-human-pose-estimation.pytorch/demo.py lightweight-human-pose-estimation.pytorch/demo2.py

In [0]:
# Use capital -O to name the downloaded file; lowercase -o would write wget's log to image.jpg instead
!wget -O image.jpg "https://image.shutterstock.com/image-photo/full-body-young-man-standing-260nw-235695058.jpg"

In [0]:
# -p keeps this cell re-runnable: plain mkdir fails once the directory already exists
!mkdir -p data

In [0]:
# Copy data/img1.jpg into the working directory as img1.jpg
# NOTE(review): the test image fetched earlier is named image.jpg, and the demo below reads from data/ - confirm the intended source and direction of this copy
!cp data/img1.jpg img1.jpg

In [0]:
# Run the copied demo script with the downloaded checkpoint, passing "data" as its image argument
!python3 lightweight-human-pose-estimation.pytorch/demo2.py --checkpoint-path checkpoint_iter_370000.pth --image data

## **Part 2:**
Setting up the test bed 
* Importing libraries
* Adding desired path
* Writing desired classes
* Modifying the run function as per requirements


In [0]:
%matplotlib inline
from matplotlib import pyplot as plt

import sys
sys.path.append('./lightweight-human-pose-estimation.pytorch/')

import cv2
import numpy as np
import torch

from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
from modules.pose import Pose, track_poses
from val import normalize, pad_width

Writing necessary classes and methods

In [0]:
# Image reader class
class ImageReader(object):
    """Iterator that loads the images named in ``file_names`` one at a time.

    Iterating restarts from the first file each time ``iter()`` is called.
    Every item yielded is the array returned by ``cv2.imread`` with
    ``cv2.IMREAD_COLOR``.

    Args:
        file_names: sequence of image file paths.

    Raises:
        IOError: from ``__next__`` when an image cannot be read.
    """

    def __init__(self, file_names):
        self.file_names = file_names
        self.max_idx = len(file_names)

    def __iter__(self):
        self.idx = 0
        return self

    def __next__(self):
        if self.idx == self.max_idx:
            raise StopIteration
        file_name = self.file_names[self.idx]
        img = cv2.imread(file_name, cv2.IMREAD_COLOR)
        # cv2.imread reports failure by returning None instead of raising, so
        # test for None before touching .size (which would raise AttributeError).
        if img is None or img.size == 0:
            raise IOError('Image {} cannot be read'.format(file_name))
        self.idx = self.idx + 1
        return img

class VideoReader(object):
    """Iterator over the frames of a video file or a webcam stream.

    Args:
        file_name: path to a video file, or a value convertible to ``int``
            that selects a capture device.

    Attributes are set in ``__init__``: ``cap`` is the ``cv2.VideoCapture``
    handle, ``h`` and ``w`` are the frame height and width reported by it.

    Raises:
        IOError: from ``__iter__`` when the capture could not be opened.
    """

    def __init__(self, file_name):
        self.file_name = file_name
        try:  # OpenCV needs int to read from webcam
            self.file_name = int(file_name)
        except ValueError:
            pass
        self.cap = cv2.VideoCapture(self.file_name)
        # Property id 3 is the frame WIDTH and id 4 the frame HEIGHT; use the
        # named constants so that h and w are not swapped.
        self.h = self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        self.w = self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        print(self.h)
        print(self.w)

    def __iter__(self):
        if not self.cap.isOpened():
            raise IOError('Video {} cannot be opened'.format(self.file_name))
        return self

    def __next__(self):
        was_read, img = self.cap.read()
        # A failed read is treated as the end of the stream.
        if not was_read:
            raise StopIteration
        return img

def infer_fast(net, img, net_input_height_size, stride, upsample_ratio, cpu,
               pad_value=(0, 0, 0), img_mean=(128, 128, 128), img_scale=1/256):
    """Run one forward pass of ``net`` on ``img`` and return upsampled outputs.

    The image is resized so its height equals ``net_input_height_size``
    (aspect ratio kept), passed through ``normalize`` and ``pad_width``, and
    fed to the network as a single-image batch.

    Args:
        net: the pose network; called as ``net(tensor)`` and expected to
            return a sequence whose last two items are heatmaps and PAFs.
        img: image array with three dimensions (height, width, channels).
        net_input_height_size: target height of the resized image.
        stride: forwarded to ``pad_width``.
        upsample_ratio: factor by which both network outputs are enlarged.
        cpu: when false, the input tensor is moved to CUDA.
        pad_value: forwarded to ``pad_width``.
        img_mean: forwarded to ``normalize``.
        img_scale: forwarded to ``normalize``.

    Returns:
        Tuple ``(heatmaps, pafs, scale, pad)``: the two upsampled output
        arrays (channels last), the resize factor that was applied, and the
        padding value returned by ``pad_width``.
    """
    height, _, _ = img.shape
    scale = net_input_height_size / height

    scaled_img = cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
    scaled_img = normalize(scaled_img, img_mean, img_scale)
    min_dims = [net_input_height_size, max(scaled_img.shape[1], net_input_height_size)]
    padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)

    tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float()
    if not cpu:
        tensor_img = tensor_img.cuda()

    # Inference only: disabling autograd avoids building a graph for every
    # frame, which saves memory and time without changing the outputs.
    with torch.no_grad():
        stages_output = net(tensor_img)

    stage2_heatmaps = stages_output[-2]
    heatmaps = np.transpose(stage2_heatmaps.squeeze().detach().cpu().numpy(), (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    stage2_pafs = stages_output[-1]
    pafs = np.transpose(stage2_pafs.squeeze().detach().cpu().numpy(), (1, 2, 0))
    pafs = cv2.resize(pafs, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    return heatmaps, pafs, scale, pad

def run_demo_img(net, img, height_size, cpu, track, smooth, previous_poses=None):
    """Estimate poses on one image and return a copy with the poses drawn.

    Args:
        net: pose network, switched to eval mode (and to CUDA unless ``cpu``).
        img: input image array; poses are drawn onto it in place before it is
            blended with an untouched copy.
        height_size: network input height passed to ``infer_fast``.
        cpu: when false, the network and its input run on CUDA.
        track: when truthy, ``track_poses`` is called and pose ids are drawn.
        smooth: forwarded to ``track_poses`` as ``smooth``.
        previous_poses: optional list of poses from the previous frame. When
            given, it is passed to ``track_poses`` and then updated IN PLACE
            with the poses of this frame, so a caller processing a video can
            keep one list across calls. When omitted, every call starts from
            an empty list (the original behaviour), i.e. no history is kept
            between frames.

    Returns:
        The blended image (60% original, 40% annotated) with a bounding box
        per pose and, when tracking, an ``id: N`` label.
    """
    net = net.eval()
    if not cpu:
        net = net.cuda()

    stride = 8
    upsample_ratio = 4
    num_keypoints = Pose.num_kpts
    if previous_poses is None:
        previous_poses = []

    orig_img = img.copy()
    heatmaps, pafs, scale, pad = infer_fast(net, img, height_size, stride, upsample_ratio, cpu)

    total_keypoints_num = 0
    all_keypoints_by_type = []
    for kpt_idx in range(num_keypoints):  # 19th for bg
        total_keypoints_num += extract_keypoints(heatmaps[:, :, kpt_idx], all_keypoints_by_type, total_keypoints_num)

    pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs, demo=True)
    # Undo upsampling, padding and resizing so coordinates refer to the input
    # image. Kept as a row loop: it is a no-op when no keypoints were found,
    # whatever shape the empty array has.
    for kpt_id in range(all_keypoints.shape[0]):
        all_keypoints[kpt_id, 0] = (all_keypoints[kpt_id, 0] * stride / upsample_ratio - pad[1]) / scale
        all_keypoints[kpt_id, 1] = (all_keypoints[kpt_id, 1] * stride / upsample_ratio - pad[0]) / scale

    current_poses = []
    for entry in pose_entries:
        if len(entry) == 0:
            continue
        # -1 marks a keypoint that was not found for this pose.
        pose_keypoints = np.ones((num_keypoints, 2), dtype=np.int32) * -1
        for kpt_id in range(num_keypoints):
            if entry[kpt_id] != -1.0:  # keypoint was found
                found = int(entry[kpt_id])
                pose_keypoints[kpt_id, 0] = int(all_keypoints[found, 0])
                pose_keypoints[kpt_id, 1] = int(all_keypoints[found, 1])
        # entry[18] is handed to Pose as its second argument
        # (presumably the pose confidence - TODO confirm against modules.pose).
        current_poses.append(Pose(pose_keypoints, entry[18]))

    if track:
        track_poses(previous_poses, current_poses, smooth=smooth)
        # Update in place so a caller-supplied list carries over to the next call.
        previous_poses[:] = current_poses

    for pose in current_poses:
        pose.draw(img)
    img = cv2.addWeighted(orig_img, 0.6, img, 0.4, 0)
    for pose in current_poses:
        cv2.rectangle(img, (pose.bbox[0], pose.bbox[1]),
                      (pose.bbox[0] + pose.bbox[2], pose.bbox[1] + pose.bbox[3]), (0, 255, 0))
        if track:
            cv2.putText(img, 'id: {}'.format(pose.id), (pose.bbox[0], pose.bbox[1] - 16),
                        cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255))

    return img


In [0]:
# Main code goes here
import glob
import os

from tqdm import tqdm

checkpoint_path = "checkpoint_iter_370000.pth"

# Creating the network object
net = PoseEstimationWithMobileNet()
checkpoint = torch.load(checkpoint_path, map_location='cpu')
load_state(net, checkpoint)

input_img_height = 256
# Use the GPU when one is present, otherwise fall back to the CPU instead of
# failing on the .cuda() calls.
on_cpu = not torch.cuda.is_available()
track = 1
smooth = 1

# output = run_demo_img(net, img, input_img_height, on_cpu, track, smooth)

video = 1
vids = "./vids/*.mp4"
# Sorted so that the outputN.mp4 numbering is reproducible between runs.
video_paths = sorted(glob.glob(vids))

# cv2.VideoWriter does not create missing directories; make sure it exists.
os.makedirs("./output_vids", exist_ok=True)

for i, video_path in enumerate(video_paths):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print('Video {} cannot be opened'.format(video_path))
        cap.release()
        continue

    # Keep the source frame rate; fall back to 30 when it is not reported (0).
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    output_filename = "./output_vids/output%d.mp4" % i
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # 'mp4v' is the tag that matches the .mp4 container ('XVID' targets .avi).
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_filename, fourcc, fps, (w, h))
    num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    try:
        for j in tqdm(range(num_frames)):
            ret, img = cap.read()
            # The reported frame count can exceed the frames actually
            # decodable; stop instead of passing None to the model.
            if not ret:
                break
            output = run_demo_img(net, img, input_img_height, on_cpu, track, smooth)
            out.write(output)
    finally:
        # Release both handles even if a frame fails, so the file is finalised.
        cap.release()
        out.release()


100%|██████████| 365/365 [00:15<00:00, 23.05it/s]
100%|██████████| 816/816 [00:34<00:00, 23.44it/s]


In [0]:
from google.colab import files

# The browser has to be allowed to accept several downloads from this page,
# otherwise only the first file comes through.
output_folder = "./output_vids"

# Gather every rendered video and hand each one to the browser in turn.
files_ = glob.glob("{}/*.mp4".format(output_folder))
for out_file in files_:
    files.download(out_file)