# 0. 准备json数据

In [None]:
import pymongo
import pandas as pd
# MongoDB connection configuration: connect to the server at localhost:27017
# and grab handles to the database and the collections used in this notebook.
mongo_client = pymongo.MongoClient("mongodb://localhost:27017/")
kinyo_db = mongo_client["kinyo_db"]
# NOTE(review): kinyo_reviews_collection is not referenced in the visible part
# of the notebook — presumably the raw reviews; confirm before removing.
kinyo_reviews_collection = kinyo_db["kinyo_new_reviews"]
# Collection queried by the statistics and comment-retrieval helpers below.
kinyo_llm_results_collection = kinyo_db["kinyo_llm_results"]
# Collection that receives the assembled result document at the end.
kinyo_data_result = kinyo_db["kinyo_data_result"]

import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']  # use the SimHei font so Chinese text renders
plt.rcParams['axes.unicode_minus'] = False    # display the minus sign correctly

In [None]:
# 1. User-profile statistics
# NOTE(review): the original comment here said "collect the set of already
# processed review IDs", but nothing in this cell does that — it only selects
# the experiment and peeks at one matching document.
# Select the test_version and solution of the current experiment; the
# statistics functions below filter on this pair.
current_test_version = "1"
current_solution = "AI自打标：不限定"
print("test_version:", current_test_version)
print("solution:", current_solution)
# Last expression of the cell: fetches one matching document so the notebook
# displays it as a sanity check.
kinyo_llm_results_collection.find_one({"test_version": current_test_version, "solution": current_solution})

In [None]:
from collections import Counter
import json

# 1. 画像数据

## 统计画像数据

In [None]:
def profile_stats(collection, test_version, solution, fields):
    """Count how often each value occurs for the given ``user_profile`` fields.

    Args:
        collection: MongoDB collection (any object exposing ``find(filter)``).
        test_version: Value the documents' ``test_version`` must equal.
        solution: Value the documents' ``solution`` must equal.
        fields: Iterable of key names to look up inside each document's
            ``user_profile`` sub-document.

    Returns:
        A dict mapping every field name to a list of
        ``{"value": <value>, "count": <occurrences>}`` entries, in first-seen
        order. The placeholder values "未知" and "未指定" are left out. A
        document that lacks the field (or has no usable ``user_profile``) is
        counted under the empty string ``""``.
    """
    # Placeholder answers that carry no information and are dropped from the output.
    ignored_values = ("未知", "未指定")

    cursor = collection.find({
        "test_version": test_version,
        "solution": solution
    })

    # One counter per requested field.
    field_counters = {field: Counter() for field in fields}

    for doc in cursor:
        # ``user_profile`` may be absent or stored as null; treat both as empty
        # instead of failing on ``None.get``.
        user_profile = doc.get("user_profile") or {}
        # Iterate the counters rather than ``fields`` so a one-shot iterable
        # (already consumed above) still works and a duplicated field name is
        # only counted once per document.
        for field, counter in field_counters.items():
            counter[user_profile.get(field, "")] += 1

    # NOTE(review): the "" bucket (field missing) is still reported as a value,
    # as before; confirm whether downstream consumers want it.
    return {
        field: [
            {"value": value, "count": count}
            for value, count in counter.items()
            if value not in ignored_values
        ]
        for field, counter in field_counters.items()
    }

# Usage example: the profile keys looked up in the stored documents carry a
# "new_" prefix, so build the prefixed names directly and aggregate them for
# the current experiment.
fields = [
    f"new_{base_name}"
    for base_name in (
        "gender",
        "occupation",
        "consumption_scene",
        "consumption_motivation",
        "consumption_frequency",
        "consumption_satisfaction",
        "consumption_unsatisfaction",
    )
]
profile_stats_json = profile_stats(
    kinyo_llm_results_collection,
    current_test_version,
    current_solution,
    fields,
)

In [None]:
# Bare expression: shows the aggregated profile statistics as the cell output.
profile_stats_json

In [None]:
def total_review(collection, test_version, solution):
    """Return how many documents exist for one experiment.

    Args:
        collection: MongoDB collection to count in.
        test_version: Value the documents' ``test_version`` must equal.
        solution: Value the documents' ``solution`` must equal.

    Returns:
        The number of matching documents as an int.
    """
    # Let the server do the counting instead of downloading every matching
    # document only to take the length of the resulting list.
    return collection.count_documents({
        "test_version": test_version,
        "solution": solution
    })

In [None]:
from datetime import datetime
import uuid
# Assemble the result document for the current experiment in one literal:
# experiment identifiers, profile statistics, review count, a local timestamp
# and a random unique id.
all_data_result_json = {
    "test_version": current_test_version,
    "solution": current_solution,
    "user_profile": profile_stats(
        kinyo_llm_results_collection,
        current_test_version,
        current_solution,
        fields,
    ),
    "total_review": total_review(
        kinyo_llm_results_collection,
        current_test_version,
        current_solution,
    ),
    'process_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    'data_id': str(uuid.uuid4()),
}

## 局部摘要：画像内容

In [None]:
# imports
import ast  # for converting embeddings saved as strings back to arrays
from openai import OpenAI # for calling the OpenAI API
import pandas as pd  # for storing text and embeddings data
import tiktoken  # for counting tokens
import os # for getting API token from env variable OPENAI_API_KEY
from scipy import spatial  # for calculating vector similarities for search
import json
# Names of the chat models this notebook may use.
# NOTE(review): the visible calls pass "gpt-4o-mini" literally instead of
# reading this list — confirm whether it is still needed.
GPT_MODELS = ["gpt-4o", "gpt-4o-mini"]
# Embedding model identifier.
# NOTE(review): not referenced in the visible part of the notebook.
EMBEDDING_MODEL = "BAAI/bge-base-zh"

In [None]:
# Address of the local proxy; host and port are kept separate so the port is
# easy to change.
proxy_url = 'http://127.0.0.1'
proxy_port = 6465 # !!!please replace it with your own port

# Export the same proxy address through both the http_proxy and https_proxy
# environment variables.
os.environ.update({
    'http_proxy': f'{proxy_url}:{proxy_port}',
    'https_proxy': f'{proxy_url}:{proxy_port}',
})

# API client; the key is read from the OPEN_AI_KEY environment variable.
client = OpenAI(api_key=os.getenv('OPEN_AI_KEY'))

In [None]:
# Retrieve all comments tied to one user-profile value so they can be summarised.
# Step 1: fetch the comments of the matching documents.
def get_comments_by_user_profile_field_mongo(
    collection,
    field,
    value,
    test_version=None,
    solution=None,
):
    """Return the comments of all documents whose ``user_profile.<field>`` equals ``value``.

    Args:
        collection: MongoDB collection object.
        field: Key name under ``user_profile``.
        value: Value that field must equal.
        test_version: Optional; when given, only documents with this
            ``test_version`` are considered. ``None`` (default) applies no
            such filter, which is the original behaviour.
        solution: Optional; when given, only documents with this ``solution``
            are considered. ``None`` (default) applies no such filter.

    Returns:
        A list with the ``comment`` of every matching document. Documents
        without a comment (missing, null or empty) are skipped.
    """
    query = {f"user_profile.{field}": value}
    # Optional scoping to one experiment, so summaries can be built from the
    # same documents the statistics were computed on.
    if test_version is not None:
        query["test_version"] = test_version
    if solution is not None:
        query["solution"] = solution

    projection = {"comment": 1, "_id": 0}
    # With this projection a document lacking ``comment`` comes back without
    # the key, so use .get() rather than indexing to avoid a KeyError.
    return [
        doc["comment"]
        for doc in collection.find(query, projection)
        if doc.get("comment")
    ]


In [None]:
from typing import List, Optional

def get_summary(
    reviews: List[str],
    summary_direction: str,
    direction_focus: str,
    client,
    model: str = "gpt-4o-mini"
) -> Optional[str]:
    """
    Ask the chat model for a summary (50 characters or fewer) of the given reviews.

    :param reviews: list of review texts
    :param summary_direction: the direction the summary should take
    :param direction_focus: the focus point within that direction
    :param client: OpenAI (or API-compatible) client object
    :param model: name of the model to use
    :return: the summary text; None when there are no reviews to summarise
             or when the model call fails
    """
    # Without any review there is nothing to extract; calling the model anyway
    # would only produce a summary that is not backed by data.
    if not reviews:
        return None

    # The review list is embedded using its Python list representation.
    prompt = (
        f"# 任务\n"
        f"- 请你根据以下所有的评论，生成一个{summary_direction}方向关于{direction_focus}的总结，简单解释在这个方向下总结消费者的理由,只从评论中提取信息并总结，不添加其他信息\n"
        f"# 评论\n"
        f"- {reviews}\n"
        f"# 输出\n"
        f"- 50字以内的总结，只有总结内容，没有其他说明"
    )
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "你是一个专业的评论总结专家"},
                {"role": "user", "content": prompt}
            ],
            stream=False,
        )
        return response.choices[0].message.content
    except Exception as e:
        # Deliberate best-effort: report the failure and let the caller skip
        # this summary instead of aborting the whole run.
        print(f"调用大模型总结出错: {e}")
        return None

In [None]:
# [item['value'] for item in all_data_result_json['user_profile']['new_consumption_unsatisfaction']]

In [None]:
# Profile fields whose values get an LLM-written summary.
# NOTE: this rebinds the module-level `fields` that the earlier profile_stats
# cells also use, so re-running those cells afterwards sees only these three.
fields = [
    'new_consumption_motivation',
    'new_consumption_satisfaction',
    'new_consumption_unsatisfaction'
]

# Summary direction handed to the prompt for each field.
fields_dict = {
    'new_consumption_motivation': '用户消费动机',
    'new_consumption_satisfaction': '用户消费满意的原因',
    'new_consumption_unsatisfaction': '用户消费不满的原因'
}

for field in fields:
    profile_items = all_data_result_json['user_profile'][field]
    print("#"*50)
    print(f"现在正在处理：{field}方向下的用户评论总结")
    # Walk the statistic entries themselves so the summary can be attached in
    # place, instead of searching the list again for every value.
    for item in profile_items:
        focus_point = item['value']
        # NOTE(review): this lookup is not restricted to the current
        # test_version / solution — confirm that is intended.
        reviews_list = get_comments_by_user_profile_field_mongo(kinyo_llm_results_collection, field, focus_point)
        if not reviews_list:
            # No comment matches this value (e.g. the "" bucket for documents
            # lacking the field); asking the model to summarise nothing would
            # only store an invented summary.
            continue
        summary = get_summary(
            reviews=reviews_list,
            summary_direction=fields_dict[field],
            direction_focus=focus_point,
            client=client,
            model="gpt-4o-mini"
        )
        if summary:
            print("#"*25,focus_point,"#"*25)
            print(f"{field}方向下的{focus_point}总结：{summary}")
            # Store the summary next to the value's count.
            item['summary'] = summary
        

# 2. 话题摘要

## 统计话题数据

In [None]:
from collections import Counter

def topic_polarity_stats(collection, test_version, solution):
    """
    Tally, for every new_topic, how many topic entries are 好评, 差评 or 中评.

    Only documents of the given test_version / solution that have a non-empty
    product_topic_result are read. Entries without a new_topic, or with any
    other polarity, are ignored. Topics that end up with neither 好评 nor 差评
    are removed from the result.

    Returns a dict of the form
    {new_topic: {"好评": n, "差评": n, "中评": n, "总数": n}}.
    """
    tracked_polarities = ("好评", "差评", "中评")

    matching_docs = collection.find({
        "test_version": test_version,
        "solution": solution,
        "product_topic_result": {"$exists": True, "$ne": []}
    })

    tallies = {}
    for record in matching_docs:
        for entry in record.get("product_topic_result", []):
            topic_name = entry.get("new_topic")
            label = entry.get("polarity")
            # Guard clause: skip entries that cannot be attributed.
            if not topic_name or label not in tracked_polarities:
                continue
            bucket = tallies.setdefault(
                topic_name,
                {"好评": 0, "差评": 0, "中评": 0, "总数": 0},
            )
            bucket[label] += 1
            bucket["总数"] += 1

    # Keep only topics that received at least one 好评 or 差评.
    return {
        topic_name: bucket
        for topic_name, bucket in tallies.items()
        if not (bucket["好评"] == 0 and bucket["差评"] == 0)
    }

# Usage example: compute the per-topic polarity counts for the current experiment.
topic_stats_json = topic_polarity_stats(
    kinyo_llm_results_collection,
    current_test_version,
    current_solution
)

# Print the result as indented JSON; ensure_ascii=False keeps the Chinese keys
# readable instead of escaping them.
import json
print(json.dumps(topic_stats_json, ensure_ascii=False, indent=2))

In [None]:
# Attach the topic statistics to the result document. This stores a reference,
# not a copy, so anything written later into all_data_result_json["new_topic"]
# is also visible through topic_stats_json.
all_data_result_json["new_topic"] = topic_stats_json

In [None]:
def get_comments_by_new_topic_and_polarity(
    collection,
    new_topic_value,
    polarity_value="好评",
    test_version=None,
    solution=None,
):
    """
    Return the comments that have a product_topic_result entry whose new_topic
    and polarity both match the given values (on the same entry).

    :param collection: MongoDB collection object
    :param new_topic_value: new_topic value to match
    :param polarity_value: polarity to match (e.g. '好评'); defaults to '好评'
    :param test_version: optional; when given, only documents with this
        test_version are considered (None = no such filter, the original behaviour)
    :param solution: optional; when given, only documents with this solution
        are considered (None = no such filter)
    :return: list of comments, each matching document contributing its comment
        once; documents without a comment are skipped
    """
    # Let the server pre-select documents that contain a matching entry, so a
    # call no longer downloads every document that has any topic at all.
    query = {
        "product_topic_result": {
            "$elemMatch": {
                "new_topic": new_topic_value,
                "polarity": polarity_value,
            }
        }
    }
    if test_version is not None:
        query["test_version"] = test_version
    if solution is not None:
        query["solution"] = solution

    projection = {"comment": 1, "product_topic_result": 1, "_id": 0}

    matched_comments = []
    for doc in collection.find(query, projection):
        comment = doc.get("comment")
        if not comment:
            # Nothing to summarise; also avoids a KeyError on a missing field.
            continue
        # Re-check locally (cheap): the result stays exactly the per-entry
        # match even if the backend hands back more documents than asked for.
        topic_items = doc.get("product_topic_result") or []
        if any(
            topic_item.get("new_topic") == new_topic_value
            and topic_item.get("polarity") == polarity_value
            for topic_item in topic_items
        ):
            # any() stops at the first hit, so a comment is kept only once.
            matched_comments.append(comment)
    return matched_comments

# Usage example: fetch and print the comments for one topic / polarity pair.
comments = get_comments_by_new_topic_and_polarity(
    kinyo_llm_results_collection,
    new_topic_value="物流/物流速度",  # the new_topic to look up
    polarity_value="好评"           # the polarity to look up
)
print(comments)

In [None]:
# Polarity definitions
good_polarity = '好评'
bad_polarities = ['中评', '差评']  # adjust to match the actual data if needed

# Summary direction handed to the prompt for each polarity group.
topic_polarity_dict = {
    '好评': '该主题下的好评总结',
    '中差评': '该主题下的中差评总结'
}

# Create the product_topic container if it does not exist yet.
# NOTE(review): this cell only ever puts empty per-topic dicts in here; the
# summaries themselves are written under all_data_result_json['new_topic'].
# Confirm whether product_topic is still needed in the stored document.
if 'product_topic' not in all_data_result_json:
    all_data_result_json['product_topic'] = {}

for new_topic in all_data_result_json['new_topic']:
    print("#" * 50)
    print(f"现在正在处理 new_topic：{new_topic}")

    # Create the second-level dict for this new_topic if it is missing.
    all_data_result_json['product_topic'].setdefault(new_topic, {})

    # Positive reviews
    reviews_good = get_comments_by_new_topic_and_polarity(
        kinyo_llm_results_collection,
        new_topic_value=new_topic,
        polarity_value=good_polarity
    )
    summary_good = None
    if reviews_good:
        # Only call the model when there is something to summarise; an empty
        # list would just yield an invented summary.
        summary_good = get_summary(
            reviews=reviews_good,
            summary_direction=topic_polarity_dict['好评'],
            direction_focus=f"{new_topic}",
            client=client,
            model="gpt-4o-mini"
        )
    if summary_good:
        print("#" * 25, f"{new_topic} - 好评", "#" * 25)
        print(f"{new_topic}下好评的总结：{summary_good}")
        all_data_result_json['new_topic'][new_topic]['好评摘要'] = summary_good

    # Neutral and negative reviews, merged into one list
    reviews_bad = []
    for polarity in bad_polarities:
        reviews_bad.extend(get_comments_by_new_topic_and_polarity(
            kinyo_llm_results_collection,
            new_topic_value=new_topic,
            polarity_value=polarity
        ))
    summary_bad = None
    if reviews_bad:
        # A topic with only positive reviews has nothing to summarise here.
        summary_bad = get_summary(
            reviews=reviews_bad,
            summary_direction=topic_polarity_dict['中差评'],
            direction_focus=f"{new_topic}下的中差评的原因",
            client=client,
            model="gpt-4o-mini"
        )
    if summary_bad:
        print("#" * 25, f"{new_topic} - 中差评", "#" * 25)
        print(f"{new_topic}下中差评的总结：{summary_bad}")
        all_data_result_json['new_topic'][new_topic]['中差评摘要'] = summary_bad

# 3. 数据存储

In [None]:
# Persist the assembled result document into the kinyo_data_result collection.
# NOTE(review): presumably pymongo adds an `_id` to the inserted dict, so
# re-running this cell with the same object is expected to fail with a
# duplicate key — confirm before re-running.
kinyo_data_result.insert_one(all_data_result_json)