In [None]:
!pip install mediapipe

In [3]:
from matplotlib import pyplot as plt
import os
import numpy as np
import csv
import cv2
from PIL import Image,ImageDraw
import sys
from tqdm import tqdm
from mediapipe.python.solutions import drawing_utils as mp_drawing
from mediapipe.python.solutions import pose as mp_pose
import io
from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw
import requests
import json
import time

In [4]:
#图片可视化
def show_image(img,figsize=(10,10)):
    plt.figure(figsize=figsize)
    plt.imshow(img)
    plt.show()

In [5]:
#人体姿态编码
class FullBodyPoseEmbedder():

    def __init__(self,sport_type,torso_size_multiplier=2.5):
        #torso_size_multiplier:身体尺寸乘数
        self._torso_size_multiplier = torso_size_multiplier

        #运动类型
        self._sport_type = sport_type

        #_landmark_names:关键点名称列表
        self._landmark_names = [
            'nose',
            'left_eye_inner','left_eye','left_eye_outer',
            'right_eye_inner','right_eye','right_eye_outer',
            'left_ear','right_ear',
            'mouth_left','mouth_right',
            'left_shoulder','right_shoulder',
            #手肘
            'left_elbow','right_elbow',
            #手腕
            'left_wrist','right_wrist',
            'left_pinky_1','right_pinky_1',
            'left_index_1','right_index_1',
            'left_thumb_2','right_thumb_2',
            #臀部
            'left_hip','right_hip',
            'left_knee','right_knee',
            #脚踝
            'left_ankle','right_ankle',
            'left_heel','right_heel',
            'left_foot_index','right_foot_index'
        ]

        #TODO:局部关键点数组

    #landmarks:关键点数组
    def __call__(self,landmarks):
        #landmarks.shape[0]:关键点数量
        assert landmarks.shape[0] == len(self._landmark_names),'Unexpected number of landmarks: {}'.format(landmarks.shape[0])

        #Get pose landmarks.
        landmarks = np.copy(landmarks)

        landmarks = self._normalize_pose_landmarks(landmarks)

        embedding = self._get_pose_distance_embedding(landmarks,self._sport_type)

        return embedding

    #标准化关键点位置
    def _normalize_pose_landmarks(self,landmarks):
        landmarks = np.copy(landmarks)

        #Normalize translation.
        pose_center = self._get_pose_center(landmarks)
        #将关键点平移到该中心
        landmarks -= pose_center

        #Normalize scale.
        pose_size = self._get_pose_size(landmarks,self._torso_size_multiplier)
        #归一化
        landmarks /= pose_size
        #非必须,但方便处理
        landmarks *= 100

        return landmarks

    #获取姿态中心点
    def _get_pose_center(self,landmarks):
        left_hip = landmarks[self._landmark_names.index('left_hip')]
        right_hip = landmarks[self._landmark_names.index('right_hip')]
        center = (left_hip + right_hip) * 0.5
        return center

    #获取姿态尺寸
    def _get_pose_size(self,landmarks,torso_size_multiplier):

        landmarks = landmarks[:,:2]

        #臀部中点
        left_hip = landmarks[self._landmark_names.index('left_hip')]
        right_hip = landmarks[self._landmark_names.index('right_hip')]
        hips = (left_hip + right_hip) * 0.5

        #肩部中点
        left_shoulder = landmarks[self._landmark_names.index('left_shoulder')]
        right_shoulder = landmarks[self._landmark_names.index('right_shoulder')]
        shoulders = (left_shoulder + right_shoulder) * 0.5

        #计算欧几里得距离
        hip_distance = np.linalg.norm(hips - shoulders)
        pose_size = hip_distance * torso_size_multiplier
        return pose_size

    def _get_pose_distance_embedding(self,landmarks,sport_type):
        # 初始化列表来存储距离
        if sport_type == 'squat':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_ankle'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_ankle'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_knee'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_knee'),

              self._get_distance_by_names(landmarks,'left_hip','left_ankle'),
              self._get_distance_by_names(landmarks,'right_hip','right_ankle'),

              self._get_distance_by_names(landmarks,'left_knee','left_ankle'),
              self._get_distance_by_names(landmarks,'right_knee','right_ankle'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_elbow','right_elbow'),
              self._get_distance_by_names(landmarks,'left_knee','right_knee'),

              self._get_distance_by_names(landmarks,'left_hip','left_knee'),
              self._get_distance_by_names(landmarks,'right_hip','right_knee')
          ]
        elif sport_type == 'push-up':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_ankle'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_ankle'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_elbow'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_elbow'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_wrist'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_wrist'),

              self._get_distance_by_names(landmarks,'left_wrist','left_elbow'),
              self._get_distance_by_names(landmarks,'right_wrist','right_elbow')
          ]
        elif sport_type == 'sit-up':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_ankle'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_ankle'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_knee'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_knee'),

              self._get_distance_by_names(landmarks,'left_knee','left_wrist'),
              self._get_distance_by_names(landmarks,'right_knee','right_wrist'),

              self._get_distance_by_names(landmarks,'left_wrist','left_ankle'),
              self._get_distance_by_names(landmarks,'right_wrist','right_ankle')
          ]
        elif sport_type == 'dumbbell-curl':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_wrist'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_wrist'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_knee','left_wrist'),
              self._get_distance_by_names(landmarks,'right_knee','right_wrist'),

              self._get_distance_by_names(landmarks,'left_wrist','left_ankle'),
              self._get_distance_by_names(landmarks,'right_wrist','right_ankle'),

              self._get_distance_by_names(landmarks,'left_wrist','left_elbow'),
              self._get_distance_by_names(landmarks,'right_wrist','right_elbow'),

              self._get_distance_by_names(landmarks,'left_wrist','left_heel'),
              self._get_distance_by_names(landmarks,'right_wrist','right_heel')
          ]
        elif sport_type == 'fly':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_wrist'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_wrist'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_knee','left_wrist'),
              self._get_distance_by_names(landmarks,'right_knee','right_wrist'),

              self._get_distance_by_names(landmarks,'left_wrist','left_ankle'),
              self._get_distance_by_names(landmarks,'right_wrist','right_ankle'),

              self._get_distance_by_names(landmarks,'left_wrist','left_elbow'),
              self._get_distance_by_names(landmarks,'right_wrist','right_elbow'),

              self._get_distance_by_names(landmarks,'left_wrist','left_heel'),
              self._get_distance_by_names(landmarks,'right_wrist','right_heel')
          ]
        elif sport_type == 'folding-blade-push-ups':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_ankle'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_ankle'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_elbow'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_elbow'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_wrist'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_wrist'),

              self._get_distance_by_names(landmarks,'left_wrist','left_elbow'),
              self._get_distance_by_names(landmarks,'right_wrist','right_elbow')
          ]
        elif sport_type == 'leg-raise':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_ankle'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_ankle'),

              self._get_distance_by_names(landmarks,'left_hip','left_ankle'),
              self._get_distance_by_names(landmarks,'right_hip','right_ankle'),

              self._get_distance_by_names(landmarks,'left_shoulder','left_knee'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_knee'),

              self._get_distance_by_names(landmarks,'left_hip','left_knee'),
              self._get_distance_by_names(landmarks,'right_hip','right_knee')
          ]
        elif sport_type == 'jumping-jack':
          distances = [
              self._get_distance_by_names(landmarks,'left_shoulder','left_wrist'),
              self._get_distance_by_names(landmarks,'right_shoulder','right_wrist'),

              self._get_distance_by_names(landmarks,'left_hip','left_wrist'),
              self._get_distance_by_names(landmarks,'right_hip','right_wrist'),

              self._get_distance_by_names(landmarks,'left_knee','left_wrist'),
              self._get_distance_by_names(landmarks,'right_knee','right_wrist'),

              self._get_distance_by_names(landmarks,'left_wrist','left_ankle'),
              self._get_distance_by_names(landmarks,'right_wrist','right_ankle'),

              self._get_distance_by_names(landmarks,'left_wrist','left_elbow'),
              self._get_distance_by_names(landmarks,'right_wrist','right_elbow'),

              self._get_distance_by_names(landmarks,'left_wrist','left_heel'),
              self._get_distance_by_names(landmarks,'right_wrist','right_heel')
          ]
        elif sport_type == 'all':
          distances = []
          for i in range(len(self._landmark_names)):
            for j in range(i+1,len(self._landmark_names)):
              
              #TODO:关键点筛选
              
              #TODO:局部关键点判断

              #TODO:关键点加权


              distances.append(self._get_distance_by_names(landmarks,self._landmark_names[i],self._landmark_names[j]))
        # 将距离列表转换为NumPy数组，形成嵌入向量
        embedding = np.array(distances)
        return embedding

    #根据给定的两个关键点名称，计算这两个点的坐标平均值
    def _get_average_by_name(self,landmarks,name_from,name_to):
        lmk_from = landmarks[self._landmark_names.index(name_from)]
        lmk_to = landmarks[self._landmark_names.index(name_to)]
        return (lmk_from + lmk_to) * 0.5

    def _get_distance_by_names(self,landmarks,name_from,name_to):
        lmk_from = landmarks[self._landmark_names.index(name_from)]
        lmk_to = landmarks[self._landmark_names.index(name_to)]
        return self._get_distance(lmk_from,lmk_to)

    def _get_distance(self,lmk_from,lmk_to):
      return lmk_to - lmk_from

In [6]:
#异常样本检测
class PoseSample():

    def __init__(self,name,landmarks,class_name,embedding):
        self.name = name
        self.landmarks = landmarks
        self.class_name = class_name

        self.embedding = embedding

class PoseSampleOutlier():

    def __init__(self,sample,detected_class,all_classes):
        self.sample = sample
        self.detected_class = detected_class
        self.all_classes = all_classes

class PoseClassifier():

  def __init__(self,
        pose_samples_folder,    #存放姿态样本的文件路径
        pose_embedder,       #嵌入器
        mean_dist_heap={},
        file_extension='csv',   #姿势样本文件的扩展名
        file_separator=',',    #用于分割CSV文件中数据的字符
        n_landmarks=33,      #姿势中的关键点数量
        n_dimensions=3,      #每个关键点的维度数量
        top_n_by_max_distance=30, #基于最大距离选择的顶部样本数量
        top_n_by_mean_distance=10,#基于平均距离选择的顶部样本数量
        axes_weights=(1.0,1.0,0.2)):#各轴的权重
    self._pose_embedder = pose_embedder
    self._n_landmarks = n_landmarks
    self._n_dimensions = n_dimensions
    self._top_n_by_max_distance = top_n_by_max_distance
    self._top_n_by_mean_distance = top_n_by_mean_distance
    self._axes_weights = axes_weights
    self.mean_dist_heap = mean_dist_heap


    #加载的姿态样本
    self._pose_samples = self._load_pose_samples(pose_samples_folder,
                            file_extension,
                            file_separator,
                            n_landmarks,
                            n_dimensions,
                            pose_embedder)

  #加载姿态样本
  def _load_pose_samples(self,
              pose_samples_folder,#存放姿势样本文件的路径
              file_extension,     #姿势样本文件的扩展名
              file_separator,     #CSV文件中用于分隔数据的字符
              n_landmarks,        #关键点的数量
              n_dimensions,       #每个关键点的维度数量
              pose_embedder):     #用于嵌入姿势的嵌入器

    #遍历所有指定路径下指定扩展名结尾的文件
    file_names = [name for name in os.listdir(pose_samples_folder) if name.endswith(file_extension)]

    #计算每个姿态的嵌入向量
    pose_samples = []
    for file_name in file_names:
      #切片获取文件名(去掉扩展名)
      class_name = file_name[:-(len(file_extension)+1)]

      #打开文件
      with open(os.path.join(pose_samples_folder,file_name)) as csv_file:
        csv_reader = csv.reader(csv_file,delimiter=file_separator)
        #遍历CSV文件的每一行数据
        for row in csv_reader:
          assert len(row) == n_landmarks * n_dimensions +1,'Wrong number of values: {}'.format(len(row))
          #提取姿态关键点
          landmarks = np.array(row[1:],np.float32).reshape([n_landmarks,n_dimensions])
          pose_samples.append(PoseSample(
              name=row[0],
              landmarks=landmarks,
              class_name=class_name,
              embedding=pose_embedder(landmarks)
          ))

    return pose_samples

  #查找姿势样本中的异常值
  def find_pose_sample_outliers(self):
    outliers = []
    for sample in self._pose_samples:
        pose_landmarks = sample.landmarks.copy()
        pose_classification = self.__call__(pose_landmarks)
        class_names = [class_name for class_name,count in pose_classification.items() if count == max(pose_classification.values())]

        if sample.class_name not in class_names or len(class_names) != 1:
            outliers.append(PoseSampleOutlier(sample,class_names,pose_classification))

    return outliers

  def __call__(self,pose_landmarks,flag=3):
    #检查姿势关键点数组形状是否符合预期
    assert pose_landmarks.shape == (self._n_landmarks,self._n_dimensions),'Unexpected shape: {}'.format(pose_landmarks.shape)

    #计算输入姿势的嵌入向量
    pose_embedding = self._pose_embedder(pose_landmarks)
    #翻转后的姿势嵌入向量(沿x轴)
    flipped_pose_embedding = self._pose_embedder(pose_landmarks * np.array([-1,1,1]))

    if flag == 1:
      max_dist_heap = {}
      for sample_idx,sample in enumerate(self._pose_samples):
        max_dist = min(
            np.max(np.abs(sample.embedding - pose_embedding)*self._axes_weights),
            np.max(np.abs(sample.embedding - flipped_pose_embedding)*self._axes_weights)
        )
        max_dist_heap[sample_idx] = max_dist
      for key,value in max_dist_heap.items():
          sample = self._pose_samples[key]
          mean_dist = min(
              np.mean(np.abs(sample.embedding - pose_embedding)*self._axes_weights),
              np.mean(np.abs(sample.embedding - flipped_pose_embedding)*self._axes_weights)
          )
          self.mean_dist_heap[self._pose_samples[key].name] = mean_dist
      self.mean_dist_heap = sorted(self.mean_dist_heap.items(),key=lambda item:item[1])[:self._top_n_by_mean_distance]
      self.mean_dist_heap = {k:v for k,v in self.mean_dist_heap}

    elif flag == 2:
      #筛选平均距离堆
      self.mean_dist_heap = sorted(self.mean_dist_heap.items(),key=lambda item:item[1])[:self._top_n_by_mean_distance]

      #统计类名数量
      class_names = []
      for i in self.mean_dist_heap:
        key = i[0].split('.')[0]
        key = key.split(' ')[0]
        class_names.append(key)
      result = {class_name:class_names.count(class_name) for class_name in set(class_names)}

      return result
    else:
      #计算每个样本的嵌入向量与输入姿势嵌入向量之间的最大加权距离
      max_dist_heap = []
      #遍历预定义姿势样本
      for sample_idx,sample in enumerate(self._pose_samples):
        max_dist = min(
            np.max(np.abs(sample.embedding - pose_embedding)*self._axes_weights),
            np.max(np.abs(sample.embedding - flipped_pose_embedding)*self._axes_weights)
        )
        max_dist_heap.append([max_dist,sample_idx])

      #筛选最大距离堆
      max_dist_heap = sorted(max_dist_heap,key=lambda x:x[0])
      max_dist_heap = max_dist_heap[:self._top_n_by_max_distance]

      #计算平均距离堆
      mean_dist_heap = []
      for _,sample_idx in max_dist_heap:
          sample = self._pose_samples[sample_idx]
          mean_dist = min(
              np.mean(np.abs(sample.embedding - pose_embedding)*self._axes_weights),
              np.mean(np.abs(sample.embedding - flipped_pose_embedding)*self._axes_weights)
          )
          mean_dist_heap.append([mean_dist,sample_idx])

      #筛选平均距离堆
      mean_dist_heap = sorted(mean_dist_heap,key=lambda x:x[0])
      mean_dist_heap = mean_dist_heap[:self._top_n_by_mean_distance]

      #统计类名数量
      class_names = [self._pose_samples[sample_idx].class_name for _, sample_idx in mean_dist_heap]
      result = {class_name:class_names.count(class_name) for class_name in set(class_names)}

      return result


In [7]:
#姿态分类结果平滑
class EMADicSmoothing():

  def __init__(self,window_size=10,alpha=0.2):
    self._window_size = window_size
    self._alpha = alpha

    self._data_in_window = []

  def __call__(self,data):
    self._data_in_window.insert(0,data)
    self._data_in_window = self._data_in_window[:self._window_size]

    keys = set([key for data in self._data_in_window for key, _ in data.items()])

    smoothed_data = dict()
    for key in keys:
      factor = 1.0
      top_sum = 0.0
      bottom_sum = 0.0
      for data in self._data_in_window:
        value = data[key] if key in data else 0.0

        top_sum += factor * value
        bottom_sum += factor

        factor *= (1.0 - self._alpha)

      smoothed_data[key] = top_sum / bottom_sum

    return smoothed_data

In [8]:
#动作分类
class ActionClassifier():
  def __init__(self):
    self.total = {'push-up':0,'squat':0,'sit-up':0,'dumbbell-curl':0,'fly':0,'folding-blade-push-ups':0,'leg-raise':0,'jumping-jack':0}
    self.sport_types = ['push-up','squat','sit-up','dumbbell-curl','fly','folding-blade-push-ups','leg-raise','jumping-jack']
    self.flag = False
    self.pose_embedder = FullBodyPoseEmbedder(sport_type='all')
    self.mean_dist_heap = {}
    self.start = 0

  def __call__(self,pose_landmarks):
    if self.flag:
      return self.sport_type
    for i in self.sport_types:
      pose_samples_folder = 'csvs_out/' + i

      pose_classifier = PoseClassifier(
          mean_dist_heap=self.mean_dist_heap,
          pose_samples_folder=pose_samples_folder,
          pose_embedder=self.pose_embedder,
          top_n_by_max_distance=30,
          top_n_by_mean_distance=10)

      pose_classifier(pose_landmarks,1)
      self.mean_dist_heap = pose_classifier.mean_dist_heap

    pose_classification_2 = pose_classifier(pose_landmarks,2)
    if self.start == 1:
      if pose_classification_2 != self.pose_classification:
        self.start = 0
    if self.start == 0:
      self.up = 0
      self.down = 0
      for i in self.sport_types:
        self.total[i] += pose_classification_2.get(i+'_'+'down',0) + pose_classification_2.get(i+'_'+'up',0)
        self.up += pose_classification_2.get(i+'_'+'up',0)
        self.down += pose_classification_2.get(i+'_'+'down',0)
      if self.down > self.up:
        self.flag = True
        n = 0
        for i in self.sport_types:
          if self.total[i] > n:
            self.sport_type = i
            n = self.total[i]
    if self.start == 0:
      self.pose_classification = pose_classification_2
      self.start = 1

    return pose_classification_2







In [9]:
#计数器
class RepetitionCounter():

  def __init__(self,class_name,enter_threshold=6,exit_threshold=4):
    self._class_name = class_name

    self._enter_threshold = enter_threshold
    self._exit_threshold = exit_threshold

    self._pose_entered = False

    self._n_repeats = 0

  def n_repeats(self):
    return self._n_repeats

  def __call__(self,pose_classification):
    pose_confidence = 0.0
    if self._class_name in pose_classification:
      pose_confidence = pose_classification[self._class_name]

    if not self._pose_entered:
      self._pose_entered = pose_confidence > self._enter_threshold
      return self._n_repeats

    if pose_confidence < self._exit_threshold:
      self._n_repeats += 1
      self._pose_entered = False

    return self._n_repeats

In [26]:
#可视化模块
class PoseClassificationVisualizer():

  def __init__(self,
         class_name,     #类别名称
         plot_location_x=0.0,#绘图位置
         plot_location_y=0.0,#绘图位置
         plot_max_width=0.6, #绘图大小
         plot_max_height=0.6,#绘图大小
         plot_figsize=(9,4), #图形大小
         plot_x_max=None,  #x轴最大值
         plot_y_max=None,  #y轴最大值
         counter_location_x=0.85,#计数器在图上位置
         counter_location_y=0.05,#计数器在图上位置
         counter_font_path='https://github.com/googlefonts/roboto/blob/main/src/hinted/Roboto-Regular.ttf?raw=true',#计数器字体路径
         counter_font_color='red',#计数器字体颜色
         counter_font_size=0.15#计数器字体大小
  ):
    self._class_name = class_name
    self._plot_location_x = plot_location_x
    self._plot_location_y = plot_location_y
    self._plot_max_width = plot_max_width
    self._plot_max_height = plot_max_height
    self._plot_figsize = plot_figsize
    self._plot_x_max = plot_x_max
    self._plot_y_max = plot_y_max
    self._counter_location_x = counter_location_x
    self._counter_location_y = counter_location_y
    self._counter_font_path = counter_font_path
    self._counter_font_color = counter_font_color
    self._counter_font_size = counter_font_size

    self._counter_font = None

    self._pose_classification_history = []
    self._pose_classification_filtered_history = []

  def __call__(self,
        frame,           #输入图像帧
        pose_classification,    #当前帧的姿势分类结果
        pose_classification_filtered,#过滤后的姿势分类结果
        repetitions_count#重复次数的计数
  ):
    self._pose_classification_history.append(pose_classification)
    self._pose_classification_filtered_history.append(pose_classification_filtered)

    output_img = Image.fromarray(frame)

    output_width = output_img.size[0]
    output_height = output_img.size[1]

    img = self._plot_classification_history(output_width,output_height)
    img.thumbnail((int(output_width * self._plot_max_width),
                   int(output_height * self._plot_max_width)))
    output_img.paste(img,
            (int(output_width * self._plot_location_x),
            int(output_height * self._plot_location_y)))

    output_img_draw = ImageDraw.Draw(output_img)
    if self._counter_font is None:
      font_size = int(output_height * self._counter_font_size)
      font_request = requests.get(self._counter_font_path,allow_redirects=True)
      self._counter_font = ImageFont.truetype(io.BytesIO(font_request.content),size=font_size)
    output_img_draw.text((output_width * self._counter_location_x,
                          output_height * self._counter_location_y),
                         str(repetitions_count),
                         font=self._counter_font,
                         fill=self._counter_font_color)

    return output_img

  def _plot_classification_history(self,output_width,output_height):
    #创建图表
    fig = plt.figure(figsize=self._plot_figsize)

    for classification_history in [self._pose_classification_history,
                     self._pose_classification_filtered_history]:
      y = []
      for classification in classification_history:
        if classification is None:
          y.append(None)
        elif self._class_name in classification:
          y.append(classification[self._class_name])
        else:
          y.append(0)
      plt.plot(y,linewidth=7)

    plt.grid(axis='y',alpha=0.75)
    plt.xlabel('Frame')
    plt.ylabel('Confidence')
    plt.title('Classification heistory for `{}`'.format(self._class_name))

    if self._plot_y_max is not None:
      plt.ylim(top=self._plot_y_max)
    if self._plot_x_max is not None:
      plt.xlim(right=self._plot_x_max)

    buf = io.BytesIO()
    dpi = min(
        output_width * self._plot_max_width / float(self._plot_figsize[0]),
        output_height * self._plot_max_height / float(self._plot_figsize[1]))
    fig.savefig(buf,dpi=dpi)
    buf.seek(0)
    img = Image.open(buf)
    plt.close()

    return img

In [11]:
#样本写入
class BootstrapHelper():

  def __init__(self,
        images_in_folder,
        images_out_folder,
        csvs_out_folder,
        sport_type):
    self._images_in_folder = images_in_folder
    self._images_out_folder = images_out_folder
    self._csvs_out_folder = csvs_out_folder
    self._sport_type = sport_type

    self._pose_class_names = sorted([n for n in os.listdir(self._images_in_folder)])

  def bootstrap(self,per_pose_class_limit=None):
    if not os.path.exists(self._csvs_out_folder):
      os.makedirs(self._csvs_out_folder)

    for pose_class_name in self._pose_class_names:
      print('Bootstrapping',pose_class_name,file=sys.stderr)

      images_in_folder = os.path.join(self._images_in_folder,pose_class_name)
      images_out_folder = os.path.join(self._images_out_folder,pose_class_name)
      csv_out_path = os.path.join(self._csvs_out_folder,pose_class_name + '.csv')
      if not os.path.exists(images_out_folder):
        os.makedirs(images_out_folder)

      with open(csv_out_path,'w') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file,delimiter=',',quoting=csv.QUOTE_MINIMAL)
        image_names = sorted([n for n in os.listdir(images_in_folder) if not n.startswith('.')])
        if per_pose_class_limit is not None:
          image_names = image_names[:per_pose_class_limit]

        for image_name in tqdm(image_names):
          input_frame = cv2.imread(os.path.join(images_in_folder,image_name))
          input_frame = cv2.cvtColor(input_frame,cv2.COLOR_BGR2RGB)

          with mp_pose.Pose() as pose_tracker:
            result = pose_tracker.process(image=input_frame)
            pose_landmarks = result.pose_landmarks

          output_frame = input_frame.copy()
          if pose_landmarks is not None:
            mp_drawing.draw_landmarks(
                image=output_frame,
                landmark_list=pose_landmarks,
                connections=mp_pose.POSE_CONNECTIONS)
          output_frame = cv2.cvtColor(output_frame,cv2.COLOR_RGB2BGR)
          cv2.imwrite(os.path.join(images_out_folder,self._sport_type + '_' + image_name),output_frame)

          if pose_landmarks is not None:
            frame_height,frame_width = output_frame.shape[0],output_frame.shape[1]
            pose_landmarks = np.array(
                [[lmk.x * frame_width,lmk.y * frame_height,lmk.z * frame_width]
                 for lmk in pose_landmarks.landmark],
                dtype=np.float32)
            assert pose_landmarks.shape == (33,3),'Unexpected landmarks shape:{}'.format(pose_landmarks.shape)
            csv_out_writer.writerow([self._sport_type + '_' + image_name] + pose_landmarks.flatten().astype(np.str_).tolist())

          projection_xz = self._draw_xz_projection(
              output_frame=output_frame,pose_landmarks=pose_landmarks)
          output_frame = np.concatenate((output_frame,projection_xz),axis=1)

  #绘制关键点
  def _draw_xz_projection(self,output_frame,pose_landmarks,r=0.5,color='red'):
    #创建空白图
    frame_height,frame_width = output_frame.shape[0],output_frame.shape[1]
    img = Image.new('RGB',(frame_width,frame_height),color='white')

    if pose_landmarks is None:
      return np.asarray(img)

    r *= frame_width * 0.01

    draw = ImageDraw.Draw(img)
    for idx_1,idx_2 in mp_pose.POSE_CONNECTIONS:
      x1,y1,z1 = pose_landmarks[idx_1] * [1,1,-1] + [0,0,frame_height * 0.5]
      x2,y2,z2 = pose_landmarks[idx_2] * [1,1,-1] + [0,0,frame_height * 0.5]

      draw.ellipse([x1 - r,z1 - r,x1 + r,z1 + r],fill=color)
      draw.ellipse([x2 - r,z2 - r,x2 + r,z2 + r],fill=color)
      draw.line([x1,z1,x2,z2],width=int(r),fill=color)

    return np.asarray(img)

  def align_image_and_csvs(self,print_removed_items=False):

    for pose_class_name in self._pose_class_names:
      images_out_folder = os.path.join(self._images_out_folder,pose_class_name)
      csv_out_path = os.path.join(self._csvs_out_folder,pose_class_name + '.csv')

      rows = []
      with open(csv_out_path) as csv_out_file:
        csv_out_reader = csv.reader(csv_out_file,delimiter=',')
        for row in csv_out_reader:
          rows.append(row)

      image_names_in_csv = []

      with open(csv_out_path,'w') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file,delimiter=',',quoting=csv.QUOTE_MINIMAL)
        for row in rows:
          image_name = row[0]
          image_path = os.path.join(images_out_folder,image_name)
          if os.path.exists(image_path):
            image_names_in_csv.append(image_name)
            csv_out_writer.writerow(row)
          elif print_removed_items:
            print('Removed image from CSV: ',image_path)

      for image_name in os.listdir(images_out_folder):
        if image_name not in image_names_in_csv:
          image_path = os.path.join(images_out_folder,image_name)
          os.remove(image_path)
          if print_removed_items:
            print('Removed image from folder: ',image_path)

  def analyze_outliers(self,outliers):

    for outlier in outliers:
      image_path = os.path.join(self._images_out_folder,outlier.sample.class_name,outlier.sample.name)

      print('Outlier')
      print(' sample path = ',image_path)
      print(' sample class = ',outlier.sample.class_name)
      print(' detected class = ',outlier.detected_class)
      print(' all classes = ',outlier.all_classes)

      img = cv2.imread(image_path)
      img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
      show_image(img,figsize=(20,20))
  def remove_outliers(self,outliers):
    for outlier in outliers:
      image_path = os.path.join(self._images_out_folder,outlier.sample.class_name,outlier.sample.name)
      os.remove(image_path)

  def print_images_in_statistics(self):
    self._print_images_statistics(self._images_in_folder,self._pose_class_names)

  def print_images_out_statistics(self):
    self._print_images_statistics(self._images_out_folder,self._pose_class_names)

  def _print_images_statistics(self,images_folder,pose_class_names):
    print('Number of images per pose class:')
    for pose_class_name in pose_class_names:
      n_images = len([
          n for n in os.listdir(os.path.join(images_folder,pose_class_name))
          if not n.startswith('.')])
      print(' {}:{}'.format(pose_class_name,n_images))





In [None]:
from google.colab import drive
from google.colab import files
drive.mount("/content/drive",force_remount=True)

In [None]:
import os
os.getcwd()

In [None]:
os.listdir('./')

In [None]:
os.chdir('./drive/MyDrive/check/')

In [None]:
os.listdir()

In [None]:
sport_type = 'jumping-jack'
bootstrap_images_in_folder = 'image/' + sport_type

bootstrap_images_out_folder = 'images_out/' + sport_type
bootstrap_csvs_out_folder = 'csvs_out/' + sport_type

In [None]:
bootstrap_helper = BootstrapHelper(
    images_in_folder=bootstrap_images_in_folder,
    images_out_folder=bootstrap_images_out_folder,
    csvs_out_folder=bootstrap_csvs_out_folder,
    sport_type=sport_type
)

In [None]:
bootstrap_helper.print_images_in_statistics()

In [None]:
bootstrap_helper.bootstrap(per_pose_class_limit=None)

In [None]:
bootstrap_helper.print_images_out_statistics()

In [None]:
bootstrap_helper.align_image_and_csvs(print_removed_items=False)
bootstrap_helper.print_images_out_statistics()

In [None]:
pose_embedder = FullBodyPoseEmbedder(sport_type=sport_type)

pose_classifier = PoseClassifier(
    pose_samples_folder=bootstrap_csvs_out_folder,
    pose_embedder=pose_embedder,
    top_n_by_max_distance=30,
    top_n_by_mean_distance=10)

outliers = pose_classifier.find_pose_sample_outliers()
print('Number of outliers: ',len(outliers))

In [None]:
bootstrap_helper.analyze_outliers(outliers)

In [None]:
bootstrap_helper.remove_outliers(outliers)

In [None]:
bootstrap_helper.align_image_and_csvs(print_removed_items=False)
bootstrap_helper.print_images_out_statistics()

In [None]:
def dump_for_the_app():
  pose_samples_folder = 'csvs_out/' + sport_type
  pose_samples_csv_path = sport_type + '_csvs_out_basic.csv'
  file_extension = 'csv'
  file_separator = ','

  file_names = [name for name in os.listdir(pose_samples_folder) if name.endswith(file_extension)]

  with open(pose_samples_csv_path,'w') as csv_out:
    csv_out_writer = csv.writer(csv_out,delimiter=file_separator,quoting=csv.QUOTE_MINIMAL)
    for file_name in file_names:
      class_name = sport_type + '_' + file_name[:-(len(file_extension) + 1)]

      with open(os.path.join(pose_samples_folder,file_name)) as csv_in:
        csv_in_reader = csv.reader(csv_in,delimiter=file_separator)
        for row in csv_in_reader:
          row.insert(1,class_name)
          csv_out_writer.writerow(row)

  files.download(pose_samples_csv_path)

dump_for_the_app()

In [None]:
video_path = '/content/drive/MyDrive/check/video/jumping-jack.mp4'
class_name = 'down'
out_video_path = '/content/drive/MyDrive/check/video/out_jumping-jack.mp4'

video_cap = cv2.VideoCapture(video_path)

video_n_frames = video_cap.get(cv2.CAP_PROP_FRAME_COUNT)
video_fps = video_cap.get(cv2.CAP_PROP_FPS)
video_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
video_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))


pose_samples_folder = 'csvs_out/' + sport_type

pose_tracker = mp_pose.Pose()


pose_embedder = FullBodyPoseEmbedder(sport_type=sport_type)


pose_classifier = PoseClassifier(
    pose_samples_folder=pose_samples_folder,
    pose_embedder=pose_embedder,
    top_n_by_max_distance=30,
    top_n_by_mean_distance=10)

pose_classification_filter = EMADicSmoothing(
    window_size=10,
    alpha=0.2
)

repetition_counter = RepetitionCounter(
    class_name=class_name,
    enter_threshold=6,
    exit_threshold=4)

pose_classification_visualizer = PoseClassificationVisualizer(
    class_name=class_name,
    plot_x_max=video_n_frames,
    plot_y_max=10)

out_video = cv2.VideoWriter(out_video_path,cv2.VideoWriter_fourcc(*'mp4v'),video_fps,(video_width,video_height))

frame_idx = 0
output_frame = None
with tqdm(total=video_n_frames) as pbar:
  while video_cap.isOpened():
    success,input_frame = video_cap.read()
    if not success:
      break

    input_frame = cv2.cvtColor(input_frame,cv2.COLOR_BGR2RGB)
    result = pose_tracker.process(image=input_frame)
    pose_landmarks = result.pose_landmarks

    output_frame = input_frame.copy()
    if pose_landmarks is not None:
      mp_drawing.draw_landmarks(
          image=output_frame,
          landmark_list=pose_landmarks,
          connections=mp_pose.POSE_CONNECTIONS)

    if pose_landmarks is not None:
      frame_height,frame_width = output_frame.shape[0],output_frame.shape[1]
      pose_landmarks = np.array([[lmk.x * frame_width,lmk.y * frame_height,lmk.z * frame_width]
                    for lmk in pose_landmarks.landmark],dtype=np.float32)
      assert pose_landmarks.shape == (33,3),'Unexpected landmarks shape: {}'.format(pose_landmarks.shape)

      pose_classification = pose_classifier(pose_landmarks)
      pose_classification_filtered = pose_classification_filter(pose_classification)
      print(pose_classification_filtered)
      repetitions_count = repetition_counter(pose_classification_filtered)
    else:
      pose_classification = None

      pose_classification_filtered = pose_classification_filter(dict())
      pose_classification_filtered = None

      repetitions_count = repetition_counter._n_repeats

    output_frame = pose_classification_visualizer(
        frame=output_frame,
        pose_classification=pose_classification,
        pose_classification_filtered=pose_classification_filtered,
        repetitions_count=repetitions_count)

    out_video.write(cv2.cvtColor(np.array(output_frame),cv2.COLOR_RGB2BGR))

    if frame_idx % 50 == 0:
      show_image(output_frame)

    frame_idx += 1
    pbar.update(1)

out_video.release()
pose_tracker.close()

if output_frame is not None:
  show_image(output_frame)


In [None]:
#使用PIL生成带有中文的文本图像
def draw_chinese_text(text, pos, img_PIL, font_path, font_size=40, color=(0, 0, 255)):
    font = ImageFont.truetype(font_path, font_size, encoding="utf-8")
    fillColor = color
    position = pos
    draw = ImageDraw.Draw(img_PIL)
    draw.text(position, text, font=font, fill=fillColor)
    return img_PIL

In [None]:
video_path = '/content/drive/MyDrive/check/video/深蹲1.mp4'
class_name = 'down'
out_video_path = '/content/drive/MyDrive/check/video/out_深蹲1.mp4'

In [None]:
video_cap = cv2.VideoCapture(video_path)

video_n_frames = video_cap.get(cv2.CAP_PROP_FRAME_COUNT)
video_fps = video_cap.get(cv2.CAP_PROP_FPS)
video_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
video_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

In [None]:
pose_actionclassifier = ActionClassifier()

pose_tracker = mp_pose.Pose()

pose_classification_filter = EMADicSmoothing(
              window_size=10,
              alpha=0.2
          )

repetition_counter = RepetitionCounter(
              class_name=class_name,
              enter_threshold=6,
              exit_threshold=4)

pose_classification_visualizer = PoseClassificationVisualizer(
              class_name=class_name,
              plot_x_max=video_n_frames,
              plot_y_max=10)


In [None]:
flag = True
out_video = cv2.VideoWriter(out_video_path,cv2.VideoWriter_fourcc(*'mp4v'),video_fps,(video_width,video_height))

frame_idx = 0
output_frame = None
with tqdm(total=video_n_frames) as pbar:
  while video_cap.isOpened():
    success,input_frame = video_cap.read()
    if not success:
      break

    input_frame = cv2.cvtColor(input_frame,cv2.COLOR_BGR2RGB)
    result = pose_tracker.process(image=input_frame)
    pose_landmarks = result.pose_landmarks

    output_frame = input_frame.copy()
    if pose_landmarks is not None:
      mp_drawing.draw_landmarks(
          image=output_frame,
          landmark_list=pose_landmarks,
          connections=mp_pose.POSE_CONNECTIONS)

    if pose_landmarks is not None:
      frame_height,frame_width = output_frame.shape[0],output_frame.shape[1]
      pose_landmarks = np.array([[lmk.x * frame_width,lmk.y * frame_height,lmk.z * frame_width]
                    for lmk in pose_landmarks.landmark],dtype=np.float32)
      assert pose_landmarks.shape == (33,3),'Unexpected landmarks shape: {}'.format(pose_landmarks.shape)

      if pose_actionclassifier.flag:
        if flag:
          sport_type = pose_actionclassifier(pose_landmarks)
          if sport_type == 'sit-up':
            name = '仰卧起坐'
          elif sport_type == 'push-up':
            name = '俯卧撑'
          elif sport_type == 'squat':
            name = '深蹲'
          elif sport_type == 'dumbbell-curl':
            name = '哑铃弯举'
          elif sport_type == 'leg-raise':
            name = '仰卧举腿'
          elif sport_type == 'folding-blade-push-ups':
            name = '折刀俯卧撑'
          elif sport_type == 'fly':
            name = '侧平举'
          elif sport_type == 'jumping-jack':
            name = '开合跳'

          print('------------------------------------------'+sport_type+'----------------------------------------------------------')
          pose_samples_folder = 'csvs_out/' + sport_type

          pose_embedder = FullBodyPoseEmbedder(sport_type=sport_type)

          pose_classifier = PoseClassifier(
              pose_samples_folder=pose_samples_folder,
              pose_embedder=pose_embedder,
              top_n_by_max_distance=30,
              top_n_by_mean_distance=10)

          flag = False

        pose_classification = pose_classifier(pose_landmarks)

        pose_classification_filtered = pose_classification_filter(pose_classification)
        print(pose_classification_filtered)

        repetitions_count = repetition_counter(pose_classification_filtered)
      else:
        pose_classification = pose_actionclassifier(pose_landmarks)
        print(pose_classification)
        pose_classification_filtered = pose_classification_filter(pose_classification)
        repetitions_count = repetition_counter(pose_classification_filtered)
    else:
      pose_classification = None

      pose_classification_filtered = pose_classification_filter(dict())
      pose_classification_filtered = None

      repetitions_count = repetition_counter._n_repeats

    output_frame = pose_classification_visualizer(
        frame=output_frame,
        pose_classification=pose_classification,
        pose_classification_filtered=pose_classification_filtered,
        repetitions_count=repetitions_count)
    output_frame = np.array(output_frame)
    if flag == False:
      # 转换为PIL图像
      img_PIL = Image.fromarray(cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB))

      # 设置文本内容、位置、字体等
      text = name
      pos = (50, 50)
      font_path = 'code/STSONG.TTF'

      # 使用PIL在图像上绘制中文
      img_PIL_with_text = draw_chinese_text(text, pos, img_PIL, font_path)

      # 将PIL图像转回OpenCV格式并显示
      output_frame = cv2.cvtColor(np.asarray(img_PIL_with_text), cv2.COLOR_RGB2BGR)
    out_video.write(cv2.cvtColor(output_frame,cv2.COLOR_RGB2BGR))

    if frame_idx % 50 == 0:
      show_image(output_frame)

    frame_idx += 1
    pbar.update(1)

out_video.release()
pose_tracker.close()

if output_frame is not None:
  show_image(output_frame)


In [None]:
files.download(out_video_path)

# 实时动作检测

In [None]:
#导入模型
pose = mp_pose.Pose(smooth_landmarks=True,       #是否平滑关键点
                    enable_segmentation=True,    #是否人体抠图
                    min_detection_confidence=0.5,#置信度阈值
                    min_tracking_confidence=0.5) #追踪阈值

#处理单帧的函数
def process_frame(img):

    #记录该帧开始处理的时间
    start_time = time.time()

    #获取图像宽高
    h,w = img.shape[0],img.shape[1]

    #BGR转RGB
    img_RGB = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)

    #将RGB图像输入模型,获取预测结果
    results = pose.process(img_RGB)

    #若检测出人体关键点
    if results.pose_landmarks:

        #可视化关键点及骨架连线
        mp_drawing.draw_landmarks(img,results.pose_landmarks,mp_pose.POSE_CONNECTIONS)

        #遍历所有33个关键点,可视化
        for i in range(33):

            #获取该关键点的三维坐标
            cx = int(results.pose_landmarks.landmark[i].x * w)
            cy = int(results.pose_landmarks.landmark[i].y * h)
            cz = results.pose_landmarks.landmark[i].z

            radius = 10

            #鼻尖
            if i == 0:
                img = cv2.circle(img,(cx,cy),radius,(0,0,255),-1)
            
            #肩膀
            elif i in [11,12]:
                img = cv2.circle(img,(cx,cy),radius,(223,155,6),-1)
            #髋关节
            elif i in [23,24]:
                img = cv2.circle(img,(cx,cy),radius,(1,240,255),-1)
            #胳臂肘
            elif i in [13,14]:
                img = cv2.circle(img,(cx,cy),radius,(140,47,240),-1)
            #膝盖
            elif i in [25,26]:
                img = cv2.circle(img,(cx,cy),radius,(0,0,255),-1)
            #手腕和脚婉
            elif i in [15,16,27,28]:
                img = cv2.circle(img,(cx,cy),radius,(223,155,60),-1)
            #左手
            elif i in [17,19,21]:
                img = cv2.circle(img,(cx,cy),radius,(94,218,121),-1)
            #右手
            elif i in [18,20,22]:
                img = cv2.circle(img,(cx,cy),radius,(16,144,247),-1)
            #左脚
            elif i in [27,29,31]:
                img = cv2.circle(img,(cx,cy),radius,(29,123,243),-1)
            #右脚
            elif i in [28,30,32]:
                img = cv2.circle(img,(cx,cy),radius,(193,182,255),-1)
            #嘴
            elif i in [9,10]:
                img = cv2.circle(img,(cx,cy),radius,(205,235,255),-1)
            #眼及脸颊
            elif i in [1,2,3,4,5,6,7,8]:
                img = cv2.circle(img,(cx,cy),radius,(94,218,121),-1)
            #其他关键点
            else:
                img = cv2.circle(img,(cx,cy),radius,(0,255,0),-1)
    else:
        scaler = 1
        failure_str = 'No Person'
        img = cv2.putText(img,failure_str,(25*scaler,100*scaler),cv2.FONT_HERSHEY_SIMPLEX,1.25*scaler,(255,0,0),3*scaler)
    
    #记录该帧处理完毕的时间
    end_time = time.time()

    #计算每秒处理图像帧数FPS
    FPS = 1/(end_time - start_time)

    scaler = 1

    #在图像上写FPS数值,参数依次为:图片,添加的文字,左上角坐标,字体,字体大小,颜色,字体粗细
    img = cv2.putText(img,'FPS '+str(int(FPS)),(25*scaler,50*scaler),cv2.FONT_HERSHEY_SIMPLEX,1.25*scaler,(255,0,0),3*scaler)
    return img

#获取摄像头,传入0表示获取系统默认摄像头
cap = cv2.VideoCapture(0)

#打开cap
cap.open(0)

video_n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)

sport_type = "squat"
class_name = 'down'

pose_samples_folder = 'C:/Users/xhy/Documents/python/check/csvs_out/squat'

pose_tracker = mp_pose.Pose()


pose_embedder = FullBodyPoseEmbedder(sport_type=sport_type)


pose_classifier = PoseClassifier(
    pose_samples_folder=pose_samples_folder,
    pose_embedder=pose_embedder,
    top_n_by_max_distance=30,
    top_n_by_mean_distance=10)

pose_classification_filter = EMADicSmoothing(
    window_size=10,
    alpha=0.2
)

repetition_counter = RepetitionCounter(
    class_name=class_name,
    enter_threshold=6,
    exit_threshold=4)


output_frame = None
    




for i in range(3, 0, -1):  
    print(f"倒计时: {i} 秒")  
    time.sleep(1)  
print("开始吧")  

#无限循环,直到break被触发
while cap.isOpened():
    #获取画面
    success,frame = cap.read()
    if not success:
        print('Error')
        break
    
    input_frame = frame
    result = pose_tracker.process(image=input_frame)
    pose_landmarks = result.pose_landmarks

    output_frame = input_frame.copy()
    if pose_landmarks is not None:
      mp_drawing.draw_landmarks(
          image=output_frame,
          landmark_list=pose_landmarks,
          connections=mp_pose.POSE_CONNECTIONS)

    if pose_landmarks is not None:
      frame_height,frame_width = output_frame.shape[0],output_frame.shape[1]
      pose_landmarks = np.array([[lmk.x * frame_width,lmk.y * frame_height,lmk.z * frame_width]
                    for lmk in pose_landmarks.landmark],dtype=np.float32)
      assert pose_landmarks.shape == (33,3),'Unexpected landmarks shape: {}'.format(pose_landmarks.shape)

      pose_classification = pose_classifier(pose_landmarks)
      pose_classification_filtered = pose_classification_filter(pose_classification)
      print(pose_classification_filtered)
      repetitions_count = repetition_counter(pose_classification_filtered)
    else:
      pose_classification = None

      pose_classification_filtered = pose_classification_filter(dict())
      pose_classification_filtered = None

      repetitions_count = repetition_counter._n_repeats


    frame = np.array(output_frame) 

    frame = cv2.putText(frame,str(repetitions_count),(25,50),cv2.FONT_HERSHEY_SIMPLEX,1.25,(255,0,0),3)

    #展示处理后的三通道图片
    cv2.imshow('my_window',frame)

    #按键盘上的q或esc退出(在英文输入法下)
    if cv2.waitKey(1) in [ord('q'),27]:
        break

#关闭摄像头
cap.release()

#关闭图像窗口
cv2.destroyAllWindows()

# 调用星火大模型API

In [1]:
# coding: utf-8
import _thread as thread
import os
import time
import base64

import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import websocket
import openpyxl
from concurrent.futures import ThreadPoolExecutor, as_completed
import os


class Ws_Param(object):
    # 初始化
    def __init__(self, APPID, APIKey, APISecret, gpt_url):
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.host = urlparse(gpt_url).netloc
        self.path = urlparse(gpt_url).path
        self.gpt_url = gpt_url

    # 生成url
    def create_url(self):
        # 生成RFC1123格式的时间戳
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # 拼接字符串
        signature_origin = "host: " + self.host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + self.path + " HTTP/1.1"

        # 进行hmac-sha256进行加密
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()

        signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'

        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # 将请求的鉴权参数组合为字典
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.host
        }
        # 拼接鉴权参数，生成url
        url = self.gpt_url + '?' + urlencode(v)
        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释，比对相同参数时生成的url与自己代码生成的url是否一致
        return url


# 收到websocket错误的处理
def on_error(ws, error):
    print("### error:", error)


# 收到websocket关闭的处理
def on_close(ws):
    print("### closed ###")


# 收到websocket连接建立的处理
def on_open(ws):
    thread.start_new_thread(run, (ws,))


def run(ws, *args):
    data = json.dumps(gen_params(appid=ws.appid, query=ws.query, domain=ws.domain))
    ws.send(data)


# 收到websocket消息的处理
def on_message(ws, message):
    # print(message)
    data = json.loads(message)
    code = data['header']['code']
    if code != 0:
        print(f'请求错误: {code}, {data}')
        ws.close()
    else:
        choices = data["payload"]["choices"]
        status = choices["status"]
        content = choices["text"][0]["content"]
        print(content,end='')
        if status == 2:
            print("#### 关闭会话")
            ws.close()


def gen_params(appid, query, domain):
    """
    通过appid和用户的提问来生成请参数
    """

    data = {
        "header": {
            "app_id": appid,
            "uid": "1234",           
            # "patch_id": []    #接入微调模型，对应服务发布后的resourceid          
        },
        "parameter": {
            "chat": {
                "domain": domain,
                "temperature": 0.5,
                "max_tokens": 4096,
                "auditing": "default",
            }
        },
        "payload": {
            "message": {
                "text": [
                    {"role":"system","content":"你现在扮演的角色为运动健身教练。"},
                    {"role": "user", "content": query}
                    ]
            }
        }
    }
    return data


def main(appid, api_secret, api_key, Spark_url, domain, query):
    wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()

    ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
    ws.appid = appid
    ws.query = query
    ws.domain = domain
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})


if __name__ == "__main__":
    main(
        appid="c7fb7b72",
        api_secret="MTlhOTRmNDM2MDViYjFiZmIwYmM4NTk2",
        api_key="7427defc0eb1d9c0cd24e74479f512c3",
		Spark_url = "wss://spark-api.xf-yun.com/v4.0/chat",
		domain = "4.0Ultra",
        query=""
    )


作为一个认知智能模型，我无法直接处理视频文件。但是，我可以提供一些关于如何编辑和处理视频的建议：

1. 使用视频编辑软件：有许多视频编辑软件可以帮助您剪辑、合并、添加特效等。例如，Adobe Premiere Pro、Final Cut Pro、iMovie等。

2. 学习视频编辑技巧：观看在线教程或参加课程，学习如何使用视频编辑软件的基本功能和高级技巧。这将帮助您更好地处理和编辑视频。

3. 了解视频格式和编码：了解不同的视频格式和编码方式，以便在编辑过程中做出正确的选择。这将有助于确保您的视频在不同设备和平台上都能正常播放。

4. 优化视频质量：在编辑过程中，注意调整视频的亮度、对比度、饱和度等参数，以提高视频质量。此外，还可以使用稳定器等工具减少抖动，使视频更加稳定。

5. 添加音频和字幕：为视频添加背景音乐、音效和字幕，以增强观感和传达信息。确保音频和字幕与视频内容同步，以便观众更好地理解。

6. 导出和分享：在完成编辑后，将视频导出为合适的格式和分辨率。然后，您可以将视频分享到社交媒体、视频网站或其他平台，让更多人欣赏您的作品。#### 关闭会话
### error: on_close() takes 1 positional argument but 3 were given
