In [1]:
import os
from pathlib import Path
from openai import OpenAI

# Build the API client. The key is read from the environment rather than being
# hardcoded in the notebook: an empty-string key is not treated as "unset" by the
# SDK and would be sent as-is, so every request would fail authentication.
# NOTE(review): OPENAI_API_KEY is the SDK's default variable name; adjust it if
# your provider documents a different one.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),  # None -> the SDK raises a clear configuration error
)


In [2]:
# Run configuration: these three values are interpolated into every file name below.
dataset = 'Grocery_and_Gourmet_Food'  # dataset prefix of the JSONL / pickle files
phase = 'test'                        # phase component of the file names
part = 1                              # part number of the batch input file

In [None]:

# Upload the local batch input file; the purpose must be "batch".
file_object = client.files.create(
    purpose="batch",
    file=Path(f"{dataset}_random_{phase}_part{part}.jsonl"),
)
file_object_id = file_object.id  # id of the uploaded file

print(file_object.model_dump_json())
print(file_object_id)  # print the file id

In [None]:
# Create the batch job from the uploaded input file.
batch = client.batches.create(
    endpoint="/v1/chat/completions",  # fixed value for large language models
    completion_window="24h",  # only 24h is currently supported; the job times out if unfinished after 24 hours
    input_file_id=file_object_id,  # id returned by the file upload
)
batch_id = batch.id

print(batch)
print(batch_id)  # print the id of the batch job

In [None]:
# Query the batch job. Uses the id stored in `batch_id` instead of an empty
# string literal, which can never identify a job.
# NOTE(review): the file ids are presumably only populated once the job has
# finished - re-run this cell until `output_file_id` is not None.
batch = client.batches.retrieve(batch_id)
print(batch)
print(batch.error_file_id)
print(batch.output_file_id)  # print the output file id
error_file_id = batch.error_file_id
output_file_id = batch.output_file_id

In [None]:
# Download the batch output file by its id.
content = client.files.content(file_id=output_file_id)

print(content.text)  # print the content of the result file
content.write_to_file(f"{dataset}_random_{phase}_part{part}_result.jsonl")  # save the result file locally

In [None]:
import json
import re

def clean_json_content(content):
    """
    Remove trailing commas from JSON-like text so ``json.loads`` can parse it.

    Two kinds of trailing comma are dropped:

    * a comma that is followed (ignoring whitespace) by a closing ``}`` or ``]``,
      at any nesting depth;
    * a single comma at the very end of the stripped text.

    Commas inside double-quoted string literals are left untouched.

    :param content: JSON-like text, e.g. the inside of an object literal.
    :return: The stripped text with trailing commas removed.
    """
    text = content.strip()
    closer = re.compile(r'\s*[}\]]')  # optional whitespace followed by a closing bracket

    kept = []
    in_string = False
    escaped = False
    for index, char in enumerate(text):
        if in_string:
            # Inside a string literal everything is copied verbatim; only track
            # where the literal ends (an escaped quote does not end it).
            kept.append(char)
            if escaped:
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '"':
                in_string = False
            continue
        if char == '"':
            in_string = True
        elif char == ',' and closer.match(text, index + 1):
            # Structural comma directly before a closing bracket: drop it.
            continue
        kept.append(char)

    # Finally drop a comma left dangling at the very end of the text.
    return re.sub(r',\s*$', '', ''.join(kept))

def process_jsonl_file_with_trailing_comma_fix(file_path):
    """
    Parse a batch result JSONL file into per-request extraction results.

    Every non-blank line is expected to be a JSON object carrying a ``custom_id``
    and the model reply at ``response.body.choices[0].message.content``. The
    reply is first parsed as JSON; if that fails, the two required fields are
    pulled out with regular expressions and ``candidate_perception`` is parsed
    again after trailing-comma cleanup via ``clean_json_content``.

    A line is recorded as failed when it is not valid JSON, has no ``custom_id``,
    has no (string) message content, or does not yield non-empty
    ``user_preferences`` and ``candidate_perception`` values.

    :param file_path: Path to the result JSONL file.
    :return: Tuple ``(custom_id_to_content, unique_failed_custom_ids)``:
        a dict mapping each successfully parsed ``custom_id`` to
        ``{"user_preferences": ..., "candidate_perception": ...}``, and a list of
        the distinct failed ids in first-seen order. The list contains ``None``
        when a line could not be attributed to any ``custom_id``.
    """
    custom_id_to_content = {}
    failed_custom_ids = []  # To store custom_ids for all failed parsing attempts

    # Regex patterns used by the fallback extraction.
    # NOTE: both captures are non-greedy, so "user_preferences" stops at the first
    # double quote and "candidate_perception" stops at the first closing brace.
    patterns = {
        "user_preferences": r'"user_preferences"\s*:\s*"(.*?)"',
        "candidate_perception": r'"candidate_perception"\s*:\s*\{(.*?)\}'
    }
    required_fields = ("user_preferences", "candidate_perception")

    def parse_dict_content(content, custom_id, field_name):
        """Parse the inside of an object literal; return {} if it is still invalid."""
        cleaned_content = clean_json_content(content)
        try:
            return json.loads("{" + cleaned_content + "}")
        except json.JSONDecodeError:
            print(f"字段内出错 {field_name} (custom_id: {custom_id}): {content}")
            return {}

    def is_valid_extraction(content_dict):
        """True when the value is a dict holding a non-empty value for every required field."""
        if not isinstance(content_dict, dict):
            return False
        return all(content_dict.get(field) for field in required_fields)

    def extract_message_content(data):
        """Walk response.body.choices[0].message.content; None if any level is absent."""
        node = data
        for key in ('response', 'body', 'choices', 0, 'message', 'content'):
            if isinstance(key, int):
                if not isinstance(node, list) or len(node) <= key:
                    return None
                node = node[key]
            else:
                if not isinstance(node, dict):
                    return None
                node = node.get(key)
        return node

    def extract_with_regex(content, custom_id):
        """Fallback for replies that are not valid JSON: pull the fields out by regex."""
        extracted = {}
        for field, pattern in patterns.items():
            match = re.search(pattern, content, re.DOTALL)
            if not match:
                # The missing field makes the extraction invalid; the caller records the failure.
                print(f"正则表达式找不到字段: Missing or invalid {field} for custom_id {custom_id}.")
                continue
            if field == "user_preferences":
                extracted[field] = match.group(1)
            else:
                extracted[field] = parse_dict_content(match.group(1), custom_id, field)
        return extracted

    # Open the file and read line by line
    with open(file_path, 'r', encoding='utf-8') as file:
        for line_number, line in enumerate(file, start=1):
            if not line.strip():
                continue  # blank lines carry no record

            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                # The id of an undecodable line is unknown; never reuse the id of
                # a previously processed line here.
                print(f"Line {line_number}: Error decoding JSON: {e}")
                failed_custom_ids.append(None)
                continue

            custom_id = data.get('custom_id') if isinstance(data, dict) else None
            if not custom_id:
                print(f"Line {line_number}: Missing custom_id.")
                failed_custom_ids.append(None)
                continue

            content = extract_message_content(data)
            if not content or not isinstance(content, str):
                print(f"Line {line_number}: Missing content for custom_id {custom_id}.")
                failed_custom_ids.append(custom_id)
                continue

            try:
                # Try parsing content as JSON directly
                content_json = json.loads(content)
            except json.JSONDecodeError:
                # Fall back to regex extraction and cleaning
                extracted_content = extract_with_regex(content, custom_id)
                if is_valid_extraction(extracted_content):
                    custom_id_to_content[custom_id] = extracted_content
                else:
                    print(f"Line {line_number}: Extracted content invalid for custom_id {custom_id}.")
                    failed_custom_ids.append(custom_id)
            else:
                if is_valid_extraction(content_json):
                    custom_id_to_content[custom_id] = {
                        "user_preferences": content_json.get("user_preferences", ""),
                        "candidate_perception": content_json.get("candidate_perception", {})
                    }
                else:
                    print(f"一次解析成功了，但是缺字段 {custom_id}.")
                    failed_custom_ids.append(custom_id)

    # Output all unique failed custom_ids (de-duplicated, first-seen order)
    unique_failed_custom_ids = list(dict.fromkeys(failed_custom_ids))
    print("\nFailed custom_ids (unique):")
    print(unique_failed_custom_ids)

    return custom_id_to_content, unique_failed_custom_ids

# Parse the downloaded result file for the configured dataset / phase / part.
result, unique_failed_custom_ids = process_jsonl_file_with_trailing_comma_fix(
    f"{dataset}_random_{phase}_part{part}_result.jsonl"
)


In [None]:
# Number of custom_ids in `result`, i.e. requests whose reply was parsed successfully.
len(result)

In [None]:
import json

def remove_failed_ids_from_result(ids, result_file):
    """
    Remove records from the result JSONL file that match the given custom IDs
    and overwrite the file with the remaining data.

    Lines that are not valid JSON are reported and are not written back.
    Records without a ``custom_id`` are treated as having the id ``None``.

    :param ids: Iterable of custom IDs to remove.
    :param result_file: Path to the result JSONL file (rewritten in place).
    """
    id_list = list(ids)
    try:
        # Hash lookup instead of scanning the list once per record.
        ids_lookup = set(id_list)
    except TypeError:
        ids_lookup = id_list  # unhashable ids: keep plain list membership

    def is_listed(custom_id):
        try:
            return custom_id in ids_lookup
        except TypeError:
            # An unhashable custom_id cannot be a member of a set of ids.
            return False

    remaining_records = []
    removed_count = 0

    # Read the result file and filter out matching records
    with open(result_file, 'r', encoding='utf-8') as infile:
        for line in infile:
            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding line: {line.strip()}. Error: {e}")
                continue

            custom_id = data.get('custom_id') if isinstance(data, dict) else None
            if is_listed(custom_id):
                removed_count += 1
            else:
                remaining_records.append(data)

    # Overwrite the result file with the remaining records
    with open(result_file, 'w', encoding='utf-8') as outfile:
        for record in remaining_records:
            outfile.write(json.dumps(record, ensure_ascii=False) + '\n')

    # Report how many records were actually dropped, not how many ids were passed in.
    print(f"Updated {result_file} with {len(remaining_records)} remaining records, removed {removed_count} records.")

# Drop every failed record from the downloaded result file of this dataset / phase / part.
result_file = f"{dataset}_random_{phase}_part{part}_result.jsonl"  # result file that is rewritten in place
failed_custom_ids = unique_failed_custom_ids  # ids reported as failed by the parsing step
remove_failed_ids_from_result(failed_custom_ids, result_file)


In [None]:
import json

def extract_failed_custom_ids(failed_custom_ids, input_file, output_file):
    """
    Extract records from the input JSONL file that match the failed custom IDs
    and write them to a new file.

    Lines that are not valid JSON are reported and skipped. Records without a
    ``custom_id`` are treated as having the id ``None``. The output file is
    always (re)created, even when no record matches.

    :param failed_custom_ids: Iterable of custom IDs to extract.
    :param input_file: Path to the input JSONL file.
    :param output_file: Path to the output JSONL file.
    """
    id_list = list(failed_custom_ids)
    try:
        # Hash lookup instead of scanning the list once per record.
        ids_lookup = set(id_list)
    except TypeError:
        ids_lookup = id_list  # unhashable ids: keep plain list membership

    def is_listed(custom_id):
        try:
            return custom_id in ids_lookup
        except TypeError:
            # An unhashable custom_id cannot be a member of a set of ids.
            return False

    failed_records = []

    # Open the input file and read line by line
    with open(input_file, 'r', encoding='utf-8') as infile:
        for line in infile:
            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Error decoding line: {line.strip()}. Error: {e}")
                continue

            custom_id = data.get('custom_id') if isinstance(data, dict) else None
            if is_listed(custom_id):
                failed_records.append(data)

    # Write the extracted records to the output file
    with open(output_file, 'w', encoding='utf-8') as outfile:
        for record in failed_records:
            outfile.write(json.dumps(record, ensure_ascii=False) + '\n')

    print(f"Extracted {len(failed_records)} records to {output_file}.")

# Copy the failed requests out of the original batch input into a retry input file.
input_file = f"{dataset}_random_{phase}_part{part}.jsonl"  # original batch input
output_file = f"{dataset}_random_{phase}_part{part}_.jsonl"  # retry input (note the trailing underscore)
failed_custom_ids = unique_failed_custom_ids  # ids reported as failed by the parsing step
extract_failed_custom_ids(failed_custom_ids, input_file, output_file)

In [None]:

# Upload the retry input file; the purpose must be "batch".
file_object = client.files.create(
    purpose="batch",
    file=Path(f"{dataset}_random_{phase}_part{part}_.jsonl"),
)
file_object_id = file_object.id  # id of the uploaded file

print(file_object.model_dump_json())
print(file_object_id)  # print the file id

In [None]:
# Create the retry batch job from the uploaded retry input file.
batch = client.batches.create(
    endpoint="/v1/chat/completions",  # fixed value for large language models
    completion_window="24h",  # only 24h is currently supported; the job times out if unfinished after 24 hours
    input_file_id=file_object_id,  # id returned by the file upload
)
batch_id = batch.id

print(batch)
print(batch_id)  # print the id of the batch job

In [None]:
# Query the retry batch job. Uses the id stored in `batch_id` instead of an
# empty string literal, which can never identify a job.
# NOTE(review): the file ids are presumably only populated once the job has
# finished - re-run this cell until `output_file_id` is not None.
batch = client.batches.retrieve(batch_id)
print(batch)
print(batch.error_file_id)
print(batch.output_file_id)  # print the output file id
error_file_id = batch.error_file_id
output_file_id = batch.output_file_id

In [None]:
# Download the retry batch output file by its id.
content = client.files.content(file_id=output_file_id)

print(content.text)  # print the content of the result file
content.write_to_file(f"{dataset}_random_{phase}_part{part}_result_.jsonl")  # save the result file locally

In [None]:
# Number of custom_ids in `result`.
# NOTE(review): in the cells visible here `result` is not recomputed after the
# retry batch, so this still reflects the first parse - confirm that the retry
# output is merged before relying on this count.
len(result)

In [33]:
import pickle
# Persist the parsed results as a pickle named after the dataset and phase.
with Path(f'{dataset}_{phase}.pkl').open('wb') as file:
    pickle.dump(result, file)