### 自定义手势-宏  
配置文件`config.json`:
``` json
[
    {
        "id":"id",
        "name":"name",
        "data_type":["right_hand","left_hand","body","face"]/"time",
        "match_data":"path-to-json"/"contain"/time(ms),
        "command":"",
        "sensetive":100,
        "start":true/false,
        "next_tasks":[
            {"operator":"start","id":"id"},
            {"operator":"stop","id":"id"}
        ]
    },
    ...
]
```
通过手势识别结合定时任务组成的任务链以完成复杂手势指令  
每个`task`可以选择匹配的动作与部位，抑或选择等待时间  
`task`满足对应条件后可以调整其他任务激活状态并执行命令  
依靠此任务链系统可以完成复杂逻辑的编写  

#### 导入必要包

In [1]:
import os
import cv2 as cv
import json
import numpy as np
import mediapipe as mp
import mediapipe.python.solutions as sol
from IPython.display import clear_output
from datetime import datetime
data_dir=r".\data"
if not os.path.exists(os.path.join(data_dir,"config.json")):
    with open(os.path.join(data_dir,"config.json"),"w") as f:
        f.write("[]")

#### 动作归中，任务控制器与任务结构体

In [6]:
def normalize_points(points):
    res=[]
    maxx,maxy,minx,miny=-0.6,-0.6,0.6,0.6
    for point in points:
        maxx=max(maxx,point[0])
        maxy=max(maxy,point[1])
        minx=min(minx,point[0])
        miny=min(miny,point[1])
    max_delta=max(maxx-minx,maxy-miny)
    if max_delta<=1e-5:
        return [[0,0]]*21
    for point in points:
        newx=(point[0]-(maxx+minx)/2)/max_delta
        newy=(point[1]-(maxy+miny)/2)/max_delta
        if newx<-0.5-1e-6 or newx>0.5+1e-6 or newy<-0.5-1e-6 or newy>0.5+1e-6:
            print([point[0],point[1],newx,newy,max_delta,maxx,maxy,minx,miny])
            raise ValueError
        res.append([newx,newy])
    return res

class Task_controller:
    def __init__(self):
        self.tasks={}
        self.Activate={}
    def listen(self,x):
        for i in self.tasks.keys():
            if self.Activate[i]:
                self.tasks[i].listen(x)
    def activate(self,id):
        self.Activate[id]=True
        self.tasks[id].activate()
    def deactivate(self,id):
        self.Activate[id]=False
        self.tasks[id].deactivate()
    def add_task(self,id,name,data_type,match_data,sensetive,command="",start=True,next_tasks=[]):
        a=Task(self,id,name,data_type,match_data,sensetive,command,start,next_tasks)
        self.tasks[id]=a
        self.Activate[id]=start

class Task:
    def __init__(self,controller,id,name,data_type,match_data,sensetive,command="",start=True,next_tasks=[]):
        self.controller=controller
        self.id=id
        self.name=name
        if data_type=="time":
            self.data_type=data_type
            self.match_data=match_data
            self.activate_time=0
        else:
            self.data_type=data_type
            if match_data=="contain":
                self.match_data=match_data
            else:
                print(f"reading {match_data}")
                with open(match_data,"r") as f:
                    self.match_data=json.loads(f.read())
                self.pose_path=match_data
        self.sensetive=sensetive
        self.command=command
        self.start=start
        self.next_tasks=next_tasks
    def process(self):
        print(f"processing {self.name}")
        os.system(self.command)
        for i in self.next_tasks:
            if i['operator']=='start':
                self.controller.activate(i['id'])
            if i['operator']=='stop':
                self.controller.deactivate(i['id'])
    def activate(self):
        if self.data_type=="time":
            self.activate_time=datetime.now().timestamp()*1000
    def deactivate(self):
        pass
    def listen(self,x):
        print(f"listening {self.name}",end="")
        if self.data_type=="time":
            print(f", time last {self.match_data-(datetime.now().timestamp()*1000-self.activate_time)}")
            if datetime.now().timestamp()*1000-self.activate_time>=self.match_data:
                self.activate_time=0
                self.process()
            return
        if self.match_data=="contain":
            print(f", contain {self.data_type}")
            for data in self.data_type:
                if x[data][0][0]<=1e-6 and x[data][0][1]<=1e-6:
                    print("")
                    return
            self.process()
            return
        delta=0
        for data in self.data_type:
            if x[data][0][0]<=1e-6 and x[data][0][1]<=1e-6:
                print("")
                return
            points=normalize_points(x[data])
            match=normalize_points(self.match_data[data])
            if len(points)!=len(match):
                raise ValueError(f"Point count unmatch! Expect {len(match)} but find {len(points)}")
            for i in range(len(points)):
                delta+=((points[i][0]-match[i][0])**2+(points[i][1]-match[i][1])**2)**0.5
        print(f", pose {self.pose_path.split('/')[-1].split('\\')[-1]}, delta {delta:.5f}")
        if delta<self.sensetive:
            self.process()

#### 从`config`中读取`task`列表

In [16]:
with open(os.path.join(data_dir,"config.json"),"r") as f:
    config=json.loads(f.read())
controller=Task_controller()
for task in config:
    print(task)
    controller.add_task(task['id'],task['name'],task['data_type'],task['match_data'],
                        task['sensetive'],task['command'],task['start'],task['next_tasks'])

{'id': '1', 'name': 'prepare start calc', 'data_type': ['left_hand'], 'match_data': 'D:\\sjtu\\project\\continus\\hand1.json', 'command': '', 'sensetive': 5, 'start': True, 'next_tasks': [{'operator': 'stop', 'id': '1'}, {'operator': 'start', 'id': '2'}, {'operator': 'start', 'id': '3'}]}
reading D:\sjtu\project\continus\hand1.json
{'id': '2', 'name': 'start calc', 'data_type': 'time', 'match_data': 10000, 'command': 'start calc', 'sensetive': 3, 'start': False, 'next_tasks': [{'operator': 'stop', 'id': '2'}, {'operator': 'stop', 'id': '3'}, {'operator': 'start', 'id': '4'}]}
{'id': '3', 'name': 'abort start', 'data_type': ['right_hand'], 'match_data': 'D:\\sjtu\\project\\continus\\hand2.json', 'command': '', 'sensetive': 3, 'start': False, 'next_tasks': [{'operator': 'stop', 'id': '2'}, {'operator': 'stop', 'id': '3'}, {'operator': 'start', 'id': '1'}]}
reading D:\sjtu\project\continus\hand2.json
{'id': '4', 'name': 'close calc', 'data_type': ['right_hand'], 'match_data': 'D:\\sjtu\\p

#### 关键点识别可视化&识别主循环

In [12]:
def draw_styled_landmarks(image, results):
    # Draw face connections
    sol.drawing_utils.draw_landmarks(image, results.face_landmarks, sol.holistic.FACEMESH_TESSELATION,
                              landmark_drawing_spec=None,
                              connection_drawing_spec=sol.drawing_styles.get_default_face_mesh_tesselation_style())
    sol.drawing_utils.draw_landmarks(image, results.face_landmarks, sol.holistic.FACEMESH_CONTOURS,
                              landmark_drawing_spec=None,
                              connection_drawing_spec=sol.drawing_styles.get_default_face_mesh_contours_style()) 
    # Draw pose connections
    sol.drawing_utils.draw_landmarks(image, results.pose_landmarks, sol.holistic.POSE_CONNECTIONS,
                              sol.drawing_styles.get_default_pose_landmarks_style()) 
    # Draw left hand connections
    sol.drawing_utils.draw_landmarks(image, results.left_hand_landmarks, sol.holistic.HAND_CONNECTIONS,
                             sol.drawing_styles.get_default_hand_landmarks_style(),sol.drawing_styles.get_default_hand_connections_style()) 
    # Draw right hand connections  
    sol.drawing_utils.draw_landmarks(image, results.right_hand_landmarks, sol.holistic.HAND_CONNECTIONS,
                              sol.drawing_styles.get_default_hand_landmarks_style(),sol.drawing_styles.get_default_hand_connections_style()) 
def extract_landmarks(x):
    res={}
    if not x.pose_landmarks is None:
        a=x.pose_landmarks.landmark
        b=[]
        for i in range(len(a)):
            b.append([a[i].x,a[i].y,a[i].z])
        res['pose']=b
    else:
        res['pose']=[[0,0,0]]*33
    if not x.left_hand_landmarks is None:
        a=x.left_hand_landmarks.landmark
        b=[]
        for i in range(len(a)):
            b.append([a[i].x,a[i].y,a[i].z])
        res['right_hand']=b
    else:
        res['right_hand']=[[0,0,0]]*21
    if not x.right_hand_landmarks is None:
        a=x.right_hand_landmarks.landmark
        b=[]
        for i in range(len(a)):
            b.append([a[i].x,a[i].y,a[i].z])
        res['left_hand']=b
    else:
        res['left_hand']=[[0,0,0]]*21
    if not x.face_landmarks is None:
        a=x.face_landmarks.landmark
        b=[]
        for i in range(len(a)):
            b.append([a[i].x,a[i].y,a[i].z])
        res['face']=b
    else:
        res['face']=[[0,0,0]]*468
    return res
    
def start_listen(control):
    camera=cv.VideoCapture(0,cv.CAP_DSHOW)
    camera.set(cv.CAP_PROP_FRAME_WIDTH, 1920)
    camera.set(cv.CAP_PROP_FRAME_HEIGHT, 1080)
    camera.set(cv.CAP_PROP_FPS,60)
    with sol.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5, model_complexity=2) as holistic:
        seq=[]
        while camera.isOpened():
            ret, frame = camera.read()
            clear_output(wait=True)
            frame=frame[:,::-1,:]
            
            image = cv.cvtColor(frame, cv.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
            image.flags.writeable = False                  # Image is no longer writeable
            results = holistic.process(image)                 # Make prediction
            image.flags.writeable = True                   # Image is now writeable 
            image = cv.cvtColor(image, cv.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    
            draw_styled_landmarks(image, results)
            control.listen(extract_landmarks(results))
            
            cv.imshow('OpenCV Feed', image)
    
            if cv.waitKey(20) & 0xFF == ord('q'):
                break
        camera.release()
        cv.destroyAllWindows()

#### 工具，用于生成用于对比的姿势文件

In [11]:
def get_data():
    camera=cv.VideoCapture(0,cv.CAP_DSHOW)
    camera.set(cv.CAP_PROP_FRAME_WIDTH, 1920)
    camera.set(cv.CAP_PROP_FRAME_HEIGHT, 1080)
    camera.set(cv.CAP_PROP_FPS,60)
    res={}
    with sol.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5, model_complexity=2) as holistic:
        seq=[]
        while camera.isOpened():
            ret, frame = camera.read()
            clear_output(wait=True)
            frame=frame[:,::-1,:]
            
            image = cv.cvtColor(frame, cv.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
            image.flags.writeable = False                  # Image is no longer writeable
            results = holistic.process(image)                 # Make prediction
            image.flags.writeable = True                   # Image is now writeable 
            image = cv.cvtColor(image, cv.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    
            draw_styled_landmarks(image, results)
            
            cv.imshow('OpenCV Feed', image)
    
            if cv.waitKey(20) & 0xFF == ord('q'):
                break
            if cv.waitKey(20) & 0xFF == ord(' '):
                res=extract_landmarks(results)
                break
        camera.release()
        cv.destroyAllWindows()
    with open("./output.json","w") as f:
        f.write(json.dumps(res))

In [None]:
# get_data() 

#### 启动识别

In [17]:
start_listen(controller)

listening prepare start calc


#### Demo config
``` json
[
    {
        "id":"1",
        "name":"prepare start calc",
        "data_type":["left_hand"],
        "match_data":"D:\\sjtu\\project\\continus\\hand1.json",
        "command":"",
        "sensetive":5,
        "start":true,
        "next_tasks":[{"operator":"stop","id":"1"},{"operator":"start","id":"2"},{"operator":"start","id":"3"}]
    },
    {
        "id":"2",
        "name":"start calc",
        "data_type":"time",
        "match_data":10000,
        "command":"start calc",
        "sensetive":3,
        "start":false,
        "next_tasks":[{"operator":"stop","id":"2"},{"operator":"stop","id":"3"},{"operator":"start","id":"4"}]
    },
    {
        "id":"3",
        "name":"abort start",
        "data_type":["right_hand"],
        "match_data":"D:\\sjtu\\project\\continus\\hand2.json",
        "command":"",
        "sensetive":3,
        "start":false,
        "next_tasks":[{"operator":"stop","id":"2"},{"operator":"stop","id":"3"},{"operator":"start","id":"1"}]
    },
    {
        "id":"4",
        "name":"close calc",
        "data_type":["right_hand"],
        "match_data":"D:\\sjtu\\project\\continus\\hand2.json",
        "command":"taskkill /F /IM CalculatorApp.exe",
        "sensetive":3,
        "start":false,
        "next_tasks":[{"operator":"stop","id":"4"},{"operator":"start","id":"1"}]
    },
    {
        "id":"5",
        "name":"open cmd",
        "data_type":["right_hand"],
        "match_data":"contain",
        "command":"start explorer.exe"q,
        "sensetive":3,
        "start":true,
        "next_tasks":[{"operator":"stop","id":"5"}]
    }
]

```