In [2]:
import torch.nn as nn
import os 
import numpy as np

In [3]:
def read_sensor_data(file_path):
    """Parse one sensor log file into angular-velocity, acceleration and pressure arrays.

    The file is read line by line:

    * blank lines and lines starting with ``Time:`` are skipped;
    * a line starting with ``DataType:`` selects the stream that the following
      data lines belong to (the text after the first colon, stripped);
    * under ``angular velocity`` or ``acceleration`` each data line is split on
      commas and converted to a list of floats;
    * under ``pressure`` each data line is converted to a single float;
    * data lines under any other (or no) type are ignored, and lines that fail
      float conversion are silently dropped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A tuple ``(angular_velocity, acceleration, pressure)`` of NumPy arrays
        built from the parsed rows, in file order.
    """
    gyro_rows = []       # parsed angular-velocity rows
    accel_rows = []      # parsed acceleration rows
    pressure_values = []  # parsed pressure readings

    # Both comma-separated streams are parsed the same way; only the target list differs.
    vector_sinks = {
        "angular velocity": gyro_rows,
        "acceleration": accel_rows,
    }

    active_type = None
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            text = raw_line.strip()

            # Timestamp lines and empty lines carry no sensor values.
            if not text or text.startswith("Time:"):
                continue

            if text.startswith("DataType:"):
                # Header line: remember which stream the next data lines feed.
                active_type = text.split(":")[1].strip()
                continue

            try:
                if active_type in vector_sinks:
                    vector_sinks[active_type].append([float(token) for token in text.split(',')])
                elif active_type == "pressure":
                    pressure_values.append(float(text))
            except ValueError:
                # Best-effort parsing: a malformed line is skipped, not fatal.
                continue

    return np.array(gyro_rows), np.array(accel_rows), np.array(pressure_values)

In [4]:
def resample_data(sensor_data, current_length, target_length):
    """Linearly resample a sensor stream along its first axis to ``target_length`` steps.

    The original samples and the resampled samples are both spread evenly over
    the same time span (``current_length / 100`` seconds, i.e. a nominal 100 Hz
    recording), so the first and last output samples coincide with the first
    and last input samples. Because both axes share that span, the nominal rate
    only scales the axis and does not influence the interpolated values.

    Args:
        sensor_data: Array-like whose first axis is time and has
            ``current_length`` entries.
        current_length: Number of time steps in ``sensor_data``.
        target_length: Number of time steps wanted in the result.

    Returns:
        A float NumPy array with ``target_length`` entries along the first axis
        and the remaining axes unchanged.

    Raises:
        ValueError: If ``sensor_data`` is empty or its first-axis length does
            not equal ``current_length``.
    """
    # Imported here so the function works even when the notebook's import cell
    # does not bring ``interp1d`` into scope.
    from scipy.interpolate import interp1d

    samples = np.asarray(sensor_data, dtype=float)
    if samples.ndim == 0 or samples.shape[0] != current_length:
        raise ValueError(
            f"sensor_data must have {current_length} entries along its first axis, "
            f"got shape {samples.shape}"
        )
    if current_length == 0:
        raise ValueError("cannot resample an empty sensor stream")

    if current_length == 1:
        # A single sample gives a zero-width time axis, which linear
        # interpolation cannot handle (division by zero); hold the value instead.
        return np.repeat(samples, target_length, axis=0)

    duration = current_length / 100  # nominal 100 Hz recording
    time_original = np.linspace(0, duration, current_length)
    time_target = np.linspace(0, duration, target_length)
    interpolator = interp1d(time_original, samples, kind='linear', axis=0, fill_value="extrapolate")
    return interpolator(time_target)


# Data standardization helper
def standardize_data(X_data):
    """Standardize each sample of ``X_data`` independently.

    Every element of ``X_data`` is passed on its own to
    ``StandardScaler.fit_transform``, so the scaler is re-fitted for each
    sample and no statistics are shared between samples.
    NOTE(review): ``StandardScaler`` is presumably scikit-learn's, which scales
    column-wise; its import is not visible in this part of the notebook.

    Args:
        X_data: Iterable of samples, each accepted by ``fit_transform``.

    Returns:
        A NumPy array stacking the transformed samples in input order.
    """
    scaler = StandardScaler()
    standardized_samples = []
    for sample in X_data:
        # fit_transform re-fits on this sample alone before transforming it.
        standardized_samples.append(scaler.fit_transform(sample))
    return np.array(standardized_samples)


# Data loading helper
def load_data(base_folder):
    """Load every ``.txt`` recording under the category folders into one dataset.

    For each category sub-folder of ``base_folder`` (walked recursively), each
    ``.txt`` file is parsed with ``read_sensor_data``. Angular velocity and
    acceleration are resampled with ``resample_data`` to the number of pressure
    readings, and the streams are joined column-wise in the order
    acceleration, angular velocity, pressure. The label of a file is the index
    of its category in the category list.

    Args:
        base_folder: Directory containing one sub-folder per category.

    Returns:
        A tuple ``(X_data_combined, y_data)``: the stacked per-file arrays as
        ``float32`` and the labels after ``to_categorical`` with one class per
        category.
    """
    categories = ['Normal', 'Dropping', 'Kicking', 'Throwing']
    # Extended category list, currently disabled:
    #categories = ['Normal', 'Dropping', 'Kicking', 'Throwing', 'Kicking_rolling', 'Kicking_hitting', 'Throwing_hitting']
    samples = []
    labels = []

    # The position of a category in the list is its integer label.
    for label, category in enumerate(categories):
        category_root = os.path.join(base_folder, category)

        for dirpath, _, filenames in os.walk(category_root):
            for filename in filenames:
                if not filename.endswith('.txt'):
                    continue

                txt_path = os.path.join(dirpath, filename)
                print(f"Processing {txt_path}...")

                # Parse the three sensor streams from the file.
                gyro, accel, pressure = read_sensor_data(txt_path)

                # Bring angular velocity onto the pressure stream's number of steps.
                gyro_resampled = resample_data(gyro, len(gyro), len(pressure))

                # Pressure becomes a single column so it can be joined column-wise.
                pressure_column = np.expand_dims(pressure, axis=1)
                gyro_with_pressure = np.concatenate([gyro_resampled, pressure_column], axis=1)

                # Bring acceleration onto the same number of steps as well.
                accel_resampled = resample_data(accel, len(accel), len(gyro_resampled))

                # Final column order: acceleration, angular velocity, pressure.
                merged = np.concatenate([accel_resampled, gyro_with_pressure], axis=1)

                samples.append(merged)
                labels.append(label)

    # Stack everything into NumPy arrays.
    X_data_combined = np.array(samples, dtype=np.float32)
    label_ids = np.array(labels, dtype=np.int32)

    # One-hot encode the integer labels.
    y_data = to_categorical(label_ids, num_classes=len(categories))

    print(f"Combined 6-axis and pressure data shape: {X_data_combined.shape}")
    print(f"Labels shape: {y_data.shape}")

    return X_data_combined, y_data