In [None]:
import os
import cv2
from IPython.display import clear_output, display
from IPython.display import Image as IPyImage
from PIL import Image, ImageFont, ImageDraw
import numpy as np
import time


qualify_feedback_file = "quality_feedback.csv"
video_path = "./renames/step1"

In [None]:
# 获取所有已有视频，构建字典
# key 为 `{pid}_{camera}_{action}`
# vavlue 为对应视频的路径
video_index_name_to_full_path_dict = {}
for dirname, _, file_names in os.walk(video_path):
    for file_name in file_names:
        if not file_name.endswith(".mp4"):
            continue
        row = file_name.split('_')
        key = row[-1][:5] + '_' + row[1] + '_' + row[0]
        video_index_name_to_full_path_dict[key] = os.path.join(dirname, file_name)

In [None]:
actions_chinese_to_english_dict = {
    # 跌倒类型
    "原地软倒": "stillfall",
    "行进软倒": "walkingfall",
    "推倒": "pushoverfall",
    "绊倒": "tripfall",

    # 其他动作
    "吃药": "medicine",
    "吃饭": "eating",
    "喝水": "drinking",
    "拿手机": "takephone",
    "拿水杯": "takecup",
    "磕碰": "knock",
    "关门": "close",
    "开门": "open",
}

In [None]:
with open(qualify_feedback_file, 'r') as f:
    lines = f.readlines()
useless_list = []
remade_list = []
typo_list = []

sample_type = -1
for line in lines:
    if line.startswith("无效数据"):
        sample_type = 1
    elif line.startswith("需要返工"):
        sample_type = 2
    elif line.startswith("有小问题"):
        sample_type = 3
    elif line.startswith("P"):
        row = line.split(" ")
        key = row[0] + '_' + actions_chinese_to_english_dict[row[1]]
        details = line
        pair = (video_index_name_to_full_path_dict[key], details)
        if sample_type == 1:
            useless_list.append(pair)
        elif sample_type == 2:
            remade_list.append(pair)
        elif sample_type == 3:
            typo_list.append(pair)

In [None]:
def _add_chinese_in_image(img, context, color=(255, 0, 0)):
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(np.array(img))

    draw = ImageDraw.Draw(img)
    font = ImageFont.truetype("MSYH.TTF", 20, encoding="utf-8")
    draw.text((0, 0), context, color, font=font)

    img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
    return img

def _process_img(img, img_type='.jpg'):
    ret, img = cv2.imencode(img_type, img)
    # encoded = base64.b64encode(img)
    return IPyImage(data=img)

# img = _add_chinese_in_image(cv2.imread('./1.png'), "测试")
# display(_process_img(img))

In [None]:
def display_single_video(video_path, context, color=(255, 0, 0), resize_size=(720, 480)):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print('video {} doesn\'t exists.'.format(video_path))
        return
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        clear_output()
        if resize_size is not None:
            frame = cv2.resize(frame, resize_size)
        display(_process_img(_add_chinese_in_image(frame, context, color)))
        time.sleep(0.1)
    cap.release()

In [None]:
for sample in useless_list:
    display_single_video(sample[0], sample[1])

In [None]:
for sample in remade_list:
    display_single_video(sample[0], sample[1])