In [1]:
import sys
import os

# Make the project root (the parent of the notebook's working directory) importable,
# so that `from src.<module> import ...` resolves in the following cells.
# Guarded so that re-running this cell does not keep appending duplicate entries.
project_root = os.path.abspath("..")
if project_root not in sys.path:
    sys.path.append(project_root)

In [2]:
from src.data_loader import load_comments
from src.chunking import chunk_comments
from src.llm_extraction_v2 import extract_insights_from_chunk

# Input CSV (relative to the notebook directory) and the chunk size passed to chunk_comments.
COMMENTS_CSV = "../data/bts_comments.csv"
CHUNK_SIZE = 500

# Load the raw comments, then split them into chunks; each chunk is later sent to the LLM
# by extract_insights_from_chunk.
comments = load_comments(COMMENTS_CSV)
chunks = chunk_comments(comments, chunk_size=CHUNK_SIZE)

In [3]:
# # （串发）Notebook cell: 解析 LLM 输出并构建 group_results（使用 src/parsing.robust_parse）
# # 先确保你已经在 src/parsing.py 中实现了 robust_parse(raw_output, raise_on_fail=False)
# # 并且 extract_insights_from_chunk(chunk) 函数可用（它会向 LLM 发请求并返回原始字符串）。

# from src.parsing import robust_parse

# group_results = []
# failed = []

# # 防护：确保 chunks 存在
# try:
#     iterator = enumerate(chunks[:6])
# except NameError:
#     raise NameError("变量 `chunks` 未定义。请先生成 chunks（例如按视频或文本分片）。")

# for i, chunk in iterator:
#     # 1) 调用你的提取函数（这会返回 LLM 的原始字符串输出）
#     raw_output = extract_insights_from_chunk(chunk)

#     # 2) 用稳健解析器解析（不会在解析失败时抛异常，除非你改参数）
#     parsed = robust_parse(raw_output, raise_on_fail=False)

#     # 3) 处理解析失败的情况：记录并以空结构占位，避免后续 pipeline 崩溃
#     if parsed is None:
#         print(f"[Warning] chunk #{i} parse failed — saving preview to debug file.")
#         failed.append(i)
#         with open(f"debug_raw_chunk_{i}.txt", "w", encoding="utf-8") as f:
#             # 保存原始输出，便于离线检查（供你或我分析）
#             f.write(str(raw_output))
#         group_results.append({
#             "audience_interest_themes": [],
#             "positive_content_drivers": [],
#             "recurring_pain_points": []
#         })
#     else:
#         # 解析成功 -> 追加到结果列表
#         group_results.append(parsed)

# print(f"Done. total={len(group_results)} failed={len(failed)}. failed indices={failed}")
# # 现在你可以调用 aggregate_insights_with_clustering(group_results, eps=0.35)


In [None]:
# (Concurrent) Run LLM extraction on the first chunks in a thread pool, parse each raw
# output with robust_parse, and collect one result dict per chunk in chunk order.
from src.parsing import robust_parse
import concurrent.futures

# Section keys read from the aggregated output in the aggregation cell; presumably the
# same schema the LLM returns per chunk — NOTE(review): confirm against src/aggregation.
INSIGHT_KEYS = (
    'top_content_elements',
    'top_audience_insights',
    'top_engagement_drivers',
    'top_audience_pain_points',
)
N_CHUNKS = 6      # how many leading chunks to process
MAX_WORKERS = 3   # number of concurrent LLM requests


def empty_insights():
    """Return a fresh placeholder result mapping every key in INSIGHT_KEYS to an empty list."""
    return {key: [] for key in INSIGHT_KEYS}


def process_chunk(i_chunk):
    """Extract and parse insights for one ``(index, chunk)`` pair.

    Args:
        i_chunk: a ``(index, chunk)`` tuple as produced by ``enumerate(chunks)``.

    Returns:
        ``(index, result, failed)``. On a parse failure (``robust_parse`` returns
        ``None``) the raw LLM output is written to ``debug_raw_chunk_{index}.txt``
        and an empty placeholder dict is returned with ``failed=True``.
    """
    i, chunk = i_chunk
    raw_output = extract_insights_from_chunk(chunk)
    parsed = robust_parse(raw_output, raise_on_fail=False)

    if parsed is None:
        print(f"[Warning] chunk #{i} parse failed — saving preview to debug file.")
        # Keep the raw output so the failure can be inspected offline.
        with open(f"debug_raw_chunk_{i}.txt", "w", encoding="utf-8") as f:
            f.write(str(raw_output))
        return i, empty_insights(), True
    return i, parsed, False


selected_chunks = list(enumerate(chunks[:N_CHUNKS]))
results_by_index = {}
failed = []

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    future_to_index = {
        executor.submit(process_chunk, item): item[0] for item in selected_chunks
    }
    for future in concurrent.futures.as_completed(future_to_index):
        idx = future_to_index[future]
        try:
            i, result, is_failed = future.result()
        except Exception as exc:
            # An API/network error on one chunk should not discard the other chunks' results.
            print(f"[Warning] chunk #{idx} raised {exc!r}")
            i, result, is_failed = idx, empty_insights(), True
        results_by_index[i] = result
        if is_failed:
            failed.append(i)

# as_completed yields in completion order; restore chunk order so reruns are reproducible.
group_results = [results_by_index[i] for i in sorted(results_by_index)]
failed.sort()

print(f"Done. total={len(group_results)} failed={len(failed)}. failed indices={failed}")


Done. total=6 failed=0. failed indices=[]


In [5]:
from src.aggregation import aggregate_insights_with_clustering

# Cluster the per-chunk insights across all chunks (eps=0.35 is passed through to the
# clustering step), then print each section of the aggregated result on its own line.
final = aggregate_insights_with_clustering(group_results, eps=0.35)
for section in (
    'top_content_elements',
    'top_audience_insights',
    'top_engagement_drivers',
    'top_audience_pain_points',
):
    print(final[section])


[("BTS's humor and interactions", 6), ("Jimin's interactions", 6), ("Jin's personality", 4), ('BTS V', 3), ("BTS's concert performances", 2)]
[('Individuals who identify as ARMY and engage with BTS content regularly', 5), ('Fans from diverse backgrounds (e.g., Australian, Bangladeshi, Russian)', 3), ('International audience', 2)]
[('Relatability through shared experiences and emotions', 6), ("Emotional connections to the members' personalities and interactions", 3), ('Desire for community interaction and shared fandom experiences', 3)]
[('Requests for more content or specific performances', 2), ('Frustration with the absence of certain members in videos', 2), ('Confusion about song meanings or contexts', 2)]


In [6]:
# # Stability check

# group_results = []
# failed = []

# def process_chunk(i_chunk):
#     i, chunk = i_chunk
#     raw_output = extract_insights_from_chunk(chunk)
#     parsed = robust_parse(raw_output, raise_on_fail=False)

#     if parsed is None:
#         with open(f"debug_raw_chunk_{i}.txt", "w", encoding="utf-8") as f:
#             f.write(str(raw_output))
#         return i, {
#             "audience_interest_themes": [],
#             "positive_content_drivers": [],
#             "recurring_pain_points": []
#         }, True
#     else:
#         return i, parsed, False


# selected_chunks = list(enumerate(chunks[6:12]))

# with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
#     futures = [executor.submit(process_chunk, item) for item in selected_chunks]

#     for future in concurrent.futures.as_completed(futures):
#         i, result, is_failed = future.result()
#         group_results.append(result)
#         if is_failed:
#             failed.append(i)

# print(f"Done. total={len(group_results)} failed={len(failed)}. failed indices={failed}")


In [7]:
# final = aggregate_insights_with_clustering(group_results, eps=0.35)
# print(final['top_content_elements'])
# print(final['top_audience_insights'])
# print(final['top_engagement_drivers'])
# print(final['top_audience_pain_points'])