In [None]:
!pip install openai weave wandb pandas numpy matplotlib seaborn python-docx
import os
import re
import weave
import wandb
import pandas as pd
import numpy as np
from openai import OpenAI
from typing import Dict, List, Any
import json
from datetime import datetime

# 初始化 Weave 和 W&B
weave.init('oral-history-rewriting-experiment')

# DeepSeek 客户端配置
client = OpenAI(
    api_key=os.environ.get('DEEPSEEK_API_KEY'),
    base_url="https://api.deepseek.com/v1"
)

# 提示词模板定义
BASE_PROMPT = """
请将以下老年人口述采访内容改写成流畅的叙述性文字：
{oral_text}
要求：保持原意，语言通顺，结构清晰。
"""

FEW_SHOT_PROMPT = """
你是一个专业的口述历史整理者。请参考以下示例，将新的口述内容改写成叙述性文字。

示例1：
原始口述："我那时候啊，每天早起去种地，太阳还没出来就出门了。家里孩子多，吃饭都成问题。后来去了工厂，生活才好一点。"
改写后：在那些艰苦的岁月里，他每天清晨在太阳升起之前就出门耕作。由于家庭人口众多，食物供应常常紧张。直到他进入工厂工作后，生活条件才逐渐改善。

示例2：
原始口述："文革时候挺难的，但是大家都一样，互相帮助。我记得邻居王大爷经常给我们送吃的。"
改写后：在文化大革命那段艰难时期，虽然生活困苦，但社区居民之间相互扶持。他特别记得邻居王大爷时常与他们分享食物，这种邻里情谊令人感动。

现在请改写以下内容：
{oral_text}
"""

COT_PROMPT = """
请按照以下步骤分析和改写口述内容：

第一步：提取关键时间信息
- 识别并整理时间线索
- 建立时间顺序

第二步：分析人物关系
- 识别提到的人物
- 理清人物关系网络

第三步：梳理事件逻辑
- 提取主要事件
- 分析事件因果关系

第四步：理解情感基调
- 分析原始情感表达
- 保持情感真实性，不美化负面情绪

第五步：基于以上分析进行改写

口述内容：
{oral_text}
"""

COMBINED_PROMPT = """
你是一个专业的口述历史整理者。请参考示例并按照思维链步骤进行改写。

【示例参考】
示例1：
原始口述："我那时候啊，每天早起去种地，太阳还没出来就出门了。家里孩子多，吃饭都成问题。后来去了工厂，生活才好一点。"
改写后：在那些艰苦的岁月里，他每天清晨在太阳升起之前就出门耕作。由于家庭人口众多，食物供应常常紧张。直到他进入工厂工作后，生活条件才逐渐改善。

示例2：
原始口述："文革时候挺难的，但是大家都一样，互相帮助。我记得邻居王大爷经常给我们送吃的。"
改写后：在文化大革命那段艰难时期，虽然生活困苦，但社区居民之间相互扶持。他特别记得邻居王大爷时常与他们分享食物，这种邻里情谊令人感动。

【分析步骤】
1. 时间线梳理
2. 人物关系分析  
3. 事件逻辑整理
4. 情感基调把握

【待改写内容】
{oral_text}
"""

class OralHistoryRewriter:
    def __init__(self):
        self.prompt_templates = {
            'base': BASE_PROMPT,
            'few_shot': FEW_SHOT_PROMPT,
            'cot': COT_PROMPT,
            'combined': COMBINED_PROMPT
        }
    
    @weave.op()
    def rewrite_text(self, method: str, oral_text: str, temperature: float = 0.3) -> str:
        """使用指定方法重写口述文本"""
        prompt = self.prompt_templates[method].format(oral_text=oral_text)
        
        try:
            response = client.chat.completions.create(
                model="deepseek-chat",
                messages=[
                    {"role": "system", "content": "你是一个专业的口述历史整理专家。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=temperature
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"API调用错误: {str(e)}")
            return f"Error: {str(e)}"

class LongTextOralHistoryExperiment:
    def __init__(self):
        self.rewriter = OralHistoryRewriter()
    
    def read_docx_file(self, file_path: str) -> str:
        """读取docx文件内容"""
        try:
            from docx import Document
            doc = Document(file_path)
            full_text = []
            for paragraph in doc.paragraphs:
                full_text.append(paragraph.text)
            return '\n'.join(full_text)
        except ImportError:
            print("未安装python-docx库，无法读取docx文件")
            print("请运行: pip install python-docx")
            return ""
        except Exception as e:
            print(f"读取docx文件时出错: {str(e)}")
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    return f.read()
            except:
                return ""
    
    def process_with_method(self, long_text: str, method: str, experiment_name: str) -> Dict:
        """使用特定方法处理整个长文本（不分割）"""
        print(f"  开始处理整个文本 (长度: {len(long_text)} 字符)")
        
        # 启动W&B
        wandb.init(
            project="oral-history-long-text",
            name=experiment_name,
            config={
                "method": method,
                "total_length": len(long_text)
            },
            reinit=True
        )
        
        start_time = datetime.now()
        
        rewrite = self.rewriter.rewrite_text(method, long_text)
        
        processing_time = (datetime.now() - start_time).total_seconds()
        
        is_rewritten = rewrite != long_text and len(rewrite) > len(long_text) * 0.5 and not rewrite.startswith("Error:")
        
        result = {
            'original_text': long_text,
            'rewritten_text': rewrite,
            'original_length': len(long_text),
            'rewritten_length': len(rewrite),
            'compression_ratio': len(rewrite) / len(long_text) if len(long_text) > 0 else 1,
            'is_rewritten': is_rewritten,
            'processing_time_seconds': processing_time
        }
        
        stats = {
            "total_length": len(long_text),
            "rewritten_length": len(rewrite),
            "compression_ratio": len(rewrite) / len(long_text) if len(long_text) > 0 else 1,
            "processing_time_seconds": processing_time,
            "is_rewritten": is_rewritten
        }
        
        # 记录到W&B
        wandb.log(stats)
        wandb.log({
            "original_text_sample": long_text[:1000] + "..." if len(long_text) > 1000 else long_text,
            "rewritten_text_sample": rewrite[:1000] + "..." if len(rewrite) > 1000 else rewrite
        })
        
        wandb.finish()
        
        return {
            "full_rewritten_text": rewrite,
            "result": result,
            "statistics": stats
        }
    
    def run_complete_experiment(self, long_text_path: str, experiment_name: str):
        """运行完整的长文本实验"""
        # 读取长文本
        if long_text_path.endswith('.docx'):
            long_text = self.read_docx_file(long_text_path)
        else:
            with open(long_text_path, 'r', encoding='utf-8') as f:
                long_text = f.read()
        
        if not long_text:
            print("无法读取文件内容")
            return {}
        
        print(f"读取文本，总长度: {len(long_text)} 字符")
        
        # 为每种方法运行实验
        all_results = {}
        
        for method in ['base', 'few_shot', 'cot', 'combined']:
            print(f"\n开始方法: {method}")
            
            method_experiment_name = f"{experiment_name}_{method}"
            result = self.process_with_method(long_text, method, method_experiment_name)
            all_results[method] = result
            
            # 保存结果到文件
            output_filename = f"rewritten_{method}_{experiment_name}.txt"
            with open(output_filename, 'w', encoding='utf-8') as f:
                f.write(result['full_rewritten_text'])
            
            print(f"保存结果到: {output_filename}")
            print(f"处理时间: {result['statistics']['processing_time_seconds']:.2f}秒")
            print(f"压缩比例: {result['statistics']['compression_ratio']:.2f}")
        
        return all_results

class ShortTextExperimentManager:
    def __init__(self, project_name: str):
        self.project_name = project_name
        self.rewriter = OralHistoryRewriter()
        self.experiment_history = []
    
    def create_test_cases(self) -> List[Dict]:
        """创建测试用例"""
        return [
            {
                'id': 'case_1',
                'oral_text': '我小时候家里穷，经常吃不饱饭。后来参加了革命，日子才好过一点。',
                'characteristics': '简单时间序列，轻微负面情感',
                'ground_truth': '他回忆童年时家境贫困，时常面临食物短缺的困境。直到后来投身革命事业，生活状况才有所改善。'
            },
            {
                'id': 'case_2',
                'oral_text': '文革那时候，我父亲被批斗，家里人都很害怕。但是邻居李阿姨偷偷给我们送饭，我很感激她。',
                'characteristics': '复杂情感，负面事件，人物关系',
                'ground_truth': '在文化大革命期间，他的父亲遭受批斗，全家人都生活在恐惧之中。然而，邻居李阿姨冒着风险暗中接济他们食物，这份恩情让他至今铭记。'
            },
            {
                'id': 'case_3',
                'oral_text': '58年大跃进，我们村搞集体食堂，开始吃得挺好，后来就没粮食了。饿得不行，就去挖野菜。',
                'characteristics': '历史事件，时间顺序，负面经历',
                'ground_truth': '1958年大跃进时期，他们村庄开办了集体食堂。初期食物供应尚可，但后来粮食逐渐短缺。在极度饥饿的情况下，村民们只能依靠挖野菜充饥。'
            },
            {
                'id': 'case_4',
                'oral_text': '我老伴走了五年了，刚开始那会儿特别难受，现在慢慢习惯了。孩子们都挺孝顺的，经常回来看我。',
                'characteristics': '情感变化，时间跨度，家庭关系',
                'ground_truth': '他的配偶已离世五年。在丧偶初期，他经历了巨大的悲痛，但随着时间推移逐渐适应了独居生活。令他欣慰的是，子女们都很孝顺，时常回家探望。'
            }
        ]
    
    def run_single_experiment(self, experiment_config: Dict) -> pd.DataFrame:
        """运行单个实验"""
        experiment_name = experiment_config["name"]
        method = experiment_config["method"]
        test_cases = experiment_config["test_cases"]
        
        print(f"开始实验: {experiment_name}")
        
        # 启动 W&B run
        run = wandb.init(
            project=self.project_name,
            name=experiment_name,
            config=experiment_config,
            reinit=True
        )
        
        results = []
        for case in test_cases:
            print(f"  处理案例: {case['id']}")
            
            rewrite = self.rewriter.rewrite_text(
                method=method,
                oral_text=case['oral_text'],
                temperature=experiment_config.get('temperature', 0.3)
            )
            
            result = {
                'experiment_id': experiment_config["id"],
                'experiment_name': experiment_name,
                'method': method,
                'case_id': case['id'],
                'original_text': case['oral_text'],
                'rewritten_text': rewrite,
                'characteristics': case['characteristics'],
                'ground_truth': case.get('ground_truth', ''),
                'timestamp': datetime.now().isoformat()
            }
            results.append(result)
            
            # 记录到 W&B
            wandb.log({
                f"{case['id']}_original": case['oral_text'],
                f"{case['id']}_rewrite": rewrite,
                f"{case['id']}_characteristics": case['characteristics']
            })
        
        # 创建结果表格
        results_df = pd.DataFrame(results)
        wandb.log({"results_table": wandb.Table(dataframe=results_df)})
        
        # 记录实验摘要
        input_lengths = [len(r['original_text']) for r in results]
        output_lengths = [len(r['rewritten_text']) for r in results]
        
        summary_stats = {
            "total_cases": len(results),
            "method": method,
            "avg_input_length": np.mean(input_lengths) if input_lengths else 0,
            "avg_output_length": np.mean(output_lengths) if output_lengths else 0
        }
        wandb.log(summary_stats)
        
        # 结束 W&B run
        wandb.finish()
        
        # 保存到实验历史
        experiment_record = {
            "config": experiment_config,
            "results": results,
            "summary": summary_stats,
            "timestamp": datetime.now().isoformat()
        }
        self.experiment_history.append(experiment_record)
        
        print(f"完成实验: {experiment_name}")
        return results_df
    
    def run_all_experiments(self) -> Dict[str, pd.DataFrame]:
        """运行所有预定义的实验"""
        test_cases = self.create_test_cases()
        
        experiments_config = [
            {
                "id": "exp_base_v1",
                "name": "基础方法-第一轮",
                "method": "base",
                "description": "基础提示词方法，测试逻辑准确性基础水平",
                "test_cases": test_cases,
                "temperature": 0.3
            },
            {
                "id": "exp_fewshot_v1",
                "name": "Few-Shot方法-情感优化",
                "method": "few_shot",
                "description": "使用Few-Shot示例优化情感表达",
                "test_cases": test_cases,
                "temperature": 0.3
            },
            {
                "id": "exp_cot_v1",
                "name": "思维链方法-逻辑优化",
                "method": "cot",
                "description": "使用CoT方法提升时间人物逻辑准确性",
                "test_cases": test_cases,
                "temperature": 0.3
            },
            {
                "id": "exp_combined_v1",
                "name": "组合方法-完整优化",
                "method": "combined",
                "description": "Few-Shot + CoT组合方法全面优化",
                "test_cases": test_cases,
                "temperature": 0.3
            }
        ]
        
        all_results = {}
        for config in experiments_config:
            results_df = self.run_single_experiment(config)
            all_results[config["id"]] = results_df
        
        return all_results
    
    def get_experiment_summary(self) -> pd.DataFrame:
        """获取实验概览"""
        summary_data = []
        for record in self.experiment_history:
            summary_data.append({
                "实验ID": record["config"]["id"],
                "实验名称": record["config"]["name"],
                "方法": record["config"]["method"],
                "测试案例数": record["summary"]["total_cases"],
                "平均输入长度": f"{record['summary']['avg_input_length']:.1f}",
                "平均输出长度": f"{record['summary']['avg_output_length']:.1f}",
                "运行时间": record["timestamp"]
            })
        
        return pd.DataFrame(summary_data)

def generate_analysis_report(results: Dict, experiment_name: str):
    """生成分析报告"""
    print(f"\n实验 '{experiment_name}' 分析报告")
    print("=" * 50)
    
    for method, result in results.items():
        stats = result['statistics']
        print(f"\n方法: {method.upper()}")
        print(f"  原始长度: {stats['total_length']} 字符")
        print(f"  改写后长度: {stats['rewritten_length']} 字符")
        print(f"  压缩比例: {stats['compression_ratio']:.2f}")
        print(f"  处理时间: {stats['processing_time_seconds']:.2f}秒")
        print(f"  是否成功改写: {'是' if stats['is_rewritten'] else '否'}")

def run_long_text_experiment():
    """运行长文本实验"""
    print("开始长文本老年人口述史AI改写实验...")
    
    # 初始化实验
    experiment = LongTextOralHistoryExperiment()
    
    # 运行实验
    long_text_path = "Jenny-老年人访谈-转写.docx"  # 你的文件路径
    experiment_name = "老年人采访完整实验_v1"
    
    results = experiment.run_complete_experiment(long_text_path, experiment_name)
    
    # 生成分析报告
    generate_analysis_report(results, experiment_name)
    
    return results

def run_short_text_experiment():
    """运行短文本测试实验"""
    print("开始短文本老年人口述史AI改写实验...")
    
    # 初始化实验管理器
    manager = ShortTextExperimentManager("oral-history-rewriting-experiment")
    
    # 运行所有实验
    print("\n开始运行所有实验...")
    all_results = manager.run_all_experiments()
    
    # 显示实验概览
    print("\n实验概览:")
    summary = manager.get_experiment_summary()
    print(summary.to_string(index=False))
    
    # 保存所有结果到CSV
    all_results_combined = pd.concat(all_results.values(), ignore_index=True)
    all_results_combined.to_csv("oral_history_experiment_results.csv", index=False, encoding='utf-8-sig')
    print(f"\n所有结果已保存至: oral_history_experiment_results.csv")
    
    return all_results

if __name__ == "__main__":
    # 选择运行哪种实验
    print("请选择实验类型:")
    print("1. 长文本实验 (处理完整的1.5万字采访)")
    print("2. 短文本测试实验 (使用测试用例)")
    
    choice = input("请输入选择 (1 或 2): ").strip()
    
    if choice == "1":
        run_long_text_experiment()
    elif choice == "2":
        run_short_text_experiment()
    else:
        print("无效选择，退出程序。")