In [None]:
#!/usr/bin/env python3
"""
mineru_batch_upload.py
批量上传本地文件到 MinerU 并下载解析结果 ZIP。
依赖: requests
"""

import json
import math
import os
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# ========== Configuration ==========
# SECURITY: the API token is read from the MINERU_API_KEY environment variable
# instead of being embedded in the source, so the credential never ends up in
# version control or in a shared notebook. Any token that was previously
# committed here should be treated as leaked and revoked.
API_KEY = os.environ.get("MINERU_API_KEY", "")
BASE_URL = "https://mineru.net/api/v4"
# Directory holding the local files to upload.
INPUT_DIR = "/Users/bytedance/Project/OmniDocBench/OursDataset/images"
# Directory where the parsed result zips are downloaded.
OUTPUT_DIR = "/Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_vlm_results/zip"
MAX_PER_BATCH = 200                    # files per upload-URL request (original note: official limit is <= 200)
UPLOAD_WORKERS = 6                     # number of concurrent upload threads (tunable)
POLL_INTERVAL = 8                      # seconds between batch status polls
POLL_TIMEOUT = 60 * 60                 # polling timeout in seconds (1 hour by default)
# Lower-case file extensions that are picked up from INPUT_DIR.
ALLOWED_EXT = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg"}

# ========== Setup ==========
# Make sure the download directory exists, then create the HTTP session and
# the JSON request headers that the API helpers below share.
os.makedirs(OUTPUT_DIR, exist_ok=True)
session = requests.Session()
headers_json = {}
headers_json["Authorization"] = "Bearer {}".format(API_KEY)
headers_json["Content-Type"] = "application/json"

def list_files(input_dir, allowed_ext=None):
    """Return the sorted paths of regular files in ``input_dir`` with an allowed extension.

    input_dir: directory to scan (not recursive).
    allowed_ext: optional iterable of extensions including the leading dot
        (e.g. ``{".pdf", ".png"}``); compared case-insensitively. When omitted,
        the module-level ``ALLOWED_EXT`` set is used.
    Returns a list of ``os.path.join(input_dir, name)`` paths ordered by file name.
    """
    if allowed_ext is None:
        allowed_ext = ALLOWED_EXT
    # Normalise once so the per-file check is a plain set lookup.
    wanted = {ext.lower() for ext in allowed_ext}
    files = []
    for fn in sorted(os.listdir(input_dir)):
        path = os.path.join(input_dir, fn)
        if os.path.isfile(path) and os.path.splitext(fn.lower())[1] in wanted:
            files.append(path)
    return files

def chunk(lst, n):
    """Lazily yield consecutive slices of ``lst`` holding at most ``n`` items each."""
    starts = range(0, len(lst), n)
    yield from (lst[begin:begin + n] for begin in starts)

# ========== Request upload URLs ==========
def apply_upload_urls(file_names, global_options=None):
    """
    Ask the API for upload URLs via POST /file-urls/batch.

    file_names: list of file names (basename only).
    global_options: optional dict of request-level options (for example
        enable_formula, language, enable_table) copied into the request body.
    Returns (batch_id, urls) where urls is a list in the same order as
    file_names. batch_id is None when the response carries no "batch_id".
    Raises RuntimeError when the response "code" is not 0 or when the data has
    neither "file_urls" nor "files"; HTTP error statuses raise through
    raise_for_status().
    """
    endpoint = f"{BASE_URL}/file-urls/batch"
    body = dict(global_options) if global_options else {}
    # OCR is off for every file by default; flip is_ocr to True if needed.
    body["files"] = [{"name": fname, "is_ocr": False} for fname in file_names]

    response = session.post(endpoint, headers=headers_json, json=body, timeout=30)
    response.raise_for_status()
    parsed = response.json()
    if parsed.get("code") != 0:
        raise RuntimeError(f"apply upload urls failed: {parsed}")

    data = parsed["data"]
    # Two response shapes are accepted: data["file_urls"] (a list) or
    # data["files"] (treated as a {file name: url} mapping).
    if "file_urls" not in data and "files" not in data:
        raise RuntimeError("unexpected response: missing file_urls/files")
    if "file_urls" in data:
        urls = data["file_urls"]
    else:
        url_by_name = data["files"]
        # Re-order to match file_names; names missing from the mapping become None.
        urls = [url_by_name.get(fname) for fname in file_names]
    return data.get("batch_id"), urls

# ========== Upload a single file (PUT) ==========
def upload_one(upload_url, local_path):
    """
    PUT the contents of local_path to upload_url.

    No Content-Type header is set explicitly (the original author notes the
    official docs say none is required); the open file object is streamed as
    the request body.
    Returns True when the response status is 200 or 201, otherwise False.
    """
    with open(local_path, "rb") as fh:
        response = session.put(upload_url, data=fh, timeout=300)
    status = response.status_code
    return status == 200 or status == 201

# ========== Poll batch results ==========
def poll_batch_results(batch_id, timeout=POLL_TIMEOUT, interval=POLL_INTERVAL):
    """
    Poll GET /extract-results/batch/{batch_id} until every file is done or failed.

    batch_id: batch identifier used in the request path.
    timeout: overall time budget in seconds.
    interval: seconds to sleep between polls.
    Returns the final "data" dict of the response once every entry of its
    "extract_result" list has state "done" or "failed".
    Raises TimeoutError when the budget is exhausted first.

    Transient problems (network errors, non-2xx statuses, non-JSON bodies,
    API error codes, an empty result list) are printed and retried after
    ``interval`` seconds rather than aborting the poll.
    """
    url = f"{BASE_URL}/extract-results/batch/{batch_id}"
    # monotonic() keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            r = session.get(url, headers=headers_json, timeout=30)
        except requests.RequestException as exc:
            # A dropped connection is as retryable as a bad HTTP status.
            print("poll request error:", exc)
            time.sleep(interval)
            continue
        if not r.ok:
            print("poll request failed:", r.status_code, r.text)
            time.sleep(interval)
            continue
        try:
            j = r.json()
        except ValueError as exc:
            print("poll response is not valid JSON:", exc)
            time.sleep(interval)
            continue
        if j.get("code") != 0:
            print("poll api returned error:", j)
            time.sleep(interval)
            continue
        # "or" also covers an explicit null in the payload.
        data = j.get("data") or {}
        results = data.get("extract_result") or []
        # No extract_result yet means processing has not started; keep waiting.
        if not results:
            print("batch not ready yet (no extract_result). sleeping...")
            time.sleep(interval)
            continue
        states = [item.get("state") for item in results]
        if all(st in ("done", "failed") for st in states):
            return data
        # Progress summary: one pass over the states, keyed in first-seen order.
        summary = dict(Counter(states))
        print(f"batch {batch_id} progress summary: {summary} — sleeping {interval}s")
        time.sleep(interval)
    raise TimeoutError(f"batch poll timeout after {timeout} seconds for batch {batch_id}")

# ========== Download result ZIP ==========
def download_zip(zip_url, out_dir, suggested_name=None):
    """
    Stream the archive at zip_url into out_dir and return the saved path.

    zip_url: URL of the archive to download.
    out_dir: existing directory to write into.
    suggested_name: optional file name for the saved archive. Only its final
        path component is used, so a name containing directory parts cannot
        place the file outside out_dir. When omitted (or empty), the name is
        taken from the URL path with any query string removed.
    Raises on HTTP error statuses via raise_for_status().
    """
    file_name = os.path.basename(suggested_name) if suggested_name else ""
    if not file_name:
        file_name = os.path.basename(zip_url.split("?")[0])
    outpath = os.path.join(out_dir, file_name)
    # The context manager releases the streamed connection even when the
    # status check or the file write fails.
    with session.get(zip_url, stream=True, timeout=120) as r:
        r.raise_for_status()
        with open(outpath, "wb") as f:
            for block in r.iter_content(1024 * 64):
                if block:
                    f.write(block)
    return outpath

# ========== Main workflow ==========
def _upload_batch(local_paths, upload_urls):
    """Upload each local path to its paired URL concurrently; return {local_path: succeeded}."""
    outcome = {}
    with ThreadPoolExecutor(max_workers=UPLOAD_WORKERS) as ex:
        future_to_path = {
            ex.submit(upload_one, up_url, local_path): local_path
            for local_path, up_url in zip(local_paths, upload_urls)
        }
        for fut in as_completed(future_to_path):
            local_path = future_to_path[fut]
            try:
                ok = fut.result()
            except Exception as e:
                # Any failure of a single upload is recorded, not propagated,
                # so the remaining uploads in the batch still run.
                ok = False
                print("上传异常:", local_path, e)
            outcome[local_path] = ok
            print(f"上传 {'OK' if ok else 'FAIL'}: {local_path}")
    return outcome


def _download_batch_results(batch_data, output_dir):
    """Download the result zip of every "done" entry in batch_data and report "failed" ones."""
    for item in batch_data.get("extract_result", []):
        fname = item.get("file_name")
        state = item.get("state")
        zip_url = item.get("full_zip_url")
        print(f"file={fname} state={state} zip={zip_url}")
        if state == "done" and zip_url:
            # Without a file name, let download_zip derive one from the URL
            # instead of saving every such result as "None.result.zip".
            out_name = f"{fname}.result.zip" if fname else None
            try:
                saved = download_zip(zip_url, output_dir, suggested_name=out_name)
                print("下载成功:", saved)
            except Exception as e:
                print("下载失败:", e)
        elif state == "failed":
            print("解析失败，err_msg:", item.get("err_msg"))


def process_all(input_dir, output_dir):
    """
    Upload every supported file in input_dir and download the parsed results.

    Files are processed in batches of at most MAX_PER_BATCH. For each batch:
    request upload URLs, upload the files concurrently, poll until the batch
    finishes, then download the result zip of each finished file into
    output_dir. A batch that fails at any step is reported and skipped; later
    batches still run.

    input_dir: directory scanned by list_files().
    output_dir: directory the result zips are written to.
    Returns None.
    """
    all_files = list_files(input_dir)
    if not all_files:
        print("No files found in", input_dir)
        return

    for batch_idx, chunk_files in enumerate(chunk(all_files, MAX_PER_BATCH), start=1):
        print(f"\n=== Processing batch {batch_idx} ({len(chunk_files)} files) ===")
        base_names = [os.path.basename(p) for p in chunk_files]
        # 1) Request upload URLs
        print("申请上传链接...")
        try:
            batch_id, upload_urls = apply_upload_urls(base_names, global_options={
                "enable_formula": True,
                "enable_table": True,
                "model_version": 'vlm',
                "language": "ch"   # or "en"
            })
        except Exception as e:
            print("申请上传链接失败:", e)
            continue
        if not upload_urls or len(upload_urls) != len(base_names):
            print("返回的 upload_urls 长度与文件数量不一致，返回数据：", upload_urls)
            continue
        print(f"batch_id={batch_id}, got {len(upload_urls)} upload urls")

        # 2) Concurrent upload
        print("开始并发上传文件（PUT 到返回的 upload url）...")
        upload_results = _upload_batch(chunk_files, upload_urls)

        # Report failed uploads
        failed = [p for p, ok in upload_results.items() if not ok]
        if failed:
            print("以下文件上传失败（请重试或检查网络）:", failed)
            if len(failed) == len(chunk_files):
                # Nothing reached the server, so polling could only run into
                # the timeout; move on to the next batch instead.
                print("本批次所有文件上传失败，跳过轮询。")
                continue
            # Otherwise keep going: per the original note, only successfully
            # uploaded files are submitted for parsing.
        else:
            print("全部上传成功。")

        # 3) Poll the batch (per the original note, the API picks up uploaded
        #    files and submits the parsing tasks itself)
        print("开始轮询批量解析结果...")
        try:
            batch_data = poll_batch_results(batch_id)
        except Exception as e:
            print("轮询失败或超时:", e)
            continue

        # 4) Download full_zip_url for every file whose state is done
        _download_batch_results(batch_data, output_dir)

    print("\n全部批次处理完成。")

# Script entry point: run the full upload -> poll -> download pipeline
# using the directories configured at the top of the file.
if __name__ == "__main__":
    process_all(input_dir=INPUT_DIR, output_dir=OUTPUT_DIR)


In [None]:
# Count the entries currently in the zip download directory (the value is shown as the cell output).
len(os.listdir('/Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_vlm_results/zip'))

In [None]:
# Extract a single result archive (057_12) into its own directory; the leading "!" runs the line as a shell command.
!unzip /Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_vlm_results/zip/057_12.jpg.result.zip -d /Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_vlm_results/057_12

In [None]:
import subprocess

# Unpack every result archive in target_path into
# <extract_root>/<archive name up to its first dot>.
target_path = "/Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_results/zip"
extract_root = "/Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_results"
for file in os.listdir(target_path):
    # Skip anything that is not an archive (e.g. hidden files in the directory).
    if not file.lower().endswith(".zip"):
        continue
    file_path = os.path.join(target_path, file)
    dest_dir = os.path.join(extract_root, file.split('.')[0])
    # Pass an argument list without a shell so spaces or shell metacharacters
    # in a file name cannot break or alter the command.
    result = subprocess.run(["unzip", file_path, "-d", dest_dir])
    if result.returncode == 0:
        print(f"完成: {file}")
    else:
        print(f"unzip 返回码 {result.returncode}: {file}")


In [23]:
import os
import shutil


# Move each extracted result's full.md into dst, renamed to <result directory name>.md.
folder = "/Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_vlm_results"
dst = "/Users/bytedance/Project/OmniDocBench/Models_ouput/mineru_vlm_results_md"
# shutil.move cannot create the destination directory itself.
os.makedirs(dst, exist_ok=True)
for file in os.listdir(folder):
    full_path = os.path.join(folder, file)
    md_path = os.path.join(full_path, "full.md")
    # Skip plain files and directories without a full.md (for example the
    # "zip" download directory, or results already moved by an earlier run)
    # instead of crashing halfway through the loop.
    if not os.path.isfile(md_path):
        continue
    shutil.move(md_path, os.path.join(dst, f"{file}.md"))
    print(f"完成: {file}")


完成: 087_03
完成: 080_14
完成: 100_04
完成: 073_14
完成: 074_03
完成: 080_13
完成: 087_04
完成: 058_08
完成: 074_04
完成: 073_13
完成: 100_03
完成: 058_01
完成: 083_02
完成: 077_15
完成: 070_02
完成: 098_11
完成: 083_05
完成: 058_06
完成: 098_16
完成: 070_05
完成: 077_12
完成: 060_05
完成: 094_12
完成: 093_05
完成: 088_11
完成: 078_09
完成: 060_02
完成: 093_02
完成: 064_04
完成: 063_13
完成: 097_04
完成: 057_13
完成: 064_03
完成: 097_03
完成: 078_07
完成: 058_07
完成: 083_04
完成: 077_13
完成: 070_04
完成: 098_17
完成: 083_03
完成: 098_10
完成: 070_03
完成: 077_14
完成: 068_01
完成: 087_05
完成: 080_12
完成: 073_12
完成: 100_02
完成: 058_09
完成: 080_15
完成: 087_02
完成: 100_05
完成: 074_02
完成: 073_15
完成: 057_12
完成: 064_02
完成: 063_15
完成: 078_06
完成: 097_02
完成: 063_12
完成: 064_05
完成: 097_05
完成: 078_01
完成: 060_03
完成: 088_10
完成: 078_08
完成: 093_03
完成: 060_04
完成: 093_04
完成: 059_07
完成: 099_28
完成: 071_04
完成: 099_17
完成: 069_08
完成: 099_10
完成: 071_03
完成: 069_01
完成: 099_19
完成: 081_12
完成: 086_05
完成: 075_05
完成: 099_26
完成: 059_09
完成: 086_02
完成: 069_06
完成: 099_21
完成: 075_02
完成: 062_15
完成: 065_02
完成: 079_06