# 数据处理模块（Data Process Module）

## 📖 概述

数据处理模块是 RM-Gallery 核心数据处理系统的关键组件，为用户提供了统一、灵活的数据处理解决方案。该模块基于**操作符管道（Operator Pipeline）**的设计理念，允许用户通过灵活组合多个操作符来构建复杂的数据处理工作流。

## 🏗️ 架构设计

### 核心组件

#### 1. **DataProcess** - 数据处理引擎
   - 继承自 `BaseDataModule`，提供标准化的数据处理接口
   - 负责管理和编排操作符序列的执行顺序
   - 支持批量数据处理和实时数据流处理

#### 2. **BaseOperator** - 操作符抽象基类
   - 定义了操作符的标准接口规范
   - 支持泛型类型，确保类型安全
   - 提供可扩展的数据处理抽象方法

#### 3. **OperatorFactory** - 操作符工厂
   - 实现操作符的统一注册和动态创建机制
   - 无缝集成 data-juicer 生态系统的操作符
   - 支持基于配置的操作符实例化

## 🔧 核心特性

### 1. 🔄 管道式数据处理
- **链式操作**：支持多个操作符的无缝串联执行
- **元数据保持**：完整保留原始数据集的元数据信息
- **全程追踪**：提供详细的处理日志、性能统计和数据流向追踪

### 2. 🛠️ 丰富的操作符生态
- **内置操作符**：
  - `TextLengthFilter` - 基于文本长度的智能过滤器
  - `ConversationTurnFilter` - 对话轮次数量过滤器
- **外部集成**：
  - 完整支持 data-juicer 操作符库
  - 支持用户自定义操作符扩展

### 3. ⚙️ 配置驱动的设计
- **声明式配置**：通过配置文件灵活定义数据处理流程
- **参数化控制**：所有操作符参数均可通过配置文件调整
- **动态调整**：支持运行时动态修改处理参数



## 🚀 快速入门

### 方式一：直接创建操作符

In [None]:
from rm_gallery.core.data.process.process import create_process_module
from rm_gallery.core.data.process.ops.filter.text_length_filter import TextLengthFilter
from rm_gallery.core.data.process.ops.filter.conversation_turn_filter import ConversationTurnFilter
from rm_gallery.core.data.load.base import DataLoad
import rm_gallery.core.data     # 核心策略注册
import rm_gallery.gallery.data  # 扩展策略注册

# 配置本地文件加载参数
config = {
    "path": "/Users/xielipeng/RM-Gallery/data/reward-bench-2/data/test-00000-of-00001.parquet",
    "limit": 1000,  # 限制加载的数据条数
}

# 创建数据加载器
loader = DataLoad(
    name="rewardbench2",           # 数据集名称
    load_strategy_type="local",    # 使用本地文件加载策略
    data_source="rewardbench2",    # 指定数据源格式转换器
    config=config                  # 传入配置参数
)

# 执行数据加载
dataset = loader.run()

# 创建操作符
text_filter = TextLengthFilter(
    name="text_length_filter",
    config={"min_length": 50, "max_length": 2000}
)

turn_filter = ConversationTurnFilter(
    name="conversation_turn_filter", 
    config={"min_turns": 1, "max_turns": 10}
)

# 创建数据处理模块
processor = create_process_module(
    name="data_processor",
    operators=[text_filter, turn_filter]
)

# 处理数据
result = processor.run(dataset)
print(f"处理前: {len(dataset.datas)} 条数据")
print(f"处理后: {len(result.datas)} 条数据")

[32m2025-06-17 14:23:24.600[0m | [1mINFO    [0m | [36mrm_gallery.core.data.load.base[0m:[36m_load_data_impl[0m:[36m275[0m - [1mLoaded 1865 samples from file: /Users/xielipeng/RM-Gallery/data/reward-bench-2/data/test-00000-of-00001.parquet[0m
处理前: 1000 条数据
处理后: 140 条数据
[32m2025-06-17 14:23:24.601[0m | [1mINFO    [0m | [36mrm_gallery.core.data.load.base[0m:[36mrun[0m:[36m180[0m - [1mApplied limit of 1000, final count: 1000[0m
[32m2025-06-17 14:23:24.601[0m | [1mINFO    [0m | [36mrm_gallery.core.data.load.base[0m:[36mrun[0m:[36m194[0m - [1mSuccessfully loaded 1000 items from rewardbench2[0m
[32m2025-06-17 14:23:24.610[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m52[0m - [1mProcessing 1000 items with 2 operators[0m
[32m2025-06-17 14:23:24.610[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m59[0m - [1mApplying operator 1/2: text_length_filter[0m
[32m2025-06-17 14

[32m2025-06-17 14:23:24.614[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m59[0m - [1mApplying operator 2/2: conversation_turn_filter[0m
[32m2025-06-17 14:23:24.614[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m63[0m - [1mOperator conversation_turn_filter completed: 140 items remaining[0m
[32m2025-06-17 14:23:24.614[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m87[0m - [1mProcessing completed: 1000 -> 140 items[0m


### 方式二：配置化批量处理

通过配置文件的方式可以更加灵活地定义数据处理流程，特别适合复杂的多步骤处理场景。

In [None]:
# 通过配置创建操作符
from rm_gallery.core.data.process.process import create_process_module
from rm_gallery.core.data.load.base import DataLoad
from rm_gallery.core.data.process.ops.base import OperatorFactory
import rm_gallery.core.data     # 核心策略注册
import rm_gallery.gallery.data  # 扩展策略注册

# 配置本地文件加载参数
config = {
    "path": "/Users/xielipeng/RM-Gallery/data/reward-bench-2/data/test-00000-of-00001.parquet",
    "limit": 1000,  # 限制加载的数据条数
}

# 创建数据加载器
loader = DataLoad(
    name="rewardbench2",           # 数据集名称
    load_strategy_type="local",    # 使用本地文件加载策略
    data_source="rewardbench2",    # 指定数据源格式转换器
    config=config                  # 传入配置参数
)

# 执行数据加载
dataset = loader.run()

# 配置多个操作符
operator_configs = [
    {
        "type": "filter",
        "name": "conversation_turn_filter",
        "config": {"min_turns": 1, "max_turns": 8}
    },
    {
        "type": "filter",
        "name": "text_length_filter", 
        "config": {"min_length": 100, "max_length": 2000}
    },
    {
        "type": "data_juicer",
        "name": "character_repetition_filter",
        "config": {
            "rep_len": 10,
            "min_ratio": 0.0,
            "max_ratio": 0.5
        }
    }
]

# 批量创建操作符
operators = [OperatorFactory.create_operator(config) for config in operator_configs]

# 创建处理器
processor = create_process_module(
    name="batch_processor",
    operators=operators
)

result = processor.run(dataset)
print(f"处理前: {len(dataset.datas)} 条数据")
print(f"处理后: {len(result.datas)} 条数据")

[32m2025-06-17 14:53:26.626[0m | [1mINFO    [0m | [36mrm_gallery.core.data.load.base[0m:[36m_load_data_impl[0m:[36m275[0m - [1mLoaded 1865 samples from file: /Users/xielipeng/RM-Gallery/data/reward-bench-2/data/test-00000-of-00001.parquet[0m
[32m2025-06-17 14:53:26.627[0m | [1mINFO    [0m | [36mrm_gallery.core.data.load.base[0m:[36mrun[0m:[36m180[0m - [1mApplied limit of 1000, final count: 1000[0m
[32m2025-06-17 14:53:26.627[0m | [1mINFO    [0m | [36mrm_gallery.core.data.load.base[0m:[36mrun[0m:[36m194[0m - [1mSuccessfully loaded 1000 items from rewardbench2[0m
[32m2025-06-17 14:53:26.640[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m52[0m - [1mProcessing 1000 items with 3 operators[0m
[32m2025-06-17 14:53:26.641[0m | [1mINFO    [0m | [36mrm_gallery.core.data.process.process[0m:[36mrun[0m:[36m59[0m - [1mApplying operator 1/3: conversation_turn_filter[0m
[32m2025-06-17 14:53:26.642[0m | [1m

处理前: 1000 条数据
处理后: 159 条数据


## 🛠️ 高级功能

### 自定义操作符开发

当内置操作符无法满足特定需求时，您可以轻松创建自定义操作符。以下是完整的开发流程：

#### 步骤1：实现操作符类

在 `rm_gallery/gallery/data/process/` 目录下创建自定义操作符：

```python
from rm_gallery.core.data.process.ops.base import BaseOperator, OperatorFactory

@OperatorFactory.register("custom_filter")
class CustomFilter(BaseOperator):
    """自定义数据过滤器示例"""
    
    def process_dataset(self, items):
        """
        处理数据集的核心方法
        
        Args:
            items: 输入的数据项列表
            
        Returns:
            过滤后的数据项列表
        """
        filtered_items = []
        for item in items:
            if self._custom_condition(item):
                filtered_items.append(item)
        return filtered_items
    
    def _custom_condition(self, item):
        """
        自定义过滤条件
        
        Args:
            item: 单个数据项
            
        Returns:
            bool: 是否保留该数据项
        """
        # 在这里实现您的过滤逻辑
        return True
```

#### 步骤2：注册操作符

在 `rm_gallery/gallery/data/__init__.py` 中导入操作符以完成注册：

```python
from rm_gallery.gallery.data.process.custom_filter import CustomFilter
```

### Data-Juicer 操作符集成

RM-Gallery 无缝集成了 data-juicer 生态系统，让您可以使用其丰富的数据处理操作符：

```python
# 使用 data-juicer 操作符的配置示例
config = {
    "type": "data_juicer",
    "name": "text_length_filter",
    "config": {
        "min_len": 10,
        "max_len": 20
    }
}

operator = OperatorFactory.create_operator(config)
```

## 🔍 支持的操作符

### RM-Gallery 内置操作符

| 操作符名称 | 功能描述 | 配置参数 |
|-----------|----------|----------|
| `TextLengthFilter` | 根据文本长度过滤数据样本 | `min_length`, `max_length` |
| `ConversationTurnFilter` | 根据对话轮次数量过滤样本 | `min_turns`, `max_turns` |

### Data-Juicer 集成操作符

| 操作符名称 | 功能描述 | 状态 |
|-----------|----------|------|
| `text_length_filter` | 文本长度过滤 | ✅ 已测试 |
| `character_repetition_filter` | 字符重复过滤 | ✅ 已测试 |
| `word_repetition_filter` | 单词重复过滤 | 🔄 测试中 |

> 💡 **提示**: 我们持续添加和测试新的操作符，敬请期待更多功能！



