In [1]:
import copy as cp
import os
import os.path as osp
import urllib
import urllib.request

import cv2
import decord
import numpy as np
# import matplotlib.pyplot as plt
import moviepy.editor as mpy
# import random as rd

# from mmpose.apis import vis_pose_result
# from mmpose.models import TopDown
# from mmpose.models.pose_estimators import TopdownPoseEstimator as TopDown

from mmengine import load #, dump

from mmaction.datasets import (CenterCrop,
                               GeneratePoseTarget,
                               PoseCompact, PoseDecode,
                               Resize)

# We assume the annotation is already prepared
# Relative paths of the annotation pickles that are read with `load` further down;
# the trailing comment on each line is the URL the file can be downloaded from.
gym_ann_file = '../data/gym/gym_hrnet.pkl'  # https://download.openmmlab.com/mmaction/pyskl/data/gym/gym_hrnet.pkl
ntu60_ann_file = '../data/nturgbd/ntu60_hrnet.pkl'  # https://download.openmmlab.com/mmaction/pyskl/data/nturgbd/ntu60_hrnet.pkl

In [2]:
# Text-overlay settings consumed by add_label().
FONTFACE = cv2.FONT_HERSHEY_DUPLEX  # font passed to cv2.getTextSize / cv2.putText
FONTSCALE = 0.6  # font scale passed to cv2.getTextSize / cv2.putText
FONTCOLOR = (255, 255, 255)  # colour of the caption text
BGBLUE = (0, 119, 182)  # default fill colour of the caption box
THICKNESS = 1  # text thickness passed to cv2.getTextSize / cv2.putText
LINETYPE = 1  # lineType argument of cv2.putText

In [3]:
# def vis_pose_result(frame,
#                     result,
#                     skeleton,
#                     kpt_score_thr=0.3,
#                     pose_kpt_color=None,
#                     pose_limb_color=None,
#                     radius=4,
#                     thickness=1):
#     pose_result = []
#     for res in result:
#         pose_result.append(res['keypoints'])
#     img_h, img_w, _ = frame.shape

#     for kpts in pose_result:
#         # draw each point on image
#         if pose_kpt_color is not None:
#             assert len(pose_kpt_color) == len(kpts)
#             for kid, kpt in enumerate(kpts):
#                 x_coord, y_coord, kpt_score = int(kpt[0]), int(kpt[1]), kpt[2]
#                 if kpt_score > kpt_score_thr:
#                     r, g, b = pose_kpt_color[kid]
#                     cv2.circle(frame, (int(x_coord), int(y_coord)), radius,
#                                (int(r), int(g), int(b)), -1)

#         # draw limbs
#         if skeleton is not None and pose_limb_color is not None:
#             assert len(pose_limb_color) == len(skeleton)
#             for sk_id, sk in enumerate(skeleton):
#                 pos1 = (int(kpts[sk[0] - 1, 0]), int(kpts[sk[0] - 1, 1]))
#                 pos2 = (int(kpts[sk[1] - 1, 0]), int(kpts[sk[1] - 1, 1]))
#                 if (pos1[0] > 0 and pos1[0] < img_w and pos1[1] > 0
#                         and pos1[1] < img_h and pos2[0] > 0 and pos2[0] < img_w
#                         and pos2[1] > 0 and pos2[1] < img_h
#                         and kpts[sk[0] - 1, 2] > kpt_score_thr
#                         and kpts[sk[1] - 1, 2] > kpt_score_thr):
#                     r, g, b = pose_limb_color[sk_id]
#                     cv2.line(
#                         frame,
#                         pos1,
#                         pos2, (int(r), int(g), int(b)),
#                         thickness=thickness)


def add_label(frame, label, BGCOLOR=BGBLUE):
    """Draw an ``Action: <label>`` caption box in the top-left corner of ``frame``.

    A label longer than 30 characters is word-wrapped onto several lines; each
    line is drawn below the ``Action: `` header line.

    Args:
        frame: Image array accepted by ``cv2.rectangle`` / ``cv2.putText``.
            It is drawn on in place.
        label (str): Caption text.
        BGCOLOR (tuple): Fill colour of the caption box.

    Returns:
        The same ``frame`` object, with the caption drawn on it.
    """
    import textwrap

    threshold = 30  # maximum number of characters per caption line

    def split_label(label):
        # Wrap on whitespace only: never cut a word or a hyphenated term in
        # half. Unlike manual concatenation this yields no leading space on
        # the first line and no empty line before an over-long first word.
        return textwrap.wrap(
            label,
            width=threshold,
            break_long_words=False,
            break_on_hyphens=False)

    if len(label) > threshold:
        label = split_label(label)
    else:
        label = [label]
    label = ['Action: '] + label

    # (width, height) of every rendered line; the box must fit the widest one.
    sizes = [
        cv2.getTextSize(line, FONTFACE, FONTSCALE, THICKNESS)[0]
        for line in label
    ]
    box_width = max(width for width, _ in sizes) + 10
    text_height = sizes[0][1]
    line_height = text_height + 6  # 6 px of vertical padding per line
    box_height = len(sizes) * line_height

    cv2.rectangle(frame, (0, 0), (box_width, box_height), BGCOLOR, -1)
    for i, line in enumerate(label):
        # putText anchors at the bottom-left corner of the text.
        location = (5, line_height * i + text_height + 3)
        cv2.putText(frame, line, location, FONTFACE, FONTSCALE, FONTCOLOR, THICKNESS, LINETYPE)
    return frame
    

# def vis_skeleton(vid_path, anno, category_name=None, ratio=0.5):
#     vid = decord.VideoReader(vid_path)
#     frames = [x.asnumpy() for x in vid]
    
#     h, w, _ = frames[0].shape
#     new_shape = (int(w * ratio), int(h * ratio))
#     frames = [cv2.resize(f, new_shape) for f in frames]
    
#     assert len(frames) == anno['total_frames']
#     # The shape is N x T x K x 3
#     kps = np.concatenate([anno['keypoint'], anno['keypoint_score'][..., None]], axis=-1)
#     kps[..., :2] *= ratio
#     # Convert to T x N x K x 3
#     kps = kps.transpose([1, 0, 2, 3])
#     vis_frames = []

#     # we need an instance of TopDown model, so build a minimal one
#     model = TopDown(backbone=dict(type='ShuffleNetV1'))

#     for f, kp in zip(frames, kps):
#         bbox = np.zeros([0, 4], dtype=np.float32)
#         result = [dict(bbox=bbox, keypoints=k) for k in kp]
#         vis_frame = vis_pose_result(model, f, result)
        
#         if category_name is not None:
#             vis_frame = add_label(vis_frame, category_name)
        
#         vis_frames.append(vis_frame)
#     return vis_frames

In [4]:
# Transform configs applied to one annotation to produce keypoint heatmaps;
# get_pseudo_heatmap() wraps this list in Compose.
keypoint_pipeline = [
    dict(type=PoseDecode),
    dict(type=PoseCompact, hw_ratio=1., allow_imgpad=True),
    dict(type=Resize, scale=(-1, 64)),
    dict(type=CenterCrop, crop_size=64),
    dict(type=GeneratePoseTarget, sigma=0.6, use_score=True, with_kp=True, with_limb=False)
]

# Same steps as keypoint_pipeline, except that GeneratePoseTarget is configured
# with with_limb=True / with_kp=False instead of with_kp=True / with_limb=False.
limb_pipeline = [
    dict(type=PoseDecode),
    dict(type=PoseCompact, hw_ratio=1., allow_imgpad=True),
    dict(type=Resize, scale=(-1, 64)),
    dict(type=CenterCrop, crop_size=64),
    dict(type=GeneratePoseTarget, sigma=0.6, use_score=True, with_kp=False, with_limb=True)
]

from mmengine.dataset import Compose
def get_pseudo_heatmap(anno, flag='keypoint'):
    """Run one annotation through the heatmap pipeline and return its ``'imgs'``.

    Note: the heatmap generation itself is probably implemented in
    ``GeneratePoseTarget`` (``generate_a_heatmap``) — not verified here.

    Args:
        anno (dict): A single skeleton annotation record. It is left untouched.
        flag (str): ``'keypoint'`` to use ``keypoint_pipeline`` or ``'limb'``
            to use ``limb_pipeline``.

    Returns:
        The ``'imgs'`` entry of the pipeline output.

    Raises:
        AssertionError: If ``flag`` is neither ``'keypoint'`` nor ``'limb'``.
    """
    assert flag in ['keypoint', 'limb']
    pipeline = Compose(keypoint_pipeline if flag == 'keypoint' else limb_pipeline)
    # The pipeline transforms update the results dict they are handed. Work on
    # a deep copy so the same `anno` can be fed through the keypoint and the
    # limb pipeline (or re-run) without the earlier pass altering the input.
    return pipeline(cp.deepcopy(anno))['imgs']

def vis_heatmaps(heatmaps, channel=-1, ratio=8):
    """Render a heatmap volume as a list of colour images, one per frame.

    Args:
        heatmaps: Volume indexed as ``[frame][channel]`` where each
            per-channel map is 2-D.
        channel (int): Index of the single channel to render. Any value
            outside ``[0, num_channels - 1]`` (the default ``-1`` included)
            merges all channels of a frame with a per-pixel maximum.
        ratio: Factor by which each rendered map is enlarged.

    Returns:
        list: One ``uint8`` 3-channel image per frame, coloured with the
        viridis colormap and resized by ``ratio``.
    """
    import matplotlib.cm as cm

    if channel < 0 or channel > heatmaps.shape[1] - 1:
        # draw all keypoints / limbs on the same map
        planes = [np.max(frame, axis=0) for frame in heatmaps]
    else:
        planes = [heatmaps[t][channel] for t in range(heatmaps.shape[0])]

    src_h, src_w = planes[0].shape
    dst_h, dst_w = int(src_h * ratio), int(src_w * ratio)

    colormap = cm.viridis
    rendered = []
    for plane in planes:
        # The colormap returns RGBA floats; keep RGB and scale to 0-255.
        rgb = (colormap(plane)[..., :3] * 255).astype(np.uint8)
        # cv2.resize expects the target size as (width, height).
        rendered.append(cv2.resize(rgb, (dst_w, dst_h)))
    return rendered

## GYM

In [None]:
# Load GYM annotations
# Fetch the category list; the context manager closes the HTTP response as
# soon as all lines have been read.
with urllib.request.urlopen('https://sdolivia.github.io/FineGym/resources/dataset/gym99_categories.txt') as response:
    lines = list(response)
# Each line is split on '; ' and the last field is kept as the category name.
gym_categories = [x.decode().strip().split('; ')[-1] for x in lines]
gym_annos = load(gym_ann_file)['annotations']

In [None]:
# download sample videos of GYM
# (fetch the archive, extract it in the working directory, then delete the archive)
!wget https://download.openmmlab.com/mmaction/posec3d/gym_samples.tar
!tar -xf gym_samples.tar
!rm gym_samples.tar

In [None]:
gym_root = 'gym_samples/'
# os.listdir() returns entries in arbitrary order; sort them so that `idx`
# selects the same video on every machine and every run.
gym_vids = sorted(os.listdir(gym_root))
# visualize pose of which video? index in 0 - 50.
idx = 1
vid = gym_vids[idx]

# The annotation is looked up by the file name up to its first '.'.
frame_dir = vid.split('.')[0]
vid_path = osp.join(gym_root, vid)
anno = [x for x in gym_annos if x['frame_dir'] == frame_dir][0]

In [None]:
# Echo the path of the selected video as a quick sanity check.
vid_path

In [None]:
# Visualize Skeleton
# vis_frames = vis_skeleton(vid_path, anno, gym_categories[anno['label']])
# vid = mpy.ImageSequenceClip(vis_frames, fps=24)
# vid.ipython_display()

In [None]:
# Keypoint heatmaps of the selected GYM clip: generate, colour, caption with
# the category name, and play inline as a 24 fps clip.
keypoint_heatmap = get_pseudo_heatmap(anno)
keypoint_mapvis = vis_heatmaps(keypoint_heatmap)
keypoint_mapvis = [add_label(f, gym_categories[anno['label']]) for f in keypoint_mapvis]
# NOTE: `vid` previously held the video file name; it is rebound to the clip here.
vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=24)
vid.ipython_display()

In [None]:
# Limb heatmaps of the selected GYM clip: generate, colour, caption with the
# category name, and play inline as a 24 fps clip.
limb_heatmap = get_pseudo_heatmap(anno, 'limb')
limb_mapvis = vis_heatmaps(limb_heatmap)
limb_mapvis = [add_label(f, gym_categories[anno['label']]) for f in limb_mapvis]
vid = mpy.ImageSequenceClip(limb_mapvis, fps=24)
vid.ipython_display()

## NTU60

In [None]:
# The name list of the NTU60 action categories, indexed by an annotation's 'label'.
ntu_categories = ['drink water', 'eat meal/snack', 'brushing teeth', 'brushing hair', 'drop', 'pickup', 
                  'throw', 'sitting down', 'standing up (from sitting position)', 'clapping', 'reading', 
                  'writing', 'tear up paper', 'wear jacket', 'take off jacket', 'wear a shoe', 
                  'take off a shoe', 'wear on glasses', 'take off glasses', 'put on a hat/cap', 
                  'take off a hat/cap', 'cheer up', 'hand waving', 'kicking something', 
                  'reach into pocket', 'hopping (one foot jumping)', 'jump up', 
                  'make a phone call/answer phone', 'playing with phone/tablet', 'typing on a keyboard', 
                  'pointing to something with finger', 'taking a selfie', 'check time (from watch)', 
                  'rub two hands together', 'nod head/bow', 'shake head', 'wipe face', 'salute', 
                  'put the palms together', 'cross hands in front (say stop)', 'sneeze/cough', 
                  'staggering', 'falling', 'touch head (headache)', 'touch chest (stomachache/heart pain)', 
                  'touch back (backache)', 'touch neck (neckache)', 'nausea or vomiting condition', 
                  'use a fan (with hand or paper)/feeling warm', 'punching/slapping other person', 
                  'kicking other person', 'pushing other person', 'pat on back of other person', 
                  'point finger at the other person', 'hugging other person', 
                  'giving something to other person', "touch other person's pocket", 'handshaking', 
                  'walking towards each other', 'walking apart from each other']
# Keep only the list of per-clip records, exactly as done for the GYM file:
# the lookup by 'frame_dir' further down iterates over annotation dicts.
ntu_annos = load(ntu60_ann_file)['annotations']

In [None]:
# Echo the loaded NTU60 annotations (the whole object becomes the cell output).
ntu_annos

In [None]:
# download sample videos of NTU-60
# !wget https://download.openmmlab.com/mmaction/posec3d/ntu_samples.tar
!tar -xf ntu_samples.tar
!rm ntu_samples.tar

In [None]:
ntu_root = 'ntu_samples/'
# os.listdir() returns entries in arbitrary order; sort them so that `idx`
# selects the same video on every machine and every run.
ntu_vids = sorted(os.listdir(ntu_root))
# visualize pose of which video? index in 0 - 50.
idx = 20
vid = ntu_vids[idx]

frame_dir = vid.split('.')[0]
vid_path = osp.join(ntu_root, vid)
# The annotation is looked up by the part of the file name before the first '_'.
anno = [x for x in ntu_annos if x['frame_dir'] == frame_dir.split('_')[0]][0]

In [None]:
# Echo the selected NTU60 annotation record.
anno

In [None]:
# Echo the path of the selected NTU60 video.
vid_path

In [None]:
# vis_frames = vis_skeleton(vid_path, anno, ntu_categories[anno['label']])
# vid = mpy.ImageSequenceClip(vis_frames, fps=24)
# vid.ipython_display()

In [None]:
# Keypoint heatmaps of the selected NTU60 clip, played inline at 24 fps.
keypoint_heatmap = get_pseudo_heatmap(anno)
keypoint_mapvis = vis_heatmaps(keypoint_heatmap)
# `anno` is an NTU60 record here, so its label indexes ntu_categories
# (gym_categories would caption the clip with an unrelated GYM class name).
keypoint_mapvis = [add_label(f, ntu_categories[anno['label']]) for f in keypoint_mapvis]
vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=24)
vid.ipython_display()

In [None]:
# Limb heatmaps of the selected NTU60 clip, played inline at 24 fps.
limb_heatmap = get_pseudo_heatmap(anno, 'limb')
limb_mapvis = vis_heatmaps(limb_heatmap)
# `anno` is an NTU60 record here, so its label indexes ntu_categories
# (gym_categories would caption the clip with an unrelated GYM class name).
limb_mapvis = [add_label(f, ntu_categories[anno['label']]) for f in limb_mapvis]
vid = mpy.ImageSequenceClip(limb_mapvis, fps=24)
vid.ipython_display()

# NTU60_2D_Single_Person

In [None]:
# Annotation pickle of one clip; it is loaded directly as `anno` further down.
ntu60_2d_ann_file = 'mmaction2/tools/data/skeleton/S001C001P001R001A001_rgb.pkl'

In [None]:
# NTU60 action names, indexed by an annotation's 'label'.
# NOTE(review): this is a verbatim repeat of the list assigned in the NTU60 section.
ntu_categories = ['drink water', 'eat meal/snack', 'brushing teeth', 'brushing hair', 'drop', 'pickup', 
                  'throw', 'sitting down', 'standing up (from sitting position)', 'clapping', 'reading', 
                  'writing', 'tear up paper', 'wear jacket', 'take off jacket', 'wear a shoe', 
                  'take off a shoe', 'wear on glasses', 'take off glasses', 'put on a hat/cap', 
                  'take off a hat/cap', 'cheer up', 'hand waving', 'kicking something', 
                  'reach into pocket', 'hopping (one foot jumping)', 'jump up', 
                  'make a phone call/answer phone', 'playing with phone/tablet', 'typing on a keyboard', 
                  'pointing to something with finger', 'taking a selfie', 'check time (from watch)', 
                  'rub two hands together', 'nod head/bow', 'shake head', 'wipe face', 'salute', 
                  'put the palms together', 'cross hands in front (say stop)', 'sneeze/cough', 
                  'staggering', 'falling', 'touch head (headache)', 'touch chest (stomachache/heart pain)', 
                  'touch back (backache)', 'touch neck (neckache)', 'nausea or vomiting condition', 
                  'use a fan (with hand or paper)/feeling warm', 'punching/slapping other person', 
                  'kicking other person', 'pushing other person', 'pat on back of other person', 
                  'point finger at the other person', 'hugging other person', 
                  'giving something to other person', "touch other person's pocket", 'handshaking', 
                  'walking towards each other', 'walking apart from each other']
# ntu2d_annos = load(ntu60_2d_ann_file)['annotations']

In [None]:
# Location of the single-person sample video and its annotation.
ntu2d_root = 'mmaction2/tools/data/skeleton/'
vid = 'S001C001P001R001A001_rgb.avi'

# frame_dir = vid.split('.')[0]
vid_path = osp.join(ntu2d_root, vid)
# anno = [x for x in ntu2d_annos if x['frame_dir'] == frame_dir.split('_')[0]][0]
# The loaded object is used directly as the annotation (no lookup by frame_dir).
anno = load(ntu60_2d_ann_file)

In [None]:
# Echo the loaded single-clip annotation.
anno

In [None]:
# Keypoint heatmaps of the single-person clip; channel=-1 merges all channels
# into one map per frame.
keypoint_heatmap = get_pseudo_heatmap(anno)
keypoint_mapvis = vis_heatmaps(keypoint_heatmap, channel=-1)
keypoint_mapvis = [add_label(f, ntu_categories[anno['label']]) for f in keypoint_mapvis]
vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=24)

# CHECK ndarray.shape, RESTART KERNEL if ERROR!
print(keypoint_heatmap.shape)  # frames x kpts x height x width

# Save the clip to an mp4 file, then also play it inline.
vid.write_videofile("S001C001P001R001A001_rgb_heatmap.mp4", remove_temp=True)
vid.ipython_display()

In [None]:
# Limb heatmaps of the single-person clip, all channels merged per frame.
limb_heatmap = get_pseudo_heatmap(anno, 'limb')
limb_mapvis = vis_heatmaps(limb_heatmap)
limb_mapvis = [add_label(f, ntu_categories[anno['label']]) for f in limb_mapvis]
vid = mpy.ImageSequenceClip(limb_mapvis, fps=24)

# CHECK ndarray.shape, RESTART KERNEL if ERROR!
print(limb_heatmap.shape[0])  # first axis only, i.e. the number of frames

vid.ipython_display()

# NTU60_2D_Multi_Persons

In [None]:
# Annotation pickle of one clip; rebinds the name used in the single-person section.
ntu60_2d_ann_file = 'mmaction2/tools/data/skeleton/S013C002P018R001A060_rgb.pkl'

In [None]:
# NTU60 action names, indexed by an annotation's 'label'.
# NOTE(review): this is a verbatim repeat of the list assigned in the NTU60 section.
ntu_categories = ['drink water', 'eat meal/snack', 'brushing teeth', 'brushing hair', 'drop', 'pickup', 
                  'throw', 'sitting down', 'standing up (from sitting position)', 'clapping', 'reading', 
                  'writing', 'tear up paper', 'wear jacket', 'take off jacket', 'wear a shoe', 
                  'take off a shoe', 'wear on glasses', 'take off glasses', 'put on a hat/cap', 
                  'take off a hat/cap', 'cheer up', 'hand waving', 'kicking something', 
                  'reach into pocket', 'hopping (one foot jumping)', 'jump up', 
                  'make a phone call/answer phone', 'playing with phone/tablet', 'typing on a keyboard', 
                  'pointing to something with finger', 'taking a selfie', 'check time (from watch)', 
                  'rub two hands together', 'nod head/bow', 'shake head', 'wipe face', 'salute', 
                  'put the palms together', 'cross hands in front (say stop)', 'sneeze/cough', 
                  'staggering', 'falling', 'touch head (headache)', 'touch chest (stomachache/heart pain)', 
                  'touch back (backache)', 'touch neck (neckache)', 'nausea or vomiting condition', 
                  'use a fan (with hand or paper)/feeling warm', 'punching/slapping other person', 
                  'kicking other person', 'pushing other person', 'pat on back of other person', 
                  'point finger at the other person', 'hugging other person', 
                  'giving something to other person', "touch other person's pocket", 'handshaking', 
                  'walking towards each other', 'walking apart from each other']
# ntu2d_annos = load(ntu60_2d_ann_file)['annotations']

In [None]:
# Input video location and the file the rendered heatmap clip is written to.
# NOTE(review): the video name does not match the annotation file name
# (S013C002P018R001A060_rgb.pkl) — confirm they belong together.
ntu2d_root = '../cut'
vid = 'DJI_0013_12r_10s_2.mp4'
out_filename = 'data/DJI_0013_12r_10s_2_heatmap_17.MP4'

# frame_dir = vid.split('.')[0]
vid_path = osp.join(ntu2d_root, vid)
# anno = [x for x in ntu2d_annos if x['frame_dir'] == frame_dir.split('_')[0]][0]
# The loaded object is used directly as the annotation (no lookup by frame_dir).
anno = load(ntu60_2d_ann_file)

In [None]:
# Echo the loaded annotation.
anno

In [None]:
# Save the annotation's string representation to a text file for inspection
# outside the notebook.
with open('S013C002P018R001A060_rgb.txt', 'w') as data:
    data.write(str(anno))

In [None]:
# Render a single heatmap channel (index 16) of the clip, unlabelled, as a
# 12 fps video.
keypoint_heatmap = get_pseudo_heatmap(anno)
keypoint_mapvis = vis_heatmaps(keypoint_heatmap, channel=16)
# keypoint_mapvis = [add_label(f, ntu_categories[anno['label']]) for f in keypoint_mapvis]
vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=12)

# CHECK ndarray.shape, RESTART KERNEL if ERROR!
print(keypoint_heatmap.shape)  # frames x kpts x height x width

# Written to `out_filename`; inline playback is disabled for this cell.
vid.write_videofile(out_filename, remove_temp=True)
# vid.ipython_display()

# CIIS

In [5]:
# Annotation pickle for the CIIS section.
# NOTE(review): this path points into a Downloads folder outside the project
# tree, so it only resolves on a machine with the same directory layout.
ciis_ann_file = '../../../Downloads/k400_2d.pkl'

In [None]:
# Names of the CIIS action classes; the heatmap cell of this section indexes
# this list with an annotation's integer 'label', so it has to be defined.
# NOTE(review): confirm that every label in the loaded file is below
# len(ciis_categories).
ciis_categories = ['berdiri', 'berjalan', 'berjongkok', 'merayap', 'melempar', 'membidik (l. panjang)', 
                  'membidik (l. pendek)', 'memukul', 'menendang', 'menusuk']
ciis_annos = load(ciis_ann_file)

In [None]:
# Peek at the first 10 annotation records.
ciis_annos[:10]

In [None]:
# Echo records 1000-1999 (a 1000-record dump; records 10-999 are never shown).
ciis_annos[1000:2000]

In [None]:
# Echo records 2000-2999.
ciis_annos[2000:3000]

In [None]:
# Echo records 3000-3999.
ciis_annos[3000:4000]

In [None]:
# Echo records 4000-4999.
ciis_annos[4000:5000]

In [None]:
# Echo records 5000-5999.
ciis_annos[5000:6000]

In [None]:
# Echo every record from index 6000 to the end.
ciis_annos[6000:]

In [None]:
# ciis_root = 'data/video'
# ciis_vids = os.listdir(ciis_root)
# visualize pose of which video? index in 0 - 50.
# idx = 0
# vid = ciis_vids[idx]

# The clip is chosen by a hard-coded frame_dir instead of a directory listing.
frame_dir = '30d_1s1_206.002_2'
# vid_path = osp.join(ciis_root, vid)
# Take the first record with that frame_dir; raises IndexError when none matches.
anno = [x for x in ciis_annos if x['frame_dir'] == frame_dir][0]

In [None]:
# Echo the selected CIIS annotation record.
anno

In [None]:
# Keypoint heatmaps of the selected CIIS clip, played inline at 24 fps.
# NOTE: requires `ciis_categories` to be defined before this cell runs.
keypoint_heatmap = get_pseudo_heatmap(anno)
keypoint_mapvis = vis_heatmaps(keypoint_heatmap)
keypoint_mapvis = [add_label(f, ciis_categories[anno['label']]) for f in keypoint_mapvis]
vid = mpy.ImageSequenceClip(keypoint_mapvis, fps=24)
vid.ipython_display()

In [None]:
# NOTE(review): `combined_pkl` is not assigned anywhere in the visible
# notebook — presumably left over from an earlier kernel session; confirm
# where it comes from or remove this cell.
combined_pkl

# ntu60_2d.pkl

In [None]:
# Load the pickle and echo the whole object as the cell output (the result is not kept).
load('data/skeleton/ntu60_2d.pkl')