# 会话数据清洗及分析
* 参考示例数据结构

## 1.准备数据

In [1]:
import pymongo
# MongoDB连接配置
mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
convoinsight_db = mongo_client["convoinsight-danghuan"]
conversations_collection = convoinsight_db["conversations"]

In [2]:
import pandas as pd
data = pd.read_excel("./danghuan.xlsx")

In [3]:
data.head()

Unnamed: 0,会话ID,问题类型,会话详情内容
0,7bc5c9f41f3c44b3a6685b66e47a8d02,下单问题-下单流程咨询,江苏/连云港0630-7992 2025-06-30 19:57:33_x000D_\n我这...
1,058a702463334c4a8053ab3063d7a19a,下单问题,金梦爱 2025-06-30 18:46:51_x000D_\n您好呀！我是小当回收客服欧杰...
2,fccd2d17776144cfbfc6bbd9323393ca,下单问题,宁夏/固原0630-3132 2025-06-30 18:41:57_x000D_\n你好，...
3,347696d5ae12442a82c2465809d72257,下单问题-物流预约,重庆0630-3059 2025-06-30 18:34:21_x000D_\n好的_x00...
4,afd4901fdbc84f39b25b128b66522817,下单问题,湖南/娄底0630-6277 2025-06-30 18:33:50_x000D_\n旧机子...


In [4]:
data['会话详情内容'][0]

'江苏/连云港0630-7992 2025-06-30 19:57:33_x000D_\n我这个旧手机可以再贵一点吗_x000D_\n_x000D_\ncoco 2025-06-30 19:57:42_x000D_\n您好呀！我是小当回收客服coco，很高兴能为您服务！_x000D_\n_x000D_\ncoco 2025-06-30 19:57:49_x000D_\n理解您的心情，二手手机行业与新机销售的市场类型不同，价格体系不同。_x000D_\n_x000D_\ncoco 2025-06-30 19:58:01_x000D_\n如果是已经提交了回收订单，辛苦提供回收订单号或者下单的手机号码，客服为您查询_x000D_\n_x000D_\n'

## 2. 调用代理

In [5]:
import os
# Set the proxy URL and port
proxy_url = 'http://127.0.0.1'
proxy_port = "6465" # !!!please replace it with your own port

# Set the http_proxy and https_proxy environment variables
os.environ['http_proxy'] = f'{proxy_url}:{proxy_port}'
os.environ['https_proxy'] = f'{proxy_url}:{proxy_port}'

## 3. 单条数据:抽取会话内容的基础信息

In [6]:
def system_prompt_for_info_extraction():
    prompt = """请将以下客服会话数据转换成结构化的JSON格式。根据对话内容提取相关信息，并按照以下结构组织：

1. 生成唯一会话ID（如果原始数据中没有明确ID，可使用时间戳或随机字符串）
2. 确定客服人员姓名
3. 提取客户信息（手机号或用户ID）
4. 分析互动情况（总消息数、客服消息数、客户消息数）
5. 按顺序整理所有消息，包括:
   - 消息类型（系统/客服/客户）
   - 消息内容
   - 发送者
   - 对客户消息添加情感分析（正向/中立/负向）

请使用以下JSON结构：

{
  "agent": "客服姓名",
  "customerInfo": {
    "userId": "客户ID或手机号",
    "device": "相关设备信息（如有）",
    "history": "客户历史信息（如有）"
  },
  "interactionAnalysis": {
    "totalMessages": 消息总数,
    "agentMessages": 客服消息数,
    "userMessages": 客户消息数
  },
  "messages": [
    {
      "type": "消息类型（system/agent/user）",
      "content": "消息内容",
      "sender": "发送者名称/ID",
      "sentiment": "情感分析（仅用户消息需要）"
    },
    // 更多消息...
  ]
}    
    """
    return prompt

def user_prompt_for_info_extraction(conversations_string):
    prompt = """# 原始会话数据：
{conversations_string}
请根据上述要求将原始会话数据转换为结构化的JSON格式。
    """.format(conversations_string = conversations_string)
    return prompt

In [7]:
print(user_prompt_for_info_extraction("test"))

# 原始会话数据：
test
请根据上述要求将原始会话数据转换为结构化的JSON格式。
    


In [8]:
from openai import OpenAI
import os
# client = OpenAI(api_key=os.getenv('DEEPSEEK_API_KEY'), base_url="https://api.deepseek.com")
client = OpenAI(api_key=os.getenv('OPEN_AI_KEY'))
model_name = "gpt-4o-mini"

In [9]:
import uuid
from datetime import datetime
def custom_length_uuid(length=8):
    """Generate a UUID of custom length (minimum recommended: 8)"""
    # Generate a random UUID and remove hyphens
    u = str(uuid.uuid4()).replace('-', '')
    # Return a substring of the desired length
    return "#"+u[:length]


In [10]:
custom_length_uuid()

'#0140da98'

In [11]:
import json
def process_single_conversation_extract(conversation:str,conversation_id:str,test_mode:bool,model="gpt-4o-mini"):
    """
    处理单条评论数据。

    参数:
        conversation: 客服会话对话信息
        test_mode: 是否为测试模式,布尔类型
        model:处理模型版本

    返回:
        Dict[str, Any]: 处理结果字典
    """
    
    try:
        # 准备提示文本
        system_prompt = system_prompt_for_info_extraction()
        user_prompt = user_prompt_for_info_extraction(conversations_string = conversation)

        # 调用模型获取结果
        response = client.chat.completions.create(
            model = model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt },
            ],
            stream=False,
                response_format={
                'type': 'json_object'
            }
        )

        # 获取 token 消耗数量
        token_usage = response.usage.total_tokens if hasattr(response, 'usage') else None

        try:
            content = response.choices[0].message.content
            parsed_content = json.loads(content)  # 解析内容
            # print(parsed_content)
            result_doc = parsed_content
            result_doc['origin_conversation'] = conversation
            result_doc['token_usage'] = token_usage
            result_doc['id'] = conversation_id
            result_doc['time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        except json.JSONDecodeError as e:
            print(f"JSON 解析失败: {e}")
        except AttributeError as e:
            print(f"属性访问失败: {e}")
        except Exception as e:
            print(f"处理过程中发生错误: {e}")
        
        if test_mode:
            return result_doc
        else:
            conversations_collection.insert_one(result_doc)
            print(f"已插入一条数据到conversations集合")
            return result_doc

    except Exception as e:
        # 记录错误信息
        result_doc = {
            'error': str(e),
            "id":conversation_id,
            'processed_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'status': 'failed',
            "model": model,
        }
        return result_doc

In [27]:
response = process_single_conversation_extract(conversation=data['会话详情内容'][0],conversation_id="#001",test_mode=False)

已插入一条数据到conversations集合


In [28]:
response

{'agent': 'coco',
 'customerInfo': {'userId': '0630-7992', 'device': '旧手机', 'history': '无'},
 'interactionAnalysis': {'totalMessages': 5,
  'agentMessages': 3,
  'userMessages': 2},
 'messages': [{'type': 'user',
   'content': '我这个旧手机可以再贵一点吗',
   'sender': '0630-7992',
   'sentiment': '中立'},
  {'type': 'agent',
   'content': '您好呀！我是小当回收客服coco，很高兴能为您服务！',
   'sender': 'coco',
   'sentiment': None},
  {'type': 'agent',
   'content': '理解您的心情，二手手机行业与新机销售的市场类型不同，价格体系不同。',
   'sender': 'coco',
   'sentiment': None},
  {'type': 'agent',
   'content': '如果是已经提交了回收订单，辛苦提供回收订单号或者下单的手机号码，客服为您查询',
   'sender': 'coco',
   'sentiment': None}],
 'origin_conversation': '江苏/连云港0630-7992 2025-06-30 19:57:33_x000D_\n我这个旧手机可以再贵一点吗_x000D_\n_x000D_\ncoco 2025-06-30 19:57:42_x000D_\n您好呀！我是小当回收客服coco，很高兴能为您服务！_x000D_\n_x000D_\ncoco 2025-06-30 19:57:49_x000D_\n理解您的心情，二手手机行业与新机销售的市场类型不同，价格体系不同。_x000D_\n_x000D_\ncoco 2025-06-30 19:58:01_x000D_\n如果是已经提交了回收订单，辛苦提供回收订单号或者下单的手机号码，客服为您查询_x000D_\n_x000D_\n',
 'token_us

## 4. 单条数据：抽取会话内容的诊断信息

In [29]:
from datetime import datetime

In [30]:
def system_prompt_for_info_update():
    prompt = """# 客服对话结构化分析
你是一个有着十年专业经验的客服总监，现在需要基于客户对话进行会话内容的分析和诊断
## 任务描述
分析以下客服与客户的对话内容，将其转化为结构化数据，包括:
1. 客户满意度、问题解决程度、客服态度和安全风险评估等关键指标
2. 对话摘要和主要问题识别
3. 问题解决情况及解决方案
4. 相关标签和关键词提取
5. 客服改进建议

## 分析指南
1. **对话指标评估**:
   - 客户满意度(0-100): 根据客户回应的积极程度、问题解决情况评估
   - 问题解决程度(0-100): 根据客户问题是否得到全面解答评估
   - 客服态度(0-100): 根据客服回应的礼貌度、专业性、响应速度评估
   - 安全评估(0-100): 分析对话中可能存在的安全隐患，数值越高代表越安全
2. **对话摘要**:
   - 提取对话的主要主题和要点，用1-2句话概括
3. **对话详细分析**:
   - 识别客户的主要问题
   - 确定问题解决状态(已解决/部分解决/未解决/无法解决)
   - 概括客服提供的解决方案
4. **标签和关键词**:
   - 标签：根据对话内容提取最匹配的相关标签
   - 关键词：识别客户对话中出现的热门词汇或产品名称
5. **改进建议**:
   - 根据对话分析，提供2-4条客服可以改进的具体建议

## 输出格式
请按以下JSON格式输出分析结果:

```json
{
  "metrics": {
    "satisfaction": { "value": 0-100 },
    "resolution": { "value": 0-100 },
    "attitude": { "value": 0-100 },
    "security": { "value": 0-100 }
  },
  "summary": "一句话总结对话内容",
  "conversationSummary": {
    "mainIssue": "客户主要问题",
    "resolutionStatus": {
      "status": "已解决/部分解决/未解决/无法解决",
      "description": "解决方案描述"
    },
    "mainSolution": "客服提供的主要解决方案"
  },
  "tags": ["标签1"],
  "improvementSuggestions": [
    "改进建议1",
    "改进建议2",
    "改进建议3"
  ],
  "hotWords": ["关键词1", "关键词2", "关键词3"]
}
```
## 特别注意事项
1. 所有评分必须基于客观事实，避免主观臆断
2. 标签应精准反映对话主题和产品类别
3. 改进建议应具体、可操作且有建设性
4. 如对话未明确表明某信息，应做合理推断并注明
    """
    return prompt

def user_prompt_for_info_update(conversations_string):
    prompt = """# 原始会话数据：
{conversations_string}
    """.format(conversations_string = conversations_string)
    return prompt

In [31]:
problem_tags_list = """产品使用咨询>功能使用
产品使用咨询>相机操作咨询
产品使用咨询>Insta360 APP咨询
产品使用咨询>Studio 咨询
产品使用咨询>教程需求咨询
产品使用咨询>配件使用
产品使用咨询>云服务使用咨询
产品使用咨询>Connect使用咨询
产品使用咨询>其他产品使用咨询
相机故障问题>镜头外观不良
相机故障问题>镜头功能类不良
相机故障问题>屏外观不良
相机故障问题>屏功能不良
相机故障问题>按键外观/功能不良
相机故障问题>电池外观(功能)不良
相机故障问题>USB盖/喇叭盖/其他盖子损坏
相机故障问题>机身外观不良
相机故障问题>进液
相机故障问题>充电类不良
相机故障问题>发热
相机故障问题>拼接类不良
相机故障问题>Wifi连接类不良
相机故障问题>蓝牙连接不良
相机故障问题>有线连接类不良
相机故障问题>声音类不良
相机故障问题>图像类不良（预览&成片）
相机故障问题>防抖类异常
相机故障问题>死机
相机故障问题>开关机类不良
相机故障问题>升级异常
相机故障问题>续航异常
相机故障问题>相机录制异常/停止录像
相机故障问题>相机文件异常
相机故障问题>内存卡相关异常
相机故障问题>直播异常-专业级
相机故障问题>其他故障
"""

In [32]:
def validate_data_structure(data):
    """验证数据结构是否符合预期"""
    required_keys = {
        "metrics": dict,
        "summary": str,
        "conversationSummary": dict,
        "tags": list,
        "improvementSuggestions": list,
        "hotWords": list
    }
    
    for key, expected_type in required_keys.items():
        if key not in data or not isinstance(data[key], expected_type):
            return False
    
    # 进一步验证嵌套结构
    metrics_keys = {"satisfaction", "resolution", "attitude", "security"}
    if not all(k in data["metrics"] for k in metrics_keys):
        return False
    
    conversation_summary_keys = {"mainIssue", "resolutionStatus", "mainSolution"}
    if not all(k in data["conversationSummary"] for k in conversation_summary_keys):
        return False
    
    resolution_status_keys = {"status", "description"}
    if not all(k in data["conversationSummary"]["resolutionStatus"] for k in resolution_status_keys):
        return False
    
    return True

def update_conversation(conversation_id, new_data):
    # 验证数据结构
    if not validate_data_structure(new_data):
        print("数据结构不符合预期，更新操作已取消。")
        return None
    
    # 查找现有文档
    existing_doc = conversations_collection.find_one({'id': conversation_id})
    
    # 准备更新数据
    update_data = new_data.copy()
    
    if existing_doc:
        # 更新时间为最新时间
        update_data['times'] = new_data.get('time', existing_doc.get('time'))
        # 合并 token_usage
        if 'token_usage' in new_data and 'token_usage' in existing_doc:
            update_data['token_usage'] = existing_doc['token_usage'] + new_data['token_usage']

        # 合并其他字段，保留现有字段，添加新字段
        for key in existing_doc:
            if key not in update_data and key != '_id':
                update_data[key] = existing_doc[key]
    
    # 执行更新操作
    result = conversations_collection.update_one(
        {'id': conversation_id},
        {'$set': update_data},
        upsert=True  # 如果文档不存在，则创建新文档
    )
    print(f"已更新一条数据到conversations集合")
    return result

In [33]:
def process_single_conversation_update(conversation:str,conversation_id:str,test_mode:bool,model="gpt-4o-mini"):
    """
    处理单条评论数据。

    参数:
        conversation: 客服会话对话信息
        test_mode: 是否为测试模式,布尔类型
        model:处理模型版本

    返回:
        Dict[str, Any]: 处理结果字典
    """
    
    try:
        # 准备提示文本
        system_prompt = system_prompt_for_info_update()
        user_prompt = user_prompt_for_info_update(conversations_string = conversation)

        # 调用模型获取结果
        response = client.chat.completions.create(
            model = model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt },
            ],
            stream=False,
                response_format={
                'type': 'json_object'
            }
        )

        # 获取 token 消耗数量
        token_usage = response.usage.total_tokens if hasattr(response, 'usage') else None

        try:
            content = response.choices[0].message.content
            parsed_content = json.loads(content)  # 解析内容
            # print(parsed_content)
            result_doc = parsed_content
            result_doc['token_usage'] = token_usage
            result_doc['time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        except json.JSONDecodeError as e:
            print(f"JSON 解析失败: {e}")
        except AttributeError as e:
            print(f"属性访问失败: {e}")
        except Exception as e:
            print(f"处理过程中发生错误: {e}")
        
        if test_mode:
            return result_doc
        else:
            # 更新原始数据
            update_conversation(conversation_id, result_doc)
            return result_doc

    except Exception as e:
        # 记录错误信息
        result_doc = {
            'error': str(e),
            "conversation":conversation,
            'processed_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'status': 'failed',
            "model": model,
        }
        return result_doc

In [34]:
test = dict(conversations_collection.find_one({"id": {"$exists": True},
                                                            "metrics": {"$exists": False}
                                                        }))

In [35]:
test

{'_id': ObjectId('6879b5f5d9eab0e645a9c2bf'),
 'agent': 'coco',
 'customerInfo': {'userId': '0630-7992', 'device': '旧手机', 'history': '无'},
 'interactionAnalysis': {'totalMessages': 5,
  'agentMessages': 3,
  'userMessages': 2},
 'messages': [{'type': 'user',
   'content': '我这个旧手机可以再贵一点吗',
   'sender': '0630-7992',
   'sentiment': '中立'},
  {'type': 'agent',
   'content': '您好呀！我是小当回收客服coco，很高兴能为您服务！',
   'sender': 'coco',
   'sentiment': None},
  {'type': 'agent',
   'content': '理解您的心情，二手手机行业与新机销售的市场类型不同，价格体系不同。',
   'sender': 'coco',
   'sentiment': None},
  {'type': 'agent',
   'content': '如果是已经提交了回收订单，辛苦提供回收订单号或者下单的手机号码，客服为您查询',
   'sender': 'coco',
   'sentiment': None}],
 'origin_conversation': '江苏/连云港0630-7992 2025-06-30 19:57:33_x000D_\n我这个旧手机可以再贵一点吗_x000D_\n_x000D_\ncoco 2025-06-30 19:57:42_x000D_\n您好呀！我是小当回收客服coco，很高兴能为您服务！_x000D_\n_x000D_\ncoco 2025-06-30 19:57:49_x000D_\n理解您的心情，二手手机行业与新机销售的市场类型不同，价格体系不同。_x000D_\n_x000D_\ncoco 2025-06-30 19:58:01_x000D_\n如果是已经提交了回收订单，辛苦提供回收订单号或

In [38]:
response = process_single_conversation_update(conversation=test['origin_conversation'],conversation_id=test["id"],test_mode= False,model="gpt-4o-mini")

已更新一条数据到conversations集合


In [39]:
response

{'metrics': {'satisfaction': {'value': 75},
  'resolution': {'value': 60},
  'attitude': {'value': 85},
  'security': {'value': 90}},
 'summary': '客户询问旧手机是否可以更高回收价，客服提供了相关建议。',
 'conversationSummary': {'mainIssue': '客户想知道旧手机是否可以获得更高的回收价格。',
  'resolutionStatus': {'status': '部分解决',
   'description': '客服解释了二手手机价格体系及查询要求，未能确认具体的回收价格。'},
  'mainSolution': '客服引导客户提供订单号或手机号码进行查询。'},
 'tags': ['手机回收', '二手手机', '客服咨询'],
 'improvementSuggestions': ['针对二手手机的回收价格提供更多具体信息，减少客户不确定感。',
  '在对话中主动询问客户是否有订单号，减少客户再次提问的机会。',
  '增强客服对市场行情的理解，以便更好解答价格相关的问题。'],
 'hotWords': ['旧手机', '回收价格', '二手手机'],
 'token_usage': 1182,
 'time': '2025-07-18 10:49:28'}

In [72]:
# data['id'] = [custom_length_uuid() for _ in range(len(data))]

In [73]:
# data.to_excel('./data_pro.xlsx')

## 1. 会话数据处理

In [40]:
import os
# 获取系统的 CPU 核心数
cpu_count = os.cpu_count()
default_max_workers = cpu_count * 5  # 默认线程数
# 设置新的 max_workers，增加 1.2 倍的线程数并转换为整数
new_max_workers = int(default_max_workers * 1.3)

In [41]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from tqdm import tqdm

def multi_process_conversation_extraction(
    conversations: List[dict],  # 修改为字典列表以包含 conversation_id
    max_workers: int = 4,
    model="gpt-4o-mini",
    test_mode=True
) -> List[dict]:
    """
    并行处理评论并存储结果。

    参数:
        conversations: 包含会话数据和ID的字典列表
        max_workers: 最大线程数
        model: 使用的模型名称
        test_mode: 是否为测试模式
    返回:
        处理结果的列表
    """
    results = []
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_conversation = {
                executor.submit(
                    process_single_conversation_extract, 
                    conversation['content'], 
                    conversation['id'],  # 传递 conversation_id
                    test_mode, 
                    model
                ): conversation 
                for conversation in conversations
            }
            
            for future in tqdm(
                as_completed(future_to_conversation), 
                total=len(conversations),
                desc="处理会话"
            ):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    conversation = future_to_conversation[future]
                    print(f"处理会话 {conversation['id']} 时发生错误: {str(e)}")
                    
    except Exception as e:
        print(f"批处理过程中发生错误: {str(e)}")
    
    return results

In [None]:
# all_data = pd.read_excel("./data_pro.xlsx")

In [42]:
# all_data.head()
data

Unnamed: 0,会话ID,问题类型,会话详情内容
0,7bc5c9f41f3c44b3a6685b66e47a8d02,下单问题-下单流程咨询,江苏/连云港0630-7992 2025-06-30 19:57:33_x000D_\n我这...
1,058a702463334c4a8053ab3063d7a19a,下单问题,金梦爱 2025-06-30 18:46:51_x000D_\n您好呀！我是小当回收客服欧杰...
2,fccd2d17776144cfbfc6bbd9323393ca,下单问题,宁夏/固原0630-3132 2025-06-30 18:41:57_x000D_\n你好，...
3,347696d5ae12442a82c2465809d72257,下单问题-物流预约,重庆0630-3059 2025-06-30 18:34:21_x000D_\n好的_x00...
4,afd4901fdbc84f39b25b128b66522817,下单问题,湖南/娄底0630-6277 2025-06-30 18:33:50_x000D_\n旧机子...
...,...,...,...
1693,a02481b4b69a43eeb3e31c7d1efb0f3e,下单问题,江西/南昌0601-6614 2025-06-01 09:21:19_x000D_\n手机屏...
1694,7091769e214e404b9c6bf6c48b1988b6,下单问题-物流预约,云南/昆明0601-2621 2025-06-01 09:20:25_x000D_\n你好_...
1695,78e9550fd3be4966b95a7b61c9a11564,下单问题-下单流程咨询,浙江/杭州0601-5725 2025-06-01 09:18:29_x000D_\n你好_...
1696,20b8317392224743a311c9c6464e6047,下单问题-物流费用,湖南/长沙0531-1977 2025-06-01 09:02:35_x000D_\n回收手...


In [46]:
try:
    # 读取 Excel 数据
    data = pd.read_excel("./danghuan.xlsx")
    # 获取已处理的会话 ID
    processed_ids = set(doc['id'] for doc in conversations_collection.find({}, {'id': 1}))
    # 筛选未处理的会话
    unprocessed_conversations = [
        {'id': row['会话ID'], 'content': row['会话详情内容']}
        for _, row in data.iterrows()
        if row['会话ID'] not in processed_ids
    ]
    print(f"未处理的会话数量: {len(unprocessed_conversations)}")
    
    results = multi_process_conversation_extraction(unprocessed_conversations,max_workers=new_max_workers,test_mode=False)
except Exception as e:
    print(f"主程序执行错误: {str(e)}")

未处理的会话数量: 0


处理会话: 0it [00:00, ?it/s]


## 2. 会话信息更新

In [47]:
import os
# 获取系统的 CPU 核心数
cpu_count = os.cpu_count()
default_max_workers = cpu_count * 5  # 默认线程数
# 设置新的 max_workers，增加 1.2 倍的线程数并转换为整数
new_max_workers = int(default_max_workers * 1.3)

In [52]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from tqdm import tqdm

def multi_process_conversation_update(
    conversations: List[dict],  # 修改为字典列表以包含 conversation_id
    max_workers: int = 4,
    model="gpt-4o-mini",
    test_mode=True
) -> List[dict]:
    """
    并行处理评论并存储结果。

    参数:
        conversations: 包含会话数据和ID的字典列表
        max_workers: 最大线程数
        model: 使用的模型名称
        test_mode: 是否为测试模式
    返回:
        处理结果的列表
    """
    results = []
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_conversation = {
                executor.submit(
                    process_single_conversation_update, 
                    conversation['content'], 
                    conversation['id'],  # 传递 conversation_id
                    test_mode, 
                    model
                ): conversation 
                for conversation in conversations
            }
            
            for future in tqdm(
                as_completed(future_to_conversation), 
                total=len(conversations),
                desc="处理会话"
            ):
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    conversation = future_to_conversation[future]
                    print(f"处理会话 {conversation['id']} 时发生错误: {str(e)}")
                    
    except Exception as e:
        print(f"批处理过程中发生错误: {str(e)}")
    
    return results

In [53]:
try:
    # 获取已处理的会话 ID（仅包含有 metrics 字段的文档）# 筛选未处理的会话
    unprocessed_data =  conversations_collection.find({"id": {"$exists": True},
                                                            "metrics": {"$exists": False}
                                                        })
    unprocessed_conversations = []
    for row in unprocessed_data:
        unprocessed_conversations.append({'id': row['id'], 'content': row['origin_conversation']})
    print(f"未处理的会话数量: {len(unprocessed_conversations)}")
    
    results = multi_process_conversation_update(unprocessed_conversations, max_workers=new_max_workers, test_mode=False)
except Exception as e:
    print(f"主程序执行错误: {str(e)}")

未处理的会话数量: 3


处理会话:   0%|          | 0/3 [00:00<?, ?it/s]

处理会话:  33%|███▎      | 1/3 [00:05<00:11,  5.55s/it]

已更新一条数据到conversations集合


处理会话: 100%|██████████| 3/3 [00:06<00:00,  1.43s/it]

已更新一条数据到conversations集合
已更新一条数据到conversations集合


处理会话: 100%|██████████| 3/3 [00:06<00:00,  2.03s/it]


In [54]:
unprocessed_data =  conversations_collection.find({"id": {"$exists": True},
                                                        "metrics": {"$exists": False}
                                                    })
unprocessed_conversations = []
for row in unprocessed_data:
    unprocessed_conversations.append({'id': row['id'], 'content': row['origin_conversation']})
print(f"未处理的会话数量: {len(unprocessed_conversations)}")

未处理的会话数量: 0


# 数据清洗