# In [None]:
"""
openai的batch api接口
地址：https://platform.openai.com/docs/guides/batch?lang=python

下面技巧可以列出所有提交过的batch任务：
import openai
# 列出所有批处理任务
batches = openai.batches.list()

batches.data
"""

import json
import logging

from openai import OpenAI
from langchain_core.prompts import ChatPromptTemplate

# Send INFO-and-above log records to a local file next to the working directory.
logging.basicConfig(
    level=logging.INFO,
    filename="openai_batch.log",
)
# Logger named after this module.
logger = logging.getLogger(__name__)

class OpenaiBatchRequest:
    """Helper for running chat-completion requests through the OpenAI Batch API.

    Workflow:
        1. ``create_openai_batch_jsonl`` renders a ``ChatPromptTemplate`` once
           per input dict and writes the requests to a JSONL file.
        2. ``create_batch_task`` uploads that file and creates the batch task.
        3. ``task_status`` reports the task's current status.
        4. ``get_result`` downloads the answers once the status is
           ``"completed"``.
    """

    # LangChain message type -> OpenAI chat role.
    _ROLE_BY_MESSAGE_TYPE = {
        "human": "user",
        "ai": "assistant",
        "system": "system",
    }

    def __init__(self,
                 prompt: "ChatPromptTemplate | None" = None,
                 ) -> None:
        """Create the OpenAI client and reset the batch bookkeeping.

        Args:
            prompt: Optional default template. It is used by
                ``create_openai_batch_jsonl`` when that method is called with
                ``prompt=None``.
        """
        # The annotation is quoted so it is not evaluated at definition time.
        self.prompt = prompt
        self.client = OpenAI()
        # Set by create_batch_task(); None means "no task created yet".
        self.batches_id = None

    def _retrieve_batch(self):
        """Fetch the batch object for the task created by ``create_batch_task``.

        Raises:
            RuntimeError: If no batch task has been created yet.
        """
        if self.batches_id is None:
            raise RuntimeError(
                "No batch task has been created yet; call create_batch_task() first."
            )
        return self.client.batches.retrieve(self.batches_id)

    @property
    def task_status(self):
        """Return the current status of the batch task.

        Raises:
            RuntimeError: If no batch task has been created yet.
        """
        return self._retrieve_batch().status

    def roleName_replaced(self, msg_type: str):
        """Map a LangChain message type to the matching OpenAI chat role.

        Returns:
            ``"user"``, ``"assistant"`` or ``"system"`` for the message types
            ``"human"``, ``"ai"`` and ``"system"``; ``None`` for anything else.
        """
        return self._ROLE_BY_MESSAGE_TYPE.get(msg_type)

    def create_openai_batch_jsonl(self,
                                  prompt: "ChatPromptTemplate",
                                  inputs_dict_list: list,
                                  batch_jsonl_path: str,
                                  model_name="gpt-4o"):
        """Render ``prompt`` for every input dict and write a batch JSONL file.

        Each line is one ``POST /v1/chat/completions`` request whose
        ``custom_id`` is ``request-<index>``, where ``<index>`` is the position
        of the input dict in ``inputs_dict_list``.

        Args:
            prompt: Template to render. When ``None``, the template given to
                the constructor is used instead.
            inputs_dict_list: One dict of template variables per request.
            batch_jsonl_path: Destination path; an existing file is overwritten.
            model_name: Model name written into every request body.

        Raises:
            ValueError: If no template is available, or if a rendered message
                has a type that cannot be mapped to an OpenAI role.
        """
        if prompt is None:
            prompt = getattr(self, "prompt", None)
        if prompt is None:
            raise ValueError("A prompt template is required to build the batch file.")

        print("开始构建openai batch jsonl输入数据:", batch_jsonl_path)
        with open(batch_jsonl_path, "w", encoding="utf-8") as f:
            for request_id, input_dict in enumerate(inputs_dict_list):
                # Render the prompt with this request's variables.
                messages = prompt.format_messages(**input_dict)

                # Convert the messages to the structure the batch file expects.
                json_messages = []
                for msg in messages:
                    role = self.roleName_replaced(msg.type)
                    if role is None:
                        # Fail here instead of writing `"role": null` to the file.
                        raise ValueError(
                            f"Unsupported message type {msg.type!r} in request-{request_id}"
                        )
                    json_messages.append({"role": role, "content": msg.content})

                final_payload = {
                    "custom_id": f"request-{request_id}",
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": str(model_name),
                        "messages": json_messages,
                    },
                }
                f.write(json.dumps(final_payload, ensure_ascii=False) + "\n")
        print("构建成功")

    def create_batch_task(self, batch_jsonl_path: str):
        """Upload the JSONL file and create a batch task for it.

        The id of the created task is stored in ``self.batches_id``.
        """
        print("开始上传jsonl文件:", batch_jsonl_path)
        # The context manager closes the file handle once the upload call returns.
        with open(batch_jsonl_path, "rb") as jsonl_file:
            batch_input_file = self.client.files.create(
                file=jsonl_file,
                purpose="batch",
            )
        print("jsonl文件上传成功")

        batch_input_file_id = batch_input_file.id

        print("创建batch任务~")
        batches = self.client.batches.create(
            input_file_id=batch_input_file_id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )
        print("batch任务创建成功~")
        self.batches_id = batches.id

    @staticmethod
    def _request_order(custom_id, fallback_position):
        """Return a sort key that restores input order from ``request-<n>`` ids.

        Ids that do not follow that pattern sort after the recognised ones and
        keep their relative order from the output file.
        """
        if isinstance(custom_id, str):
            prefix, _, suffix = custom_id.rpartition("-")
            if prefix == "request" and suffix.isascii() and suffix.isdigit():
                return (0, int(suffix))
        return (1, fallback_position)

    def get_result(self):
        """Return the answer texts of the batch task, ordered by request index.

        Returns:
            A list with the first choice's message content of every request
            found in the output file, sorted by the numeric suffix of its
            ``custom_id``. Requests that are absent from the output file are
            not represented, so the list can be shorter than the input list.
            An empty list is returned while the task is not ``"completed"`` or
            when the completed task has no output file.

        Raises:
            RuntimeError: If no batch task has been created yet.
        """
        # Retrieve once so the status and the output file id come from one snapshot.
        batch = self._retrieve_batch()
        if batch.status != "completed":
            print("任务还未完成，任务状态:", batch.status)
            return []

        output_file_id = getattr(batch, "output_file_id", None)
        if not output_file_id:
            print("任务完成，但没有输出文件")
            return []

        file_response = self.client.files.content(output_file_id)
        keyed_answers = []
        for position, raw_line in enumerate(file_response.iter_lines()):
            if not raw_line.strip():
                continue
            record = json.loads(raw_line)
            answer = record["response"]["body"]["choices"][0]["message"]["content"]
            order_key = self._request_order(record.get("custom_id"), position)
            keyed_answers.append((order_key, answer))

        # Output lines are not guaranteed to follow input order; restore it.
        keyed_answers.sort(key=lambda item: item[0])
        print("任务完成，获得输出数据")
        return [answer for _, answer in keyed_answers]

# In [None]:
"""
1.obr = OpenaiBatchRequest()
2.obr.create_openai_batch_jsonl(
    prompt, # ChatPromptTemplate模版
    inputs_dict_list, # 字典数据list，用来填充prompt中的placeholder数据
    batch_jsonl_path # 上传的jsonl数据中间文件
)
3.obr.create_batch_task(batch_jsonl_path) # 将中间文件jsonl上传到openai构建batch任务
4.answers = obr.get_result()
5.第四步骤没问题的情况下，使用answers做进一步处理
"""