# 基础调用示例

## OpenAI调用示例

In [None]:
from openai import OpenAI

# 填入你的实际 API Key
client = OpenAI(api_key="<OpenAI API Key>", base_url="https://api.openai.com/v1")

prompt = "You are a helpful assistant"
input_text = "What is the capital of France?"
completion = client.chat.completions.create(
    model="gpt-4o",
    messages=[
    {"role": "system","content": prompt},
    {
        "role": "user",
        "content": input_text
    }
    ],
    temperature=1,
    max_tokens=2048,  # 修改为 max_tokens
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0
)

# 打印响应内容
print(completion.choices[0].message.content)


The capital of France is Paris.


## DeepSeek调用示例

In [None]:
from openai import OpenAI

# 填入你的实际 API Key
client = OpenAI(api_key="<DeepSeek API Key>", base_url="https://api.deepseek.com")

prompt = "你是一名文本情绪分析专家"
input_text = "请你判断这段股民评论属于：积极、消极还是相对中性？#留言：顶王粉丝光荣站岗"
completion = client.chat.completions.create(
    model="deepseek-reasoner",
    messages=[
    {"role": "system","content": prompt},
    {
        "role": "user",
        "content": input_text
    }
    ],
    temperature=1,
    max_tokens=2048,  # 修改为 max_tokens
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0
)

# 打印响应内容
print(completion.choices[0].message.content)


这段评论中的“顶王粉丝光荣站岗”属于**消极**情绪。

**分析依据**：
1. **"站岗"**：在股市用语中常指投资者在高位买入股票后股价下跌，被迫长期持有（类似哨兵站岗无法离开），带有套牢的负面含义。
2. **"光荣"**：此处为反语，表面褒义实则表达自嘲，暗示被迫坚持持有亏损头寸的无奈。
3. **整体语境**：结合股民社区常见表达习惯，该评论通过调侃方式反映对持仓亏损的不满，情绪偏向消极。


# 多线程调用示例

### 本代码默认要求输出格式为json格式的词典：json\n{\n  \"economic_growth_goal\": \"7.5%\"\n}
### 可根据需求修改JSON解析模块以适应不同情况

In [2]:
import pandas as pd
import os
import json
import time
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from openai import OpenAI

# -------------------------------
# JSON解析模块（独立模块）
# -------------------------------
def default_json_parser(content, idx=None):
    """
    默认的 JSON 解析器：
    清理输入内容后尝试解析 JSON，
    若成功则返回完整的字典，若失败返回空字典。
    """
    try:
        # 去除代码块标记，清理内容
        cleaned_content = content.replace('```json\n', '').replace('```', '').strip()
        parsed_result = json.loads(cleaned_content)
        return parsed_result
    except json.JSONDecodeError:
        if idx is not None:
            logging.warning(f"警告: 第 {idx} 行解析 JSON 失败")
        return {}
    except Exception as e:
        if idx is not None:
            logging.error(f"错误: 第 {idx} 行解析失败 - {str(e)}")
        return {}

# -------------------------------
# 限流处理器（控制请求频率）
# -------------------------------
class RateLimitedProcessor:
    def __init__(self):
        self.request_timestamps = []
        self.MAX_RPM = 500
        self.window_size = 60  # 60秒窗口

    def _clean_old_records(self, current_time):
        cutoff_time = current_time - timedelta(seconds=self.window_size)
        self.request_timestamps = [ts for ts in self.request_timestamps if ts > cutoff_time]

    def can_make_request(self):
        """检查是否可以发起新请求"""
        current_time = datetime.now()
        self._clean_old_records(current_time)
        if len(self.request_timestamps) >= self.MAX_RPM:
            return False
        self.request_timestamps.append(current_time)
        return True

# -------------------------------
# OpenAI文本处理器
# -------------------------------
class OpenAITextProcessor:
    def __init__(self, api_key=None, model=None, base_url=None, json_parser=None):
        self.client = OpenAI(api_key=api_key,base_url=base_url)
        self.model = model
        self.rate_limiter = RateLimitedProcessor()
        self.n_workers = 14  # 优化后的线程数
        # 如果未提供自定义解析器，则使用默认解析器
        self.json_parser = json_parser if json_parser is not None else default_json_parser

    def process_batch(self, df, text_column, prompt, batch_size=20, delay=1, json_parser=None):
        """
        批量处理文本，支持灵活的 JSON 解析。
        
        参数:
            df: 包含文本数据的 DataFrame
            text_column: 文本所在的列名
            prompt: 系统提示，用于 API 调用
            batch_size: 每个批次处理的文本条数
            delay: 每次请求后的延迟（秒）
            json_parser: 可选的自定义 JSON 解析器，若不传入则使用实例内的解析器
        
        返回:
            新的 DataFrame，包含原始数据及 API 返回结果（通过 JSON 解析获得的各字段）
        """
        parser = json_parser if json_parser is not None else self.json_parser
        results = []  # 保存每次请求解析后的结果（字典形式）

        def process_chunk(chunk_data):
            chunk_results = []
            for idx, text in chunk_data:
                # 限流检测：等待直到可以发送请求
                while not self.rate_limiter.can_make_request():
                    time.sleep(0.1)
                try:
                    response = self.client.chat.completions.create(
                        model=self.model,
                        messages=[
                            {"role": "system", "content": prompt},
                            {"role": "user", "content": text}
                        ],
                        temperature=0,
                        max_tokens=40
                    )
                    # 使用解析器处理响应内容，得到字典格式结果
                    parsed_result = parser(response.choices[0].message.content, idx)
                    chunk_results.append(parsed_result)
                    time.sleep(delay)
                except Exception as e:
                    logging.error(f"错误: 处理第 {idx} 行时发生异常: {str(e)}")
                    chunk_results.append({})
            return chunk_results

        # 将数据分成批次，保留行号信息
        chunks = [
            list(enumerate(df[text_column][i:i+batch_size]))
            for i in range(0, len(df), batch_size)
        ]

        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            futures = list(tqdm(
                executor.map(process_chunk, chunks),
                total=len(chunks),
                desc="Processing batches"
            ))
            for chunk_results in futures:
                results.extend(chunk_results)

        # 将解析结果列表转为 DataFrame，并与原 DataFrame 合并
        df_result = df.copy().reset_index(drop=True)
        results_df = pd.json_normalize(results)
        df_result = pd.concat([df_result, results_df], axis=1)

        # 统计处理情况
        success_count = sum(1 for r in results if r)
        total_count = len(results)
        success_rate = (success_count / total_count) * 100 if total_count > 0 else 0
        logging.info(f"处理完成: 总数 {total_count}, 成功 {success_count}, 成功率 {success_rate:.2f}%")
        
        return df_result


In [None]:
prompt= "You are a helpful assistant."

In [None]:
processor = OpenAITextProcessor(api_key="<OpenAI API Key>", base_url="https://api.openai.com/v1",model="gpt-4o-mini")
df_result = processor.process_batch(
    df=pd.read_csv(""),#读取待处理的csv文件
    text_column="",#待处理的列名
    prompt=prompt,#提示词
    batch_size=5,#批处理大小
)
df_result.to_csv("", index=False)

Processing batches: 100%|██████████| 20/20 [00:24<00:00,  1.24s/it]


# 实际处理案例

## 构建规范表格

In [14]:
import pandas as pd
import os

def read_txt_files(base_path='data/政府工作报告'):
    # 创建空列表来存储数据
    data = []
    
    # 遍历城市文件夹
    for city in os.listdir(base_path):
        city_path = os.path.join(base_path, city)
        
        # 确保是目录
        if os.path.isdir(city_path):
            # 遍历年份文件
            for file_name in os.listdir(city_path):
                if file_name.endswith('.txt'):
                    # 获取年份（去掉.txt后缀）
                    year = file_name.replace('.txt', '')
                    
                    # 构建完整的文件路径
                    file_path = os.path.join(city_path, file_name)
                    
                    try:
                        # 读取文件内容
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                            
                        # 将数据添加到列表中
                        data.append({
                            '城市': city,
                            '年份': year,
                            '文本内容': content
                        })
                    except Exception as e:
                        print(f"读取文件 {file_path} 时出错: {str(e)}")
    
    # 创建DataFrame
    df = pd.DataFrame(data)
    
    return df

# 使用函数
try:
    df = read_txt_files()
    print("DataFrame创建成功！")
    print("\n数据预览：")
    print(df.head())
    
except Exception as e:
    print(f"处理过程中出错: {str(e)}")


DataFrame创建成功！

数据预览：
    城市    年份                                               文本内容
0  上海市  2016           \n\t上海市市长杨雄\n\n\t各位代表：\n\n\t　　现在，我代表上...
1  上海市  2017           \n\t上海市市长杨雄\n\n\t各位代表：\n\n\t　　现在，我代表上...
2  上海市  2015  \n\t——2015年1月25日在上海市第十四届人民代表大会第三次会议上\n\n\t上海市市...
3  上海市  2014          \n\t　　各位代表：\n\n\t　　现在，我代表上海市人民政府，向大会作政...
4  上海市  2010  \n\n                            \n\n          ...


## 调用大语言模型批量处理

In [6]:
import pandas as pd
import os
import json
import time
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
from openai import OpenAI

# -------------------------------
# JSON解析模块（独立模块）
# -------------------------------
def default_json_parser(content, idx=None):
    """
    默认的 JSON 解析器：
    清理输入内容后尝试解析 JSON，
    若成功则返回完整的字典，若失败返回空字典。
    """
    try:
        # 去除代码块标记，清理内容
        cleaned_content = content.replace('```json\n', '').replace('```', '').strip()
        parsed_result = json.loads(cleaned_content)
        return parsed_result
    except json.JSONDecodeError:
        if idx is not None:
            logging.warning(f"警告: 第 {idx} 行解析 JSON 失败")
        return {}
    except Exception as e:
        if idx is not None:
            logging.error(f"错误: 第 {idx} 行解析失败 - {str(e)}")
        return {}

# -------------------------------
# 限流处理器（控制请求频率）
# -------------------------------
class RateLimitedProcessor:
    def __init__(self):
        self.request_timestamps = []
        self.MAX_RPM = 500
        self.window_size = 60  # 60秒窗口

    def _clean_old_records(self, current_time):
        cutoff_time = current_time - timedelta(seconds=self.window_size)
        self.request_timestamps = [ts for ts in self.request_timestamps if ts > cutoff_time]

    def can_make_request(self):
        """检查是否可以发起新请求"""
        current_time = datetime.now()
        self._clean_old_records(current_time)
        if len(self.request_timestamps) >= self.MAX_RPM:
            return False
        self.request_timestamps.append(current_time)
        return True

# -------------------------------
# OpenAI文本处理器
# -------------------------------
class OpenAITextProcessor:
    def __init__(self, api_key=None, model=None, base_url=None, json_parser=None):
        self.client = OpenAI(api_key=api_key,base_url=base_url)
        self.model = model
        self.rate_limiter = RateLimitedProcessor()
        self.n_workers = 14  # 优化后的线程数
        # 如果未提供自定义解析器，则使用默认解析器
        self.json_parser = json_parser if json_parser is not None else default_json_parser

    def process_batch(self, df, text_column, prompt, batch_size=20, delay=1, json_parser=None):
        """
        批量处理文本，支持灵活的 JSON 解析。
        
        参数:
            df: 包含文本数据的 DataFrame
            text_column: 文本所在的列名
            prompt: 系统提示，用于 API 调用
            batch_size: 每个批次处理的文本条数
            delay: 每次请求后的延迟（秒）
            json_parser: 可选的自定义 JSON 解析器，若不传入则使用实例内的解析器
        
        返回:
            新的 DataFrame，包含原始数据及 API 返回结果（通过 JSON 解析获得的各字段）
        """
        parser = json_parser if json_parser is not None else self.json_parser
        results = []  # 保存每次请求解析后的结果（字典形式）

        def process_chunk(chunk_data):
            chunk_results = []
            for idx, text in chunk_data:
                # 限流检测：等待直到可以发送请求
                while not self.rate_limiter.can_make_request():
                    time.sleep(0.1)
                try:
                    response = self.client.chat.completions.create(
                        model=self.model,
                        messages=[
                            {"role": "system", "content": prompt},
                            {"role": "user", "content": text}
                        ],
                        temperature=0,
                        max_tokens=40
                    )
                    # 使用解析器处理响应内容，得到字典格式结果
                    parsed_result = parser(response.choices[0].message.content, idx)
                    chunk_results.append(parsed_result)
                    time.sleep(delay)
                except Exception as e:
                    logging.error(f"错误: 处理第 {idx} 行时发生异常: {str(e)}")
                    chunk_results.append({})
            return chunk_results

        # 将数据分成批次，保留行号信息
        chunks = [
            list(enumerate(df[text_column][i:i+batch_size]))
            for i in range(0, len(df), batch_size)
        ]

        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            futures = list(tqdm(
                executor.map(process_chunk, chunks),
                total=len(chunks),
                desc="Processing batches"
            ))
            for chunk_results in futures:
                results.extend(chunk_results)

        # 将解析结果列表转为 DataFrame，并与原 DataFrame 合并
        df_result = df.copy().reset_index(drop=True)
        results_df = pd.json_normalize(results)
        df_result = pd.concat([df_result, results_df], axis=1)

        # 统计处理情况
        success_count = sum(1 for r in results if r)
        total_count = len(results)
        success_rate = (success_count / total_count) * 100 if total_count > 0 else 0
        logging.info(f"处理完成: 总数 {total_count}, 成功 {success_count}, 成功率 {success_rate:.2f}%")
        
        return df_result


In [7]:
prompt= "从政府采购文本中识别该市当年经济增长目标，并将其以规范的JSON格式输出。确保信息准确无误，并正确识别该政府的经济增长目标。\n\n# Steps\n\n1. 阅读并理解给定的政府采购文本。\n2. 查找与该市当年经济增长目标相关的信息。\n3. 提取相关信息，确保其准确反映文本中的内容。\n4. 使用规范的JSON格式输出信息。\n\n# Output Format\n\n输出应为JSON格式，包含以下结构：\n\n```json\n{\n  \"economic_growth_goal\": \"[经济增长目标百分比]\"\n}\n```\n\n# Examples\n\n**Input:** \n政府采购文本的部分内容为：“今年本市的经济增长目标为7.5%。”\n\n**Output:**\n\n```json\n{\n  \"economic_growth_goal\": \"7.5%\"\n}\n```\n#Notes\n1. 增长目标以数字+%的形式输出"

In [None]:
processor = OpenAITextProcessor(api_key="<DeepSeek API Key>", base_url="https://api.deepseek.com",model="deepseek-chat")
df_result = processor.process_batch(
    df=df,
    text_column="文本内容",
    prompt=prompt,
    batch_size=5,
)
df_result.to_csv("data/提取结果.csv", index=False)

Processing batches: 100%|██████████| 4/4 [02:31<00:00, 37.86s/it]
