# 动作捕捉并储存

In [None]:
"""
基于阈值的方法：
你可以定义一些阈值条件，例如当手部关键点的移动速度低于某个阈值时，认为动作已经结束。
这种方法的优点是可以更准确地划分动作，但缺点是可能会受到噪声和其他干扰的影响。
"""

import cv2
import mediapipe as mp
import pyrealsense2 as rs
import pandas as pd
import numpy as np
import os
import time
import warnings
from datetime import datetime

warnings.filterwarnings('ignore')

def get_current_time():
    return datetime.now().strftime('%Y%m%d_%H%M%S')

def is_start_gesture(hand_landmarks):
    # 拇指尖的坐标
    thumb_tip = [hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].x,
                 hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].y,
                 hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].z]

    # 食指尖的坐标
    index_finger_tip = [hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP].x,
                        hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP].y,
                        hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP].z]

    # 计算拇指尖和食指尖之间的欧氏距离
    distance = np.sqrt(np.sum(np.square(np.subtract(thumb_tip, index_finger_tip))))

    # 如果距离小于一定的阈值，认为是"开始"手势
    return distance < 0.015

def is_end_gesture(hand_landmarks):
    # 拇指尖的坐标
    thumb_tip = [hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].x,
                 hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].y,
                 hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP].z]

    # 小指尖的坐标
    pinky_finger_tip = [hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP].x,
                        hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP].y,
                        hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP].z]

    # 计算拇指尖和小指尖之间的欧氏距离
    distance = np.sqrt(np.sum(np.square(np.subtract(thumb_tip, pinky_finger_tip))))

    # 如果距离小于一定的阈值，认为是"结束"手势
    return distance < 0.015

# 初始化 MediaPipe 和 RealSense
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
pipeline.start(config)

# 创建文件夹
if not os.path.exists('vid'):
    os.makedirs('vid')
if not os.path.exists('excel'):
    os.makedirs('excel')

# 初始化视频文件和 DataFrame
out = None
df_list = []
recording = False
start_detected = False


# 捕获和处理视频帧
with mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands:
    while True:
        frames = pipeline.wait_for_frames()
        color_frame = frames.get_color_frame()
        if not color_frame:
            continue
        image = np.asanyarray(color_frame.get_data())

        # 将图像转为RGB色彩空间，进行姿势估计
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = hands.process(image)

        # 将图像转回BGR色彩空间，进行绘制
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS)

                # 判断是否是开始手势
                if is_start_gesture(hand_landmarks):
                    if not recording and not start_detected:
                        print('Start gesture detected.')
                        out = cv2.VideoWriter('vid/{}.avi'.format(get_current_time()), cv2.VideoWriter_fourcc(*'XVID'), 20.0, (640, 480))
                        df_list = []
                        recording = True
                        start_detected = True

                # 判断是否是结束手势
                elif is_end_gesture(hand_landmarks):
                    if recording and start_detected:
                        print('End gesture detected.')
                        out.release()
                        df = pd.DataFrame(df_list)
                        df.to_csv('excel/{}.csv'.format(get_current_time()), index=False)
                        recording = False
                        start_detected = False

                # 提取关键点值并保存
                if recording:
                    hand_data = []
                    for lm in hand_landmarks.landmark:
                        hand_data.append([lm.x, lm.y, lm.z])
                    df_list.append(hand_data)

        # 将帧写入视频文件
        if recording:
            out.write(image)

        # 显示图像
        cv2.imshow('MediaPipe Hands', image)

        # 如果按下ESC键，退出循环
        if cv2.waitKey(5) & 0xFF == 27:
            break

# 如果在退出循环时仍在录制，保存当前的视频和 DataFrame
if recording:
    # 如果录制超过0.5秒，才保存
    if len(df_list) > 15: # 假设帧率为30fps，那么0.5秒会有15帧
        out.release()
        df = pd.DataFrame(df_list)
        df.to_csv('excel/{}.csv'.format(get_current_time()), index=False)

# 释放资源
pipeline.stop()
cv2.destroyAllWindows()


# 分类信息读取

In [None]:
import os
import subprocess

print("检查 'vid' 文件夹内容：")
print(os.listdir('vid'))

def rename_file(original_name, append_string):
    # 定义视频和csv文件的路径
    video_path = os.path.join('vid', original_name)
    csv_path = os.path.join('excel', original_name.split('.')[0] + ".csv")

    # 修改视频名
    if os.path.exists(video_path):
        video_new_name = original_name.split('.')[0] + append_string + "." + original_name.split('.')[1]
        video_new_path = os.path.join('vid', video_new_name)
        os.rename(video_path, video_new_path)

    # 修改csv文件名
    if os.path.exists(csv_path):
        csv_new_name = original_name.split('.')[0] + append_string + ".csv"
        csv_new_path = os.path.join('excel', csv_new_name)
        os.rename(csv_path, csv_new_path)

def delete_file(file_name):
    video_path = os.path.join('vid', file_name)
    csv_path = os.path.join('excel', file_name.split('.')[0] + ".csv")

    if os.path.exists(video_path):
        os.remove(video_path)
        print(f"视频 '{file_name}' 已删除.")

    if os.path.exists(csv_path):
        os.remove(csv_path)
        print(f"CSV 文件 '{file_name.split('.')[0]}.csv' 已删除.")

def play_video_with_default_player(video_path):
    if os.name == 'nt':  # for Windows
        os.startfile(video_path)
    else:
        opener = "open" if sys.platform == "darwin" else "xdg-open"  # for macOS and Linux
        subprocess.call([opener, video_path])

def main():
    print("开始执行 main 函数...")
    for video_name in os.listdir('vid'):
        if "_edited" not in video_name:  # 更改筛选条件
            video_path = os.path.join('vid', video_name)
            play_video_with_default_player(video_path)

            print(f"视频 {video_name} 预览完毕!")
            option = input("请选择操作：\n1. 重命名文件\n2. 删除文件\n输入选项（1或2）：")

            if option == '1':
                append_string = "_edited"  # 添加特定后缀表示文件已编辑
                rename_file(video_name, append_string)
            elif option == '2':
                delete_file(video_name)
            else:
                print("无效的选项!")
    print("结束 main 函数执行。")

if __name__ == "__main__":
    main()


In [None]:
import os
def sync_excel_to_vid():
    # 获取vid和excel目录下的所有文件
    vid_files = os.listdir('vid')
    excel_files = os.listdir('excel')
    
    for excel_file in excel_files:
        # 去掉扩展名的文件名
        base_name = os.path.splitext(excel_file)[0]
        
        # 检查是否有相对应的视频文件
        matched_video_files = [v for v in vid_files if base_name in v]
        
        # 如果找到了匹配的视频文件
        if matched_video_files:
            # 确保只有一个匹配的视频文件
            if len(matched_video_files) == 1:
                matched_video_file = matched_video_files[0]
                new_excel_name = os.path.splitext(matched_video_file)[0] + '.csv'
                
                # 重命名excel文件
                os.rename(os.path.join('excel', excel_file), os.path.join('excel', new_excel_name))
            else:
                print(f"在'vid'目录中找到了多个与 '{base_name}' 匹配的文件，无法确定要使用哪一个。")
        # 如果在vid文件夹中找不到匹配的文件
        else:
            os.remove(os.path.join('excel', excel_file))
            print(f"文件 '{excel_file}' 已从 'excel' 文件夹中删除。")
sync_excel_to_vid()

# 识别模型搭建

## 数据预处理

In [23]:
import os
import ast
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'


# ---------------------------------- 数据预处理 ----------------------------------

# functions for data preprocessing
def get_label_from_filename(filename):
    if 'drinkWater' in filename:
        return 'drinkWater'
    elif 'reachOut' in filename:
        return 'reachOut'
    elif 'getPhone' in filename:
        return 'getPhone'
    else:
        return None

def parse_entry(entry):
    return np.array(ast.literal_eval(entry))

# 将字符串转换为np.array并计算差分
def compute_difference(df):
    df = df[0].apply(lambda x: np.array(ast.literal_eval(x)))
    diff = df.diff().dropna()
    return diff

# 基于差分结果，计算每一行的模
def compute_magnitude(diff):
    magnitude = diff.apply(lambda x: np.linalg.norm(x))
    return magnitude

# 根据平均序列长度设置一个阈值，并标记显著性差异
def significant_changes(magnitude, average_length):
    threshold = 1 / average_length
    significant = magnitude > threshold
    return significant

# 根据显著的差异来选择关键帧
def get_keyframes(df, target_length):
    diff = compute_difference(df)
    magnitude = compute_magnitude(diff)
    significant = significant_changes(magnitude, target_length)
    
    return df.iloc[significant.nlargest(target_length).index]

# 加载**训练**数据并进行预处理
def preprocess_data(directory):
    # Preprocess the data
    file_paths = [os.path.join(directory, file) for file in os.listdir(directory) if file.endswith('.csv')]

    # Read the data and get each file's length
    lengths = []
    for file in file_paths:
        df = pd.read_csv(file, header=None)
        lengths.append(len(df))
    average_length = int(np.mean(lengths))

    processed_data = []
    processed_labels = []

    for file in file_paths:
        df = pd.read_csv(file, header=None)
        current_length = len(df)

        if current_length > average_length:
            df = get_keyframes(df, average_length)  # Assuming get_keyframes is a function you've defined elsewhere
        processed_data.append(df)
        
        # Get labels
        label = os.path.basename(file).split('.')[0]
        label = label.split("_")[-1]
        processed_labels.append(label)


    for idx, df in enumerate(processed_data):
        # Convert df to a nested list
        df_list = df.values.tolist()
        new_df_list = []  # Will contain the filtered rows

        for j in range(len(df_list)):
            delete_row = False  # flag to decide whether to delete the row or not

            for k in range(len(df_list[j])):
                value = df_list[j][k]
                if isinstance(value, str) and value.isdigit():  # if it's a string representation of an integer
                    delete_row = True
                    break  # exit the inner loop early
                else:
                    df_list[j][k] = ast.literal_eval(value)  # conversion as before

            if not delete_row:  # if the flag is still False, keep the row
                new_df_list.append(df_list[j])

        # Pad the data which is shorter than the average length
        while len(new_df_list) < average_length:
            new_df_list.append(new_df_list[-1])

        processed_data[idx] = new_df_list

    return processed_data, processed_labels, average_length

# 加载**测试**数据并进行预处理
def preprocess_data_test(directory, keyframe):
    # Preprocess the data
    file_paths = [os.path.join(directory, file) for file in os.listdir(directory) if file.endswith('.csv')]

    # Read the data and get each file's length
    for file in file_paths:
        df = pd.read_csv(file, header=None)

    processed_data = []
    processed_labels = []

    for file in file_paths:
        df = pd.read_csv(file, header=None)
        current_length = len(df)

        if current_length > keyframe:
            df = get_keyframes(df, keyframe)  # Assuming get_keyframes is a function you've defined elsewhere
        processed_data.append(df)
        
        # Get labels
        label = os.path.basename(file).split('.')[0]
        label = label.split("_")[-1]
        processed_labels.append(label)


    for idx, df in enumerate(processed_data):
        # Convert df to a nested list
        df_list = df.values.tolist()
        new_df_list = []  # Will contain the filtered rows

        for j in range(len(df_list)):
            delete_row = False  # flag to decide whether to delete the row or not

            for k in range(len(df_list[j])):
                value = df_list[j][k]
                if isinstance(value, str) and value.isdigit():  # if it's a string representation of an integer
                    delete_row = True
                    break  # exit the inner loop early
                else:
                    df_list[j][k] = ast.literal_eval(value)  # conversion as before

            if not delete_row:  # if the flag is still False, keep the row
                new_df_list.append(df_list[j])

        # Pad the data which is shorter than the average length
        while len(new_df_list) < keyframe:
            new_df_list.append(new_df_list[-1])

        processed_data[idx] = new_df_list


    return processed_data, processed_labels

# ---------------------------------- 数据增强 ----------------------------------

# 加噪声
def add_noise(points, sigma=0.01):
    points_np = np.array(points)
    noise = np.random.normal(0, sigma, points_np.shape)
    return points_np + noise

# 放大缩小
def scale(points, scale_factor=None):
    points_np = np.array(points)
    if scale_factor is None:
        scale_factor = np.random.uniform(0.9, 1.1)
    return points_np * scale_factor

# 旋转
def rotate(points, degree_range=10):
    points_np = np.array(points)
    
    if points_np.shape[-1] != 3:  # 只对三维数据执行旋转操作
        return points_np
    
    angle_x = np.radians(np.random.uniform(-degree_range, degree_range))
    angle_y = np.radians(np.random.uniform(-degree_range, degree_range))
    angle_z = np.radians(np.random.uniform(-degree_range, degree_range))
    
    rotation_matrix_x = np.array([
        [1, 0, 0],
        [0, np.cos(angle_x), -np.sin(angle_x)],
        [0, np.sin(angle_x), np.cos(angle_x)]
    ])
    
    rotation_matrix_y = np.array([
        [np.cos(angle_y), 0, np.sin(angle_y)],
        [0, 1, 0],
        [-np.sin(angle_y), 0, np.cos(angle_y)]
    ])
    
    rotation_matrix_z = np.array([
        [np.cos(angle_z), -np.sin(angle_z), 0],
        [np.sin(angle_z), np.cos(angle_z), 0],
        [0, 0, 1]
    ])
    
    rotation_matrix = np.dot(rotation_matrix_z, np.dot(rotation_matrix_y, rotation_matrix_x))
    return np.dot(points_np, rotation_matrix.T)

# 移动
def translate(points, max_translation=0.1):
    points_np = np.array(points)
    
    if points_np.shape[-1] != 3:  # 对非三维数据返回原始数据
        return points_np
    
    dx, dy, dz = np.random.uniform(-max_translation, max_translation, 3)
    return points_np + np.array([dx, dy, dz])

# 增强某个动作
def augment_single_action(action, times=5):
    """
    对单一动作数据进行多次增强。
    
    参数:
    - action: 原始的动作数据
    - times: 增强的次数
    
    返回值:
    - 一个增强后的动作数据列表
    """
    augmented_actions = [action]  # 包括原始数据
    
    for _ in range(times):
        augmented_action = []
        for keyframe in action:
            keyframe = add_noise(keyframe)
            keyframe = scale(keyframe)
            keyframe = rotate(keyframe)
            keyframe = translate(keyframe)
            augmented_action.append(keyframe)
        augmented_actions.append(augmented_action)
    
    return augmented_actions

# 增强数据集
def augment_data_and_labels(data, labels, times=5):
    """
    对整个数据集和标签进行多次增强。
    
    参数:
    - data: 原始的动作数据列表
    - labels: 对应的标签列表
    - times: 每个动作增强的次数
    
    返回值:
    - 增强后的数据和标签列表
    """
    augmented_data = []
    augmented_labels = []

    for action, label in zip(data, labels):
        new_actions = augment_single_action(action, times)
        augmented_data.extend(new_actions)
        augmented_labels.extend([label] * len(new_actions))

    return augmented_data, augmented_labels


# ---------------------------- 模型定义及训练工具函数 ----------------------------

# 定义LSTM模型：LSTM架构的浅层RNN
class ActionClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super(ActionClassifier, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])
        return out

# 检测输入数据的结构是否符合预设
def check_shape(data, desired_shape):
    # 每个动作的关键帧长度是否相同
    for i, action in enumerate(data):
        if len(action) != desired_shape[0]:
            print(f"Action at index {i} has {len(action)} keyframes instead of {desired_shape[0]}.")

        # 每个关键帧的关键点数量是否相同
        for j, keyframe in enumerate(action):
            if len(keyframe) != desired_shape[1]:
                print(f"Keyframe {j} in action at index {i} has {len(keyframe)} keypoints instead of {desired_shape[1]}.")

            # 每个关键点的数据输入是否是三维的
            for k, keypoint in enumerate(keyframe):
                        try:
                            if len(keypoint) != desired_shape[2]:
                                print(f"Keypoint {k} in keyframe {j} of action at index {i} has a shape of {len(keypoint)} instead of {desired_shape[2]}.")
                        except:
                            print(f"Keypoint {k} in keyframe {j} of action at index {i} is {keyframe} instead of list of length {desired_shape[2]}.")


# ----------------------------- 模型训练与评估函数 ------------------------------

# 检测错误分类的结果
def get_wrongly_classified_info(outputs, labels):
    _, predicted = torch.max(outputs.data, 1)
    wrong_indices = (predicted != labels).nonzero(as_tuple=True)[0]
    wrong_predictions = predicted[wrong_indices]
    return wrong_indices.tolist(), wrong_predictions.tolist()

# 分割训练集和测试集
def split_data_for_training(data, labels, test_size=0.1):
    train_data, val_data, train_labels, val_labels = train_test_split(data, labels, test_size=test_size)
    return train_data, val_data, train_labels, val_labels

def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs, device, early_stop_patience=10):
    best_val_loss = float('inf')
    patience_counter = 0
    best_model = None

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)

        train_loss /= len(train_loader.dataset)

        # Validation phase
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)

        val_loss /= len(val_loader.dataset)

        print(f"Epoch {epoch}/{num_epochs - 1}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

        # Check for early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter == early_stop_patience:
                print("Early stopping triggered.")
                break

    # Load best model weights
    model.load_state_dict(best_model)
    return model



# body part code
# 1. data preprocessing
train_directory = "excel"
data, labels, keyframe = preprocess_data(train_directory)

# data augmentation
augmented_data, augmented_labels = augment_data_and_labels(data, labels, times=5)

# ckeck shape of input training
desired_shape = (keyframe, 21, 3)
check_shape(data,desired_shape)

# 数据处理：转换为 [batch, seq_len, input_size] 的格式
data = [[[coord for keypoint in frame for coord in keypoint] for frame in action] for action in data]

# 创建label到整数的映射
label_to_int = {label: idx for idx, label in enumerate(set(labels))}
int_to_label = {idx: label for label, idx in label_to_int.items()}

# 打印编码情况
print(label_to_int)

# 将字符串标签编码为整数
encoded_labels = [label_to_int[label] for label in labels]

# 将嵌套的列表结构转换为torch tensor
data_tensor = torch.tensor(data, dtype=torch.float32)
labels_tensor = torch.tensor(encoded_labels, dtype=torch.long)

dataset = TensorDataset(data_tensor, labels_tensor)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)


# 2. 定义LSTM模型
input_dim = 63  # 展平后的关键点维度
hidden_dim = 128
output_dim = len(label_to_int)
num_layers = 2

model = ActionClassifier(input_dim, hidden_dim, output_dim, num_layers)


# 3. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# 4. 训练模型，并记录训练误差, 同时也记录错分类的数据索引和预测值
num_epochs = 10
train_errors = []

wrongly_classified_train_info = []

for epoch in range(num_epochs):
    epoch_error = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_error += loss.item()

        # Collect wrongly classified information
        wrong_indices, wrong_predictions = get_wrongly_classified_info(outputs, labels)
        wrongly_classified_train_info.extend(zip(wrong_indices, wrong_predictions))

        if (i + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

    epoch_error /= len(train_loader)
    train_errors.append(epoch_error)
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_error:.4f}')

# 打印错分类的训练数据信息
print("Error Information for Training samples:")
for idx, prediction in wrongly_classified_train_info:
    print(f"Index: {idx}, Original Label is: {labels[idx]} Predicted Label: {int_to_label[prediction]}")

with open('train_errors.txt', 'w') as f:
    for error in train_errors:
        f.write(f"{error}\n")
print("\n\n")


# 5. 测试函数
def evaluate_model(model, test_loader):
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return 100 * correct / total


# 6. 为新的CSV文件测试模型
test_directory = 'data_test'
test_data, test_labels = preprocess_data_test(test_directory,keyframe)
encoded_test_labels = [label_to_int[label] for label in test_labels]
test_data = [[[coord for keypoint in frame for coord in keypoint] for frame in action] for action in test_data]

test_data_tensor = torch.tensor(test_data, dtype=torch.float32)
test_labels_tensor = torch.tensor(encoded_test_labels, dtype=torch.long)

test_dataset = TensorDataset(test_data_tensor, test_labels_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

accuracy = evaluate_model(model, test_loader)

# Collect wrongly classified test data information
wrongly_classified_test_info = []
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_loader):
        outputs = model(inputs)
        wrong_indices, wrong_predictions = get_wrongly_classified_info(outputs, labels)
        wrongly_classified_test_info.extend(zip(wrong_indices, wrong_predictions))

# 打印错分类的测试数据信息
print("Error Information for Testing Samples:")
for idx, prediction in wrongly_classified_train_info:
    print(f"Index: {idx}, Original Label is: {test_labels[idx]} Predicted Label: {int_to_label[prediction]}")

for idx, prediction in wrongly_classified_test_info:
    print(f"Index: {idx}, Predicted Label: {int_to_label[prediction]}")

print(f'Accuracy on the test data: {accuracy:.2f}%')

{'drinkWater': 0, 'reachOut': 1, 'getPhone': 2}
Epoch [1/10], Average Loss: 1.0916
Epoch [2/10], Average Loss: 1.0828
Epoch [3/10], Average Loss: 1.0670
Epoch [4/10], Average Loss: 1.0537
Epoch [5/10], Average Loss: 1.0435
Epoch [6/10], Average Loss: 1.0291
Epoch [7/10], Average Loss: 1.0289
Epoch [8/10], Average Loss: 1.0132
Epoch [9/10], Average Loss: 1.0162
Epoch [10/10], Average Loss: 1.0126
Index: 1, Predicted Label: getPhone
Index: 3, Predicted Label: getPhone
Index: 4, Predicted Label: getPhone
Index: 5, Predicted Label: getPhone
Index: 6, Predicted Label: getPhone
Index: 7, Predicted Label: getPhone
Index: 8, Predicted Label: getPhone
Index: 10, Predicted Label: getPhone
Index: 11, Predicted Label: getPhone
Index: 12, Predicted Label: getPhone
Index: 13, Predicted Label: getPhone
Index: 0, Predicted Label: reachOut
Index: 1, Predicted Label: getPhone
Index: 2, Predicted Label: reachOut
Index: 4, Predicted Label: reachOut
Index: 5, Predicted Label: getPhone
Index: 6, Predicted L

* $data$: dataset of actions
* $data[i]$: $i^{th}$ action
* $data[i][j]$: $j^{th}$ keyframe of $i^{th}$ action
* $data[i][j][k]$: the $k^{th}$ key point's information for the $j^{th}$ keyframe of $i^{th}$ action

In [22]:
import pandas as pd
import numpy as np
import os
import ast
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 定义LSTM模型
class ActionClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super(ActionClassifier, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.5)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])
        return out

# functions for data preprocessing
def get_label_from_filename(filename):
    if 'drinkWater' in filename:
        return 'drinkWater'
    elif 'reachOut' in filename:
        return 'reachOut'
    elif 'getPhone' in filename:
        return 'getPhone'
    else:
        return None

def parse_entry(entry):
    return np.array(ast.literal_eval(entry))

# 将字符串转换为np.array并计算差分
def compute_difference(df):
    df = df[0].apply(lambda x: np.array(ast.literal_eval(x)))
    diff = df.diff().dropna()
    return diff

# 基于差分结果，计算每一行的模
def compute_magnitude(diff):
    magnitude = diff.apply(lambda x: np.linalg.norm(x))
    return magnitude

# 根据平均序列长度设置一个阈值，并标记显著性差异
def significant_changes(magnitude, average_length):
    threshold = 1 / average_length
    significant = magnitude > threshold
    return significant

# 根据显著的差异来选择关键帧
def get_keyframes(df, target_length):
    diff = compute_difference(df)
    magnitude = compute_magnitude(diff)
    significant = significant_changes(magnitude, target_length)
    
    return df.iloc[significant.nlargest(target_length).index]

def preprocess_data(directory):
    # Preprocess the data
    file_paths = [os.path.join(directory, file) for file in os.listdir(directory) if file.endswith('.csv')]

    # Read the data and get each file's length
    lengths = []
    for file in file_paths:
        df = pd.read_csv(file, header=None)
        lengths.append(len(df))
    average_length = int(np.mean(lengths))

    processed_data = []
    processed_labels = []

    for file in file_paths:
        df = pd.read_csv(file, header=None)
        current_length = len(df)

        if current_length > average_length:
            df = get_keyframes(df, average_length)  # Assuming get_keyframes is a function you've defined elsewhere
        processed_data.append(df)
        
        # Get labels
        label = os.path.basename(file).split('.')[0]
        label = label.split("_")[-1]
        processed_labels.append(label)


    for idx, df in enumerate(processed_data):
        # Convert df to a nested list
        df_list = df.values.tolist()
        new_df_list = []  # Will contain the filtered rows

        for j in range(len(df_list)):
            delete_row = False  # flag to decide whether to delete the row or not

            for k in range(len(df_list[j])):
                value = df_list[j][k]
                if isinstance(value, str) and value.isdigit():  # if it's a string representation of an integer
                    delete_row = True
                    break  # exit the inner loop early
                else:
                    df_list[j][k] = ast.literal_eval(value)  # conversion as before

            if not delete_row:  # if the flag is still False, keep the row
                new_df_list.append(df_list[j])

        # Pad the data which is shorter than the average length
        while len(new_df_list) < average_length:
            new_df_list.append(new_df_list[-1])

        processed_data[idx] = new_df_list


    return processed_data, processed_labels, average_length


# functions for data augmentation
def add_noise(points, sigma=0.01):
    points_np = np.array(points)
    noise = np.random.normal(0, sigma, points_np.shape)
    return points_np + noise

def scale(points, scale_factor=None):
    points_np = np.array(points)
    if scale_factor is None:
        scale_factor = np.random.uniform(0.9, 1.1)
    return points_np * scale_factor

def rotate(points, degree_range=10):
    points_np = np.array(points)
    
    if points_np.shape[-1] != 3:  # 只对三维数据执行旋转操作
        return points_np
    
    angle_x = np.radians(np.random.uniform(-degree_range, degree_range))
    angle_y = np.radians(np.random.uniform(-degree_range, degree_range))
    angle_z = np.radians(np.random.uniform(-degree_range, degree_range))
    
    rotation_matrix_x = np.array([
        [1, 0, 0],
        [0, np.cos(angle_x), -np.sin(angle_x)],
        [0, np.sin(angle_x), np.cos(angle_x)]
    ])
    
    rotation_matrix_y = np.array([
        [np.cos(angle_y), 0, np.sin(angle_y)],
        [0, 1, 0],
        [-np.sin(angle_y), 0, np.cos(angle_y)]
    ])
    
    rotation_matrix_z = np.array([
        [np.cos(angle_z), -np.sin(angle_z), 0],
        [np.sin(angle_z), np.cos(angle_z), 0],
        [0, 0, 1]
    ])
    
    rotation_matrix = np.dot(rotation_matrix_z, np.dot(rotation_matrix_y, rotation_matrix_x))
    return np.dot(points_np, rotation_matrix.T)

def translate(points, max_translation=0.1):
    points_np = np.array(points)
    
    if points_np.shape[-1] != 3:  # 对非三维数据返回原始数据
        return points_np
    
    dx, dy, dz = np.random.uniform(-max_translation, max_translation, 3)
    return points_np + np.array([dx, dy, dz])

def augment_single_action(action, times=5):
    """
    对单一动作数据进行多次增强。
    
    参数:
    - action: 原始的动作数据
    - times: 增强的次数
    
    返回值:
    - 一个增强后的动作数据列表
    """
    augmented_actions = [action]  # 包括原始数据
    
    for _ in range(times):
        augmented_action = []
        for keyframe in action:
            keyframe = add_noise(keyframe)
            keyframe = scale(keyframe)
            keyframe = rotate(keyframe)
            keyframe = translate(keyframe)
            augmented_action.append(keyframe)
        augmented_actions.append(augmented_action)
    
    return augmented_actions

def augment_data_and_labels(data, labels, times=5):
    """
    对整个数据集和标签进行多次增强。
    
    参数:
    - data: 原始的动作数据列表
    - labels: 对应的标签列表
    - times: 每个动作增强的次数
    
    返回值:
    - 增强后的数据和标签列表
    """
    augmented_data = []
    augmented_labels = []

    for action, label in zip(data, labels):
        new_actions = augment_single_action(action, times)
        augmented_data.extend(new_actions)
        augmented_labels.extend([label] * len(new_actions))

    return augmented_data, augmented_labels


# check the input shape of training data
def check_shape(data,desired_shape):
    # 遍历每个动作
    for i, action in enumerate(data):
        # 检查动作的关键帧数量
        if len(action) != desired_shape[0]:
            print(f"Action at index {i} has {len(action)} keyframes instead of {desired_shape[0]}.")
        else:
            # 如果关键帧数量符合，则进一步检查每个关键帧的关键点数量
            for j, keyframe in enumerate(action):
                if len(keyframe) != desired_shape[1]:
                    print(f"Keyframe {j} in action at index {i} has {len(keyframe)} keypoints instead of {desired_shape[1]}.")
                else:
                    # 最后检查每个关键点的维度
                    for k, keypoint in enumerate(keyframe):
                        try:
                            if len(keypoint) != desired_shape[2]:
                                print(f"Keypoint {k} in keyframe {j} of action at index {i} has a shape of {len(keypoint)} instead of {desired_shape[2]}.")
                        except:
                            print(f"Keypoint {k} in keyframe {j} of action at index {i} is {keyframe} instead of list of length {desired_shape[2]}.")


# prepare data for testing
def preprocess_data_test(directory, keyframe):
    # Preprocess the data
    file_paths = [os.path.join(directory, file) for file in os.listdir(directory) if file.endswith('.csv')]

    # Read the data and get each file's length
    for file in file_paths:
        df = pd.read_csv(file, header=None)

    processed_data = []
    processed_labels = []

    for file in file_paths:
        df = pd.read_csv(file, header=None)
        current_length = len(df)

        if current_length > keyframe:
            df = get_keyframes(df, keyframe)  # Assuming get_keyframes is a function you've defined elsewhere
        processed_data.append(df)
        
        # Get labels
        label = os.path.basename(file).split('.')[0]
        label = label.split("_")[-1]
        processed_labels.append(label)


    for idx, df in enumerate(processed_data):
        # Convert df to a nested list
        df_list = df.values.tolist()
        new_df_list = []  # Will contain the filtered rows

        for j in range(len(df_list)):
            delete_row = False  # flag to decide whether to delete the row or not

            for k in range(len(df_list[j])):
                value = df_list[j][k]
                if isinstance(value, str) and value.isdigit():  # if it's a string representation of an integer
                    delete_row = True
                    break  # exit the inner loop early
                else:
                    df_list[j][k] = ast.literal_eval(value)  # conversion as before

            if not delete_row:  # if the flag is still False, keep the row
                new_df_list.append(df_list[j])

        # Pad the data which is shorter than the average length
        while len(new_df_list) < keyframe:
            new_df_list.append(new_df_list[-1])

        processed_data[idx] = new_df_list


    return processed_data, processed_labels


# body part code
# 1. data preprocessing
train_directory = "excel"
data, labels, keyframe = preprocess_data(train_directory)

# data augmentation
augmented_data, augmented_labels = augment_data_and_labels(data, labels, times=5)

# ckeck shape of input training
desired_shape = (keyframe, 21, 3)
check_shape(data,desired_shape)

# 数据处理：转换为 [batch, seq_len, input_size] 的格式
data = [[[coord for keypoint in frame for coord in keypoint] for frame in action] for action in data]

# 创建label到整数的映射
label_to_int = {label: idx for idx, label in enumerate(set(labels))}
int_to_label = {idx: label for label, idx in label_to_int.items()}

# 打印编码情况
print(label_to_int)

# 将字符串标签编码为整数
encoded_labels = [label_to_int[label] for label in labels]

# 将嵌套的列表结构转换为torch tensor
data_tensor = torch.tensor(data, dtype=torch.float32)
labels_tensor = torch.tensor(encoded_labels, dtype=torch.long)

dataset = TensorDataset(data_tensor, labels_tensor)
train_loader = DataLoader(dataset, batch_size=64, shuffle=True)


# 2. 定义LSTM模型
input_dim = 63  # 展平后的关键点维度
hidden_dim = 128
output_dim = len(label_to_int)
num_layers = 2

model = ActionClassifier(input_dim, hidden_dim, output_dim, num_layers)


# 3. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# 4. 训练模型，并记录训练误差
num_epochs = 10
train_errors = []

for epoch in range(num_epochs):
    epoch_error = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_error += loss.item()

        if (i + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

    epoch_error /= len(train_loader)
    train_errors.append(epoch_error)
    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {epoch_error:.4f}')

with open('train_errors.txt', 'w') as f:
    for error in train_errors:
        f.write(f"{error}\n")
print("Training complete.")


# 5. 测试函数
def evaluate_model(model, test_loader):
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return 100 * correct / total

# 6. 为新的CSV文件测试模型
test_directory = 'data_test'
test_data, test_labels = preprocess_data_test(test_directory,keyframe)
encoded_test_labels = [label_to_int[label] for label in test_labels]
test_data = [[[coord for keypoint in frame for coord in keypoint] for frame in action] for action in test_data]

test_data_tensor = torch.tensor(test_data, dtype=torch.float32)
test_labels_tensor = torch.tensor(encoded_test_labels, dtype=torch.long)

test_dataset = TensorDataset(test_data_tensor, test_labels_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

accuracy = evaluate_model(model, test_loader)
print(f'Accuracy on the test data: {accuracy:.2f}%')

{'drinkWater': 0, 'reachOut': 1, 'getPhone': 2}
Epoch [1/10], Average Loss: 1.0989
Epoch [2/10], Average Loss: 1.0858
Epoch [3/10], Average Loss: 1.0697
Epoch [4/10], Average Loss: 1.0591
Epoch [5/10], Average Loss: 1.0450
Epoch [6/10], Average Loss: 1.0305
Epoch [7/10], Average Loss: 1.0150
Epoch [8/10], Average Loss: 1.0121
Epoch [9/10], Average Loss: 1.0170
Epoch [10/10], Average Loss: 1.0097
Training complete.
Accuracy on the test data: 53.33%
