## 08. 打造可配置、可扩展的自动化预处理流水线

## 二、关键步骤实现
### 1. PDF 文本提取（PaddleOCR API）
功能 ：调用 PaddleOCR 提取 PDF 中的文本，支持多语言、复杂排版。

In [None]:
import requests

class OCREngine:
    def __init__(self, config):
        self.api_url = config["ocr"]["api_url"]  # 从配置加载 OCR API 地址
        self.high_precision = config["ocr"].get("high_precision", False)

    def extract_text(self, file_path):
        """调用 PaddleOCR API 提取 PDF 文本"""
        with open(file_path, "rb") as f:
            files = {"file": f}
            response = requests.post(self.api_url, files=files)
        if response.status_code == 200:
            return response.json()["text"]
        else:
            raise Exception(f"OCR 提取失败: {response.text}")

### 2. 文本预处理与分段
功能 ：清洗 OCR 文本并按配置规则分段。
代码示例 ：

In [2]:
import re

class TextProcessor:
    def __init__(self, config):
        self.config = config
        self.rules = config["dify"]["process_rules"]

    def preprocess(self, text):
        """应用预处理规则（如去空格、去 URL）"""
        if self.rules["pre_processing"][0]["enabled"]:
            text = re.sub(r"\s+", " ", text)  # 去除多余空格 [[7]]
        return text

    def segment(self, text):
        """按配置分段（如按标题分割）"""
        separator = self.rules["segmentation"]["separator"]
        return re.split(separator, text)  # 分段逻辑 [[7]]

### 3. 上传至 Dify 知识库
功能 ：调用 Dify API 上传文本或文件，并绑定元数据。
代码示例 ：

In [3]:
import requests
import json

class DifyUploader:
    def __init__(self, config):
        self.api_key = config["dify"]["api_key"]
        self.dataset_id = config["dify"]["dataset_id"]
        self.base_url = f"https://api.dify.ai/v1/datasets/ {self.dataset_id}"

    def upload_by_text(self, segments):
        """通过文本创建文档"""
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        for i, content in enumerate(segments):
            payload = {
                "name": f"segment_{i}.txt",
                "text": content,
                "indexing_technique": "high_quality",
                "process_rule": self.rules  # 从配置加载分段规则 [[7]]
            }
            response = requests.post(f"{self.base_url}/document/create_by_text", headers=headers, data=json.dumps(payload))
            print(f"Segment {i} uploaded: {response.status_code}")
        return response.json()["document"]["id"]

### 4. 动态元数据绑定
功能 ：根据配置动态绑定元数据字段（如作者、来源）。
代码示例 ：

In [4]:
def bind_metadata(document_id, config, filename):
    """动态绑定元数据（如从文件名提取来源）"""
    url = f"https://api.dify.ai/v1/datasets/ {config['dify']['dataset_id']}/documents/metadata"
    headers = {"Authorization": f"Bearer {config['dify']['api_key']}", "Content-Type": "application/json"}
    metadata_list = []
    for field in config["metadata"]["fields"]:
        value = field["value"]
        if field["value_from"] == "filename":
            value = filename  # 从文件名动态赋值 [[7]]
        metadata_list.append({
            "id": generate_metadata_id(field["name"]),  # 从 Dify API 获取字段 ID
            "value": value,
            "name": field["name"]
        })
    payload = {"operation_data": [{"document_id": document_id, "metadata_list": metadata_list}]}
    requests.post(url, headers=headers, data=json.dumps(payload))

## 三、可配置与可扩展设计
### 1. 配置驱动：外部化参数管理
示例：config.yaml

### 2. 插件式扩展：新增预处理规则
示例：新增去特殊字符插件

In [5]:
class RemoveSpecialChars:
    def __init__(self, enabled=True):
        self.enabled = enabled

    def apply(self, text):
        if self.enabled:
            return re.sub(r"[^\w\s]", "", text)  # 去除特殊字符
        return text

# 集成到 TextProcessor
class TextProcessor:
    def __init__(self, config):
        self.plugins = [
            RemoveSpecialChars(enabled=True),  # 动态加载插件 [[9]]
            # 可扩展：新增插件
        ]

    def preprocess(self, text):
        for plugin in self.plugins:
            text = plugin.apply(text)
        return text

### 3. 异步任务与扩展性支持
示例：异步处理 PDF

In [None]:
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_pdf_async(pdf_path):
    config = load_config()
    ocr_engine = OCREngine(config)
    raw_text = ocr_engine.extract_text(pdf_path)
    
    processor = TextProcessor(config)
    cleaned_text = processor.preprocess(raw_text)
    segments = processor.segment(cleaned_text)
    
    uploader = DifyUploader(config)
    document_id = uploader.upload_by_text(segments)
    
    bind_metadata(document_id, config, pdf_path)

## 四、完整流程示例

In [None]:
def main(pdf_path):
    config = load_config()  # 加载配置文件 [[4]]
    
    # 1. OCR 提取文本
    ocr_engine = OCREngine(config)
    raw_text = ocr_engine.extract_text(pdf_path)
    
    # 2. 文本预处理与分段
    processor = TextProcessor(config)
    cleaned_text = processor.preprocess(raw_text)
    segments = processor.segment(cleaned_text)
    
    # 3. 上传至 Dify
    uploader = DifyUploader(config)
    document_id = uploader.upload_by_text(segments)
    
    # 4. 绑定元数据
    bind_metadata(document_id, config, pdf_path)

if __name__ == "__main__":
    main("example.pdf")

## 五、总结与引用来源
可配置性 ：通过 config.yaml 集中管理 OCR 模式、分段规则、索引策略等参数 。

可扩展性 ：
- 插件化设计 ：支持新增预处理插件（如 RemoveSpecialChars）。
- 异步支持 ：结合 Celery 和 Redis 实现高并发处理 。
- 元数据动态绑定 ：通过配置定义字段来源（如文件名、时间戳）。

通过上述设计，系统可灵活应对多源异构数据处理场景，显著提升 RAG 构建效率与维护性。