# 第五次课后练习 之二（选做）

在本次作业中，如果你在配置环境中遇到了问题，可以参考助教的解决方案：

首先我们在将这个文件放在一个文件夹中（可以和这门课程的其他作业放在一起，但是别放在桌面这种一大堆其他文件的文件夹里）。

随后运行：
```
python3 -m venv myenv
source myenv/bin/activate
pip install openai
pip install requests
pip install socksio
```

然后在 notebook 中切换 ipykernel 为 myenv（如果你采用的是 VSCode 的话，可以看看右上角的 ipykernel，点一下就可以切换了），就可以运行了。

如果你发现还存在包缺失的情况，注意采用 Restart，不然环境可能同步不过来。

**负责助教：吴迪**

<span style="color:red; font-weight:bold;">请将作业文件命名为 第五次课后练习-之二+姓名+学号.ipynb, 例如 第五次课后练习-之二+张三+1000000000.ipynb</span>

#  请认真阅读代码，理解学习代码的功能

## **0.1** 读取网页内容，调用大语言模型API进行中文摘要
    

In [None]:
# 这部分代码进行了重构，主要解决了多次尝试连接、异常处理、日志记录等问题
# 代码中使用了retrying库，用于实现重试机制
# 代码中使用了logging库，用于记录日志信息
# 代码中使用了typing库，用于类型提示
import re
import requests
import html
import logging
from retrying import retry
from bs4 import BeautifulSoup
from openai import OpenAI
from typing import Optional, List, Dict

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# 目标URL列表
TARGET_URLS = [
    "https://arxiv.org/abs/2410.03761",
    "https://arxiv.org/abs/2305.15186"
]

# API配置
API_ENDPOINT = "https://openrouter.ai/api/v1"
API_KEY = "<API_KEY>"  

# 请求头配置
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) '
                  'Chrome/120.0.0.0 Safari/537.36'
}

# 模型配置
MODEL_CONFIG = {
    "model": "deepseek/deepseek-chat:free",
    "max_tokens": 500,
    "temperature": 0.7,
}

# 正则表达式，用于解析网页摘要（读取html网页对象的content字段内容）
pattern = r'<meta\s+property="og:description"\s+content="(.*?)"\s*/>'

class SummaryGenerator:
    def __init__(self):
        self.client = OpenAI(
            base_url=API_ENDPOINT,
            api_key=API_KEY,
        )

    @retry(stop_max_attempt_number=3, wait_fixed=2000)
    def generate_summary(self, text: str) -> Optional[str]:
        """生成文本摘要（带重试机制）"""
        try:
            completion = self.client.chat.completions.create(
                **MODEL_CONFIG,
                messages=[
                    {
                        "role": "system",
                        "content": "请用中文撰写用户文本的300字摘要，保持专业学术风格"
                    },
                    {
                        "role": "user",
                        "content": text
                    }
                ]
            )
            return completion.choices[0].message.content
        except Exception as e:
            logger.error(f"生成摘要失败: {e}")
            return None

class ArxivParser:
    @staticmethod
    @retry(stop_max_attempt_number=3, wait_fixed=2000)
    def fetch_content(url: str) -> Optional[str]:
        """获取网页内容（带重试机制）"""
        try:
            # 发送HTTP GET请求获取网页内容
            response = requests.get(url, headers=HEADERS, timeout=15) # 设置超时时间
            response.raise_for_status() # 检查请求是否成功
            return response.text
        except requests.exceptions.RequestException as e:
            logger.error(f"获取内容失败[{url}]: {e}")
            return None

    @staticmethod
    def parse_content(html_content: str) -> Optional[str]:
        """解析网页内容"""
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            meta_tag = soup.find('meta', {'property': 'og:description'})
            if meta_tag and (content := meta_tag.get('content')):
                return html.unescape(content)
            logger.warning("未找到摘要内容")
            return None
        except Exception as e:
            logger.error(f"解析内容失败: {e}")
            return None

def process_url(url: str, generator: SummaryGenerator) -> Optional[Dict]:
    """处理单个URL的完整流程"""
    logger.info(f"正在处理: {url}")
    
    # 获取网页内容
    if (html_content := ArxivParser.fetch_content(url)) is None:
        return None
    
    # 解析摘要
    if (summary := ArxivParser.parse_content(html_content)) is None:
        return None
    
    # 生成中文摘要
    if (zh_summary := generator.generate_summary(summary)) is None:
        return None
    
    return {
        "url": url,
        "original_summary": summary,
        "chinese_summary": zh_summary
    }

def main():
    generator = SummaryGenerator()
    all_summaries = []
    
    for url in TARGET_URLS:
        if result := process_url(url, generator):
            logger.info(f"成功处理: {url}")
            all_summaries.append(result)
            # 打印当前结果
            print(f"\n论文地址: {result['url']}")
            print("原始摘要:", result['original_summary'])
            print("中文摘要:", result['chinese_summary'])
    
    # 生成综合摘要（限制输入长度）
    combined = " ".join([s['original_summary'] for s in all_summaries])
    if len(combined) > 3000:
        combined = combined[:3000] + "...[截断]"
    
    if final_summary := generator.generate_summary(combined):
        print("\n综合摘要:", final_summary)

if __name__ == "__main__":
    main()

# 1 仿照上面的框架，实现一个自己版本的多文档（或多轮对话）的分析功能。可以用更复杂的爬虫获得需要的信息。也可以直接从本地读取数据文件实现有实际意义的功能。

完成有新意有创意工作的同学，或在作业过程中觉得有心得或者自己拓展学习到有价值内容的，可以在文件名最后加一个#号。例如第五次课后练习+张三+1000000000+#.ipynb
只是完成学习的同学，没有尝试改进探索工作的可以不提交这个作业

In [None]:
# 这个代码实现了进行本地学术论文分析
import os
import re
import logging
import pdfplumber
import fitz  
from retrying import retry
from openai import OpenAI
from typing import List, Dict, Optional
from collections import Counter

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# 支持的文件类型
SUPPORTED_EXT = ['.pdf', '.txt']

# 模型配置
MODEL_CONFIG = {
    "model": "deepseek/deepseek-chat:free",
    "max_tokens": 800,
    "temperature": 0.3,
}

class PDFAnalyzer:
    @staticmethod
    def extract_metadata(file_path: str) -> Dict:
        """提取PDF元数据"""
        try:
            with fitz.open(file_path) as doc:
                meta = doc.metadata
                return {
                    "title": meta.get("title", ""),
                    "author": meta.get("author", ""),
                    "keywords": meta.get("keywords", ""),
                    "pages": len(doc)
                }
        except Exception as e:
            logger.error(f"元数据提取失败: {e}")
            return {}

    @staticmethod
    def extract_text(file_path: str) -> Optional[str]:
        """专业PDF文本提取（优先获取正文内容）"""
        try:
            full_text = []
            
            # 策略一：使用pdfplumber提取结构化文本
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    # 优先提取正文（排除页眉页脚）
                    y0 = page.height * 0.1  # 排除顶部10%
                    y1 = page.height * 0.9  # 排除底部10%
                    cropped = page.crop((0, y0, page.width, y1))
                    text = cropped.extract_text()
                    if text:
                        # 清理换行符
                        text = re.sub(r'-\n', '', text)
                        text = re.sub(r'\n', ' ', text)
                        full_text.append(text)
            
            # 策略二：如果方法一失败，使用PyMuPDF的智能提取
            if not full_text:
                with fitz.open(file_path) as doc:
                    for page in doc:
                        text = page.get_text("text")
                        full_text.append(text)

            return "\n".join(full_text)
        except Exception as e:
            logger.error(f"PDF解析失败: {e}")
            return None

class FileProcessor:
    @staticmethod
    @retry(stop_max_attempt_number=3, wait_fixed=2000)
    def read_content(file_path: str) -> Optional[Dict]:
        """智能文件读取（自动判断文件类型）"""
        if not os.path.exists(file_path):
            logger.warning(f"文件不存在: {file_path}")
            return None

        try:
            file_type = os.path.splitext(file_path)[1].lower()
            
            if file_type == '.pdf':
                # 提取PDF元数据
                meta = PDFAnalyzer.extract_metadata(file_path)
                # 提取正文内容
                if (text := PDFAnalyzer.extract_text(file_path)):
                    return {"content": text, "meta": meta}
            
            elif file_type == '.txt':
                # 文本文件读取（自动检测编码）
                for encoding in ['utf-8', 'gbk', 'latin-1']:
                    try:
                        with open(file_path, 'r', encoding=encoding) as f:
                            return {"content": f.read(), "meta": {}}
                    except UnicodeDecodeError:
                        continue
            
            logger.error(f"不支持的格式: {file_type}")
            return None
        except Exception as e:
            logger.error(f"文件读取失败[{file_path}]: {e}")
            return None

class ResearchPaperAnalyzer:
    def __init__(self):
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key="<API_KEY>",
        )
        self.keyword_counter = Counter()
        self.domain_counter = Counter()

    @retry(stop_max_attempt_number=3, wait_fixed=2000)
    def analyze_paper(self, content: str) -> Optional[Dict]:
        """学术论文深度分析"""
        try:
            completion = self.client.chat.completions.create(
                **MODEL_CONFIG,
                messages=[
                    {
                        "role": "system",
                        "content": "您是一名学术研究助理，请分析论文内容并返回：\n"
                                   "1. 核心研究问题\n"
                                   "2. 5个关键技术术语\n"
                                   "3. 研究方法（50字）\n"
                                   "4. 主要结论（50字）\n"
                                   "5. 所属学科领域（最多3个）\n"
                                   "格式：JSON，包含problem, keywords, methodology, conclusion, fields"
                    },
                    {
                        "role": "user",
                        "content": content[:6000]  # 限制输入长度
                    }
                ]
            )
            return self._parse_response(completion.choices[0].message.content)
        except Exception as e:
            logger.error(f"分析失败: {e}")
            return None

    def _parse_response(self, response: str) -> Optional[Dict]:
        """解析模型响应"""
        try:
            # 提取JSON部分
            json_str = re.search(r'\{.*\}', response, re.DOTALL).group()
            result = eval(json_str)
            
            # 数据校验
            required_keys = ['problem', 'keywords', 'methodology', 'conclusion', 'fields']
            if all(key in result for key in required_keys):
                # 统计关键词和领域
                self.keyword_counter.update(result['keywords'])
                self.domain_counter.update(result['fields'])
                return result
            return None
        except Exception as e:
            logger.error(f"响应解析失败: {e}")
            return None

def generate_visual_report(analyzer: ResearchPaperAnalyzer) -> str:
    """生成可视化报告"""
    report = ["\n学术论文分析报告", "="*40]
    
    # 关键词云
    top_keywords = analyzer.keyword_counter.most_common(10)
    report.append("\n🔍 高频关键词TOP10：")
    report.extend([f"- {word[0]} ({word[1]}次)" for word in top_keywords])
    
    # 领域分布
    report.append("\n📚 学科领域分布：")
    for domain, count in analyzer.domain_counter.most_common():
        report.append(f"- {domain}: {count}篇")
    
    return "\n".join(report)

def main():
    # 示例文件路径（需修改为实际路径）
    papers = ["data/report.pdf"]
    
    analyzer = ResearchPaperAnalyzer()
    
    for paper_path in papers:
        logger.info(f"正在分析: {paper_path}")
        
        # 读取文件
        if not (data := FileProcessor.read_content(paper_path)):
            continue
            
        # 执行分析
        if not (result := analyzer.analyze_paper(data['content'])):
            continue
            
        # 打印结果
        print(f"\n📄 论文: {os.path.basename(paper_path)}")
        if data['meta'].get('title'):
            print(f"标题: {data['meta']['title']}")
        print(f"领域: {', '.join(result['fields'])}")
        print(f"核心问题: {result['problem']}")
        print(f"研究方法: {result['methodology']}")
        print(f"主要结论: {result['conclusion']}")
        print(f"关键词: {', '.join(result['keywords'])}")
    
    # 生成综合报告
    print(generate_visual_report(analyzer))

if __name__ == "__main__":
    main()