In [78]:
import os
import numpy as np
import pandas as pd
import json
from typing import List
from collections import defaultdict

class DataProcessor:
    """Build fixed-length sliding-window samples from per-user CSV recordings.

    One "positive" user is read from an explicit CSV file; negative and
    prediction users are read from ``<root_dir>/user<ID>/``.  For every
    configured window length the processor cuts each recording into
    overlapping windows, selects a subset for the negative / prediction users
    and writes the result as JSON next to a plain-text statistics report.
    """

    def __init__(self, root_dir: str, output_dir: str, sequence_lengths: List[int], 
                 slide_ratio: float, pos_neg_ratio: int, pos_user_id: str, 
                 pos_csv_path: str, columns: List[str] = None):
        """
        Initialise the processor and create the output directory.

        Args:
            root_dir: Directory that contains one ``user<ID>`` folder per user.
            output_dir: Base directory for generated files.
            sequence_lengths: Window lengths (rows per sample) to generate.
            slide_ratio: Stride of the sliding window as a fraction of the
                window length (the stride is never smaller than one row).
            pos_neg_ratio: Number of positive samples per negative sample.
            pos_user_id: ID of the positive user (used in file names).
            pos_csv_path: CSV file holding the positive user's data.
            columns: Feature columns to extract; a default feature set is
                used when omitted.

        Raises:
            ValueError: If ``pos_csv_path`` lacks one of the feature columns.
            Exception: Any error raised by ``pandas.read_csv`` while reading
                ``pos_csv_path`` is printed and re-raised.
        """
        self.root_dir = root_dir
        self.base_output_dir = output_dir
        self.sequence_lengths = sequence_lengths
        self.slide_ratio = slide_ratio
        self.pos_neg_ratio = pos_neg_ratio
        self.pos_user_id = pos_user_id
        self.pos_csv_path = pos_csv_path

        if columns is None:
            self.columns = ['x', 'y', 'velocity', 'acceleration', 'curvature',
                            'angle_change', 'x_velocity', 'y_velocity',
                            'x_acceleration', 'y_acceleration', 'press_duration']
        else:
            self.columns = columns

        # The positive recording defines how many rows are used from any file.
        self.total_sequence_length = self._get_total_sequence_length()
        print(f"自动获取的 total_sequence_length: {self.total_sequence_length}")

        # All window lengths share one directory; the individual file names
        # carry the window length, so nothing collides.
        dir_path = os.path.join(output_dir, f"processed_data_user{pos_user_id}")
        self.output_dirs = {seq_len: dir_path for seq_len in self.sequence_lengths}
        if self.output_dirs:
            os.makedirs(dir_path, exist_ok=True)

        # Per window length: "user<ID>" -> windows available,
        # "user<ID>_selected" -> windows actually selected.
        self.sample_stats = {seq_len: defaultdict(int) for seq_len in sequence_lengths}

    def _require_columns(self, df: pd.DataFrame, source_label: str) -> None:
        """Raise ``ValueError`` if ``df`` lacks any of the configured columns."""
        missing_columns = [col for col in self.columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"{source_label}缺少以下列: {missing_columns}")

    def _slide_step(self, sequence_length: int) -> int:
        """Return the window stride in rows for ``sequence_length`` (at least 1)."""
        return max(1, int(sequence_length * self.slide_ratio))

    def _get_total_sequence_length(self) -> int:
        """
        Return the number of data rows (header excluded) in ``pos_csv_path``.

        Returns:
            int: Total sequence length.

        Raises:
            ValueError: If a configured feature column is missing.
        """
        try:
            df = pd.read_csv(self.pos_csv_path)
            self._require_columns(df, "正样本数据文件")
            return len(df)
        except Exception as e:
            print(f"获取 total_sequence_length 时出错: {str(e)}")
            raise

    def load_user_data(self, user_id: str) -> pd.DataFrame:
        """
        Load the data file of one user from ``<root_dir>/user<user_id>``.

        When the folder holds several CSV files, the one whose name sorts
        last is used.  The frame is truncated to ``total_sequence_length``
        rows.

        Args:
            user_id: User ID.

        Returns:
            pd.DataFrame: The user's data.

        Raises:
            FileNotFoundError: If the user folder or a CSV file in it is missing.
            ValueError: If a configured feature column is missing.
        """
        try:
            user_dir = os.path.join(self.root_dir, f"user{user_id}")

            if not os.path.exists(user_dir):
                raise FileNotFoundError(f"找不到用户 {user_id} 的目录: {user_dir}")

            csv_files = [f for f in os.listdir(user_dir) if f.endswith('.csv')]

            if not csv_files:
                raise FileNotFoundError(f"用户 {user_id} 的目录中没有CSV文件: {user_dir}")

            csv_file = sorted(csv_files)[-1]
            file_path = os.path.join(user_dir, csv_file)

            df = pd.read_csv(file_path)
            self._require_columns(df, "数据文件")

            if len(df) > self.total_sequence_length:
                df = df.iloc[:self.total_sequence_length]

            return df

        except Exception as e:
            print(f"加载用户 {user_id} 数据时出错: {str(e)}")
            raise

    def load_positive_data(self) -> pd.DataFrame:
        """
        Load the positive user's data from ``pos_csv_path``.

        Returns:
            pd.DataFrame: The positive data, truncated to
            ``total_sequence_length`` rows.

        Raises:
            FileNotFoundError: If ``pos_csv_path`` does not exist.
            ValueError: If a configured feature column is missing.
        """
        try:
            if not os.path.exists(self.pos_csv_path):
                raise FileNotFoundError(f"正样本的CSV文件不存在: {self.pos_csv_path}")

            df = pd.read_csv(self.pos_csv_path)
            self._require_columns(df, "正样本数据文件")

            if len(df) > self.total_sequence_length:
                df = df.iloc[:self.total_sequence_length]

            return df

        except Exception as e:
            print(f"加载正样本数据时出错: {str(e)}")
            raise

    def create_sequences(self, df: pd.DataFrame, user_id: str, sequence_length: int) -> np.ndarray:
        """
        Cut ``df`` into overlapping windows of ``sequence_length`` rows.

        The number of windows found is recorded in ``sample_stats`` under
        ``"user<user_id>"``.

        Args:
            df: Data containing at least the configured feature columns.
            user_id: User ID, used only as the statistics key.
            sequence_length: Rows per window.

        Returns:
            np.ndarray: Array of shape (n_windows, sequence_length, n_features);
            an empty array when the data is shorter than one window.
        """
        data = df[self.columns].values
        step = self._slide_step(sequence_length)

        # Last start index that still yields a complete window.
        max_start_idx = min(self.total_sequence_length - sequence_length,
                            len(data) - sequence_length)

        windows = [data[start:start + sequence_length]
                   for start in range(0, max_start_idx + 1, step)]
        sequences = np.array([w for w in windows if len(w) == sequence_length])
        self.sample_stats[sequence_length][f"user{user_id}"] = len(sequences)

        return sequences

    def save_samples(self, samples: np.ndarray, filename: str, sequence_length: int):
        """
        Save samples to a JSON file in the output directory.

        The file is named
        ``<prefix>_samples_user<pos_user_id>_<sequence_length>.json`` where
        ``<prefix>`` is the part of ``filename`` before the first underscore.
        Nothing is written (only a warning is printed) when ``samples`` is
        empty.

        Args:
            samples: Array of shape (n_samples, sequence_length, n_features).
            filename: File name prefix.
            sequence_length: Window length.
        """
        if len(samples) == 0:
            print(f"警告: {filename} 没有样本可以保存")
            return

        filename_prefix = filename.split('_')[0]
        output_path = os.path.join(
            self.output_dirs[sequence_length],
            f"{filename_prefix}_samples_user{self.pos_user_id}_{sequence_length}.json"
        )

        # Each sample is a list of time steps; each time step maps
        # feature name -> float value.
        data = {
            'metadata': {
                'shape': list(samples.shape),
                'feature_names': self.columns,
                'sequence_length': sequence_length
            },
            'samples': [
                [
                    {feature: float(value) for feature, value in zip(self.columns, timestep)}
                    for timestep in sample
                ]
                for sample in samples
            ]
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)

        print(f"已保存 {len(samples)} 个样本到 {output_path}")
        print(f"数据形状: {samples.shape}")

    def load_samples(self, filepath: str) -> np.ndarray:
        """
        Load samples from a JSON file written by ``save_samples``.

        Args:
            filepath: Path of the JSON file.

        Returns:
            np.ndarray: Array of shape (n_samples, sequence_length, n_features).

        Raises:
            AssertionError: If the loaded array does not have the shape
                recorded in the file's metadata.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)

        shape = data['metadata']['shape']
        feature_names = data['metadata']['feature_names']

        # Rebuild every sample as a (time steps, features) array, using the
        # stored feature order for the columns.
        samples_list = []
        for sample in data['samples']:
            sample_array = np.zeros((len(sample), len(feature_names)))
            for i, timestep in enumerate(sample):
                for j, feature in enumerate(feature_names):
                    sample_array[i, j] = timestep[feature]
            samples_list.append(sample_array)

        samples = np.array(samples_list)
        # Raised explicitly (instead of an ``assert`` statement) so the check
        # also runs under ``python -O``; the exception type is unchanged.
        if samples.shape != tuple(shape):
            raise AssertionError(f"加载的数据形状 {samples.shape} 与预期形状 {shape} 不符")

        print(f"已加载数据，形状: {samples.shape}")
        return samples

    def save_sample_stats(self, sequence_length: int):
        """
        Write the sample statistics of one window length to a text file.

        The report goes to ``sample_statistics_<sequence_length>.txt``; the
        window length is part of the name because all window lengths share
        one output directory and would otherwise overwrite each other.

        Args:
            sequence_length: Window length whose statistics are written.
        """
        stats_path = os.path.join(self.output_dirs[sequence_length],
                                  f'sample_statistics_{sequence_length}.txt')
        stats = sorted(self.sample_stats[sequence_length].items())
        with open(stats_path, 'w', encoding='utf-8') as f:
            f.write("样本统计信息:\n")
            f.write("-" * 50 + "\n")
            f.write(f"窗口长度: {sequence_length}\n")
            f.write(f"总序列长度: {self.total_sequence_length}\n")
            f.write(f"滑动步长: {self._slide_step(sequence_length)}\n\n")

            f.write("每个用户可用的样本总数:\n")
            for user_id, count in stats:
                if not user_id.endswith('_selected'):
                    f.write(f"{user_id}: {count}\n")

            f.write("\n实际选择的样本数:\n")
            for user_id, count in stats:
                if user_id.endswith('_selected'):
                    f.write(f"{user_id.replace('_selected', '')}: {count}\n")

    def _collect_samples(self, user_ids: List[str], n_total: int,
                         sequence_length: int, frame_cache: dict) -> np.ndarray:
        """Select about ``n_total`` windows, split evenly across ``user_ids``.

        Users with enough windows contribute evenly spaced ones; users with
        too few are sampled with replacement; users without a single window
        are skipped with a warning.  ``frame_cache`` maps user ID -> loaded
        frame so each CSV is read only once per ``process_data`` call.
        """
        collected = []
        if not user_ids:
            # Nothing to draw from; also avoids dividing by zero below.
            return np.array(collected)

        samples_per_user = max(1, n_total // len(user_ids))
        for user_id in user_ids:
            if user_id not in frame_cache:
                frame_cache[user_id] = self.load_user_data(user_id)
            sequences = self.create_sequences(frame_cache[user_id], user_id, sequence_length)

            if len(sequences) == 0:
                # np.random.choice cannot draw from an empty population.
                print(f"警告: 用户 {user_id} 在窗口长度 {sequence_length} 下没有可用样本，已跳过")
                continue

            if len(sequences) >= samples_per_user:
                indices = np.linspace(0, len(sequences) - 1, samples_per_user, dtype=int)
            else:
                indices = np.random.choice(len(sequences), samples_per_user, replace=True)
            selected_samples = sequences[indices]

            self.sample_stats[sequence_length][f"user{user_id}_selected"] = len(selected_samples)
            collected.extend(selected_samples)

        return np.array(collected)

    def process_data(self, neg_user_ids: List[str], 
                    pred_user_ids: List[str]) -> dict:
        """
        Generate, save and return the samples for every window length.

        For each window length all positive windows are kept, while the
        negative and prediction sets each receive roughly
        ``n_positive // pos_neg_ratio`` windows spread over their users.
        Window lengths that yield no positive window are skipped.

        Args:
            neg_user_ids: IDs of the users supplying negative samples.
            pred_user_ids: IDs of the users supplying prediction samples.

        Returns:
            dict: ``{sequence_length: {'positive': ndarray,
            'negative': ndarray, 'predict': ndarray}}``.
        """
        results = {}
        frame_cache = {}   # user ID -> DataFrame, shared by all window lengths
        df_pos = None      # loaded on first use, then reused

        for sequence_length in self.sequence_lengths:
            print(f"\n处理窗口长度 {sequence_length}:")

            if df_pos is None:
                df_pos = self.load_positive_data()
            positive_samples = self.create_sequences(df_pos, self.pos_user_id, sequence_length)

            if len(positive_samples) == 0:
                print(f"警告: 窗口长度 {sequence_length} 未能生成正样本")
                continue

            n_positive = len(positive_samples)
            # Every positive window is used.
            self.sample_stats[sequence_length][f"user{self.pos_user_id}_selected"] = n_positive
            n_negative = max(1, n_positive // self.pos_neg_ratio)
            n_predict = max(1, n_positive // self.pos_neg_ratio)

            negative_samples = self._collect_samples(
                neg_user_ids, n_negative, sequence_length, frame_cache)
            predict_samples = self._collect_samples(
                pred_user_ids, n_predict, sequence_length, frame_cache)

            self.save_samples(positive_samples, 'positive', sequence_length)
            self.save_samples(negative_samples, 'negative', sequence_length)
            self.save_samples(predict_samples, 'predict', sequence_length)

            self.save_sample_stats(sequence_length)

            results[sequence_length] = {
                'positive': positive_samples,
                'negative': negative_samples,
                'predict': predict_samples
            }

            print(f"窗口长度 {sequence_length} 处理完成:")
            print(f"正样本形状: {positive_samples.shape}")
            print(f"负样本形状: {negative_samples.shape}")
            print(f"预测样本形状: {predict_samples.shape}")

        return results


from typing import List, Tuple

def get_random_users(all_users: List[str], pos_user_id: str) -> Tuple[List[str], List[str]]:
    """
    Randomly split the non-positive users into negative and prediction users.

    One or two prediction users are drawn (chosen at random) and up to
    8 or 7 negative users respectively.  When fewer users are available than
    that split needs, the negative group is reduced first so that the
    prediction group is not left empty while users remain.

    Args:
        all_users: All available user IDs.
        pos_user_id: ID of the positive user, excluded from both groups.

    Returns:
        Tuple[List[str], List[str]]: (negative user IDs, prediction user IDs);
        the two lists are disjoint.
    """
    available_users = [uid for uid in all_users if uid != pos_user_id]

    # The prediction count (1 or 2) is drawn before the shuffle.
    n_pred = np.random.randint(1, 3)

    # 8 negatives with one prediction user, 7 with two ...
    n_neg = 8 if n_pred == 1 else 7
    # ... but never so many that the prediction slice below comes up short.
    n_neg = min(n_neg, max(len(available_users) - n_pred, 0))

    shuffled_users = np.random.permutation(available_users)

    neg_users = shuffled_users[:n_neg].tolist()
    pred_users = shuffled_users[n_neg:n_neg + n_pred].tolist()

    return neg_users, pred_users

# Usage example: build the sample files for positive user 9.
if __name__ == "__main__":
    root_dir = r"D:/datauser/NEW_merged_files"
    output_dir = r"D:/论文数据/mouse/data"
    # Window lengths (rows per sample) to generate.
    sequence_lengths = [50, 100, 150, 200, 250, 300,350,400]
    # The window stride is this fraction of the window length.
    slide_ratio = 0.1
    # Negative / prediction sets get about n_positive // pos_neg_ratio samples.
    pos_neg_ratio = 10
    pos_user_id = "9"
    # Relative path: resolved against the current working directory.
    pos_csv_path = f"user{pos_user_id}_combined_output.csv"

    # All available user IDs (including the positive user).
    all_user_ids = ["7", "9", "12", "15", "16", "20", "21", "23", "29", "35"]
    
    try:
        processor = DataProcessor(
            root_dir=root_dir,
            output_dir=output_dir,
            sequence_lengths=sequence_lengths,
            slide_ratio=slide_ratio,
            pos_neg_ratio=pos_neg_ratio,
            pos_user_id=pos_user_id,
            pos_csv_path=pos_csv_path
        )

        # Randomly pick the negative and prediction users.
        neg_user_ids, pred_user_ids = get_random_users(all_user_ids, pos_user_id)
        
        print(f"随机选择的负样本用户 ({len(neg_user_ids)}个): {neg_user_ids}")
        print(f"随机选择的预测用户 ({len(pred_user_ids)}个): {pred_user_ids}")

        results = processor.process_data(neg_user_ids, pred_user_ids)
        print("\n所有窗口长度处理完成")
        
    except Exception as e:
        # Top-level boundary: any failure is reported as a single line
        # (no traceback is shown).
        print(f"错误: {str(e)}")

自动获取的 total_sequence_length: 61145
随机选择的负样本用户 (8个): ['12', '15', '21', '16', '7', '20', '23', '29']
随机选择的预测用户 (1个): ['35']

处理窗口长度 50:
已保存 12220 个样本到 D:/论文数据/mouse/data\processed_data_user9\positive_samples_user9_50.json
数据形状: (12220, 50, 11)
已保存 1216 个样本到 D:/论文数据/mouse/data\processed_data_user9\negative_samples_user9_50.json
数据形状: (1216, 50, 11)
已保存 1222 个样本到 D:/论文数据/mouse/data\processed_data_user9\predict_samples_user9_50.json
数据形状: (1222, 50, 11)
窗口长度 50 处理完成:
正样本形状: (12220, 50, 11)
负样本形状: (1216, 50, 11)
预测样本形状: (1222, 50, 11)

处理窗口长度 100:
已保存 6105 个样本到 D:/论文数据/mouse/data\processed_data_user9\positive_samples_user9_100.json
数据形状: (6105, 100, 11)
已保存 608 个样本到 D:/论文数据/mouse/data\processed_data_user9\negative_samples_user9_100.json
数据形状: (608, 100, 11)
已保存 610 个样本到 D:/论文数据/mouse/data\processed_data_user9\predict_samples_user9_100.json
数据形状: (610, 100, 11)
窗口长度 100 处理完成:
正样本形状: (6105, 100, 11)
负样本形状: (608, 100, 11)
预测样本形状: (610, 100, 11)

处理窗口长度 150:
已保存 4067 个样本到 D:/论文数据/mouse/data\pro

In [9]:
import pandas as pd
import os

def extract_rows(csv_path: str, num_rows: int) -> pd.DataFrame:
    """
    Read the first ``num_rows`` data rows of a CSV file.

    The row count is checked on the parsed frame rather than on the number of
    physical lines, so blank lines (which pandas skips) cannot make a short
    file look long enough.  The file is read only once.

    Args:
        csv_path: Path of the CSV file; its first line is the header.
        num_rows: Number of leading data rows to return.

    Returns:
        pd.DataFrame: Exactly ``num_rows`` rows.

    Raises:
        FileNotFoundError: If ``csv_path`` is not an existing file.
        ValueError: If the file holds fewer than ``num_rows`` data rows
            (``pandas.errors.EmptyDataError``, a ``ValueError`` subclass,
            is raised by pandas for a completely empty file).
    """
    if not os.path.isfile(csv_path):
        raise FileNotFoundError(f"文件未找到: {csv_path}")

    # nrows stops parsing after the requested rows; the header is consumed
    # automatically.
    df = pd.read_csv(csv_path, nrows=num_rows)

    # Fewer rows than requested means the whole file was read, so len(df)
    # is the file's total data-row count.
    if len(df) < num_rows:
        raise ValueError(f"文件 {csv_path} 的总行数 ({len(df)}) 少于要求的行数 ({num_rows})")

    return df

def combine_csvs(csv_requests, output_csv):
    """
    Extract the leading rows of several CSV files and stack them into one file.

    Files that are missing or too short are reported and skipped.  Nothing is
    written when no file could be extracted.

    Args:
        csv_requests: Iterable of dicts with the keys ``'path'`` (CSV file
            path) and ``'rows'`` (number of leading rows to take).
        output_csv: Path of the merged CSV file (written as UTF-8 with BOM).
    """
    dataframes = []

    for request in csv_requests:
        csv_path = request['path']
        num_rows = request['rows']

        print(f"正在处理文件: {csv_path}，需要提取 {num_rows} 行数据...")

        try:
            df = extract_rows(csv_path, num_rows)
        except (FileNotFoundError, ValueError) as e:
            print(e)
            print(f"跳过文件: {csv_path}。\n")
        else:
            dataframes.append(df)
            print(f"成功从 {csv_path} 提取 {num_rows} 行数据。")

    if dataframes:
        # Stack vertically; to_csv writes a single header row for the result.
        combined_df = pd.concat(dataframes, ignore_index=True)
        combined_df.to_csv(output_csv, index=False, encoding='utf-8-sig')
        print(f"所有数据已成功合并并保存到 {output_csv}。")
    else:
        print("没有数据被提取和合并。")

def main():
    """Merge the leading rows of the user9 session CSV files into one file."""
    # Name of the merged output CSV.
    output_csv = 'user9_combined_output.csv'

    # (source CSV path, number of leading rows to take) per session file.
    # Replace the paths with your own files and extend the table as needed.
    sources = (
        ('D:/datauser/NEW_Data_files/user9/session_0335985747.csv', 7800),
        ('D:/datauser/NEW_Data_files/user9/session_3390119815.csv', 8725),
        ('D:/datauser/NEW_Data_files/user9/session_3879637058.csv', 11221),
        ('D:/datauser/NEW_Data_files/user9/session_4373781904.csv', 8435),
        ('D:/datauser/NEW_Data_files/user9/session_5155383252.csv', 8200),
        ('D:/datauser/NEW_Data_files/user9/session_7285432516.csv', 8545),
        ('D:/datauser/NEW_Data_files/user9/session_8764610836.csv', 8219),
    )
    csv_requests = [{'path': path, 'rows': rows} for path, rows in sources]

    combine_csvs(csv_requests, output_csv)

# Run the merge only when executed as a script / notebook cell, not on import.
if __name__ == "__main__":
    main()


正在处理文件: D:/datauser/NEW_Data_files/user9/session_0335985747.csv，需要提取 7800 行数据...
成功从 D:/datauser/NEW_Data_files/user9/session_0335985747.csv 提取 7800 行数据。
正在处理文件: D:/datauser/NEW_Data_files/user9/session_3390119815.csv，需要提取 8725 行数据...
成功从 D:/datauser/NEW_Data_files/user9/session_3390119815.csv 提取 8725 行数据。
正在处理文件: D:/datauser/NEW_Data_files/user9/session_3879637058.csv，需要提取 11221 行数据...
成功从 D:/datauser/NEW_Data_files/user9/session_3879637058.csv 提取 11221 行数据。
正在处理文件: D:/datauser/NEW_Data_files/user9/session_4373781904.csv，需要提取 8435 行数据...
成功从 D:/datauser/NEW_Data_files/user9/session_4373781904.csv 提取 8435 行数据。
正在处理文件: D:/datauser/NEW_Data_files/user9/session_5155383252.csv，需要提取 8200 行数据...
成功从 D:/datauser/NEW_Data_files/user9/session_5155383252.csv 提取 8200 行数据。
正在处理文件: D:/datauser/NEW_Data_files/user9/session_7285432516.csv，需要提取 8545 行数据...
成功从 D:/datauser/NEW_Data_files/user9/session_7285432516.csv 提取 8545 行数据。
正在处理文件: D:/datauser/NEW_Data_files/user9/session_8764610836.csv，需要提取 8219 

In [None]:
session_0335985747.csv	session_3390119815.csv	session_3879637058.csv	session_4373781904.csv	session_5155383252.csv	session_7285432516.csv	session_8764610836.csv
