# IS5126 Individual Assignment 1 - HW1

In [None]:
%pip install crawl4ai
%pip install beautifulsoup4 lxml requests
!playwright install
print("✅ All libraries installed successfully!")

In [None]:
# Import required libraries
import asyncio
import nest_asyncio
import time
import re
import csv
import json
from typing import Dict, List, Any
from crawl4ai import AsyncWebCrawler
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import openai
import time
import logging
from datetime import datetime
import os
import json
import openai
import os
from typing import List, Dict, Any, Optional
from datetime import datetime

# Import visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Set font for visualization
plt.rcParams['font.sans-serif'] = ['Arial', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# Enable nest_asyncio to support async operations in Jupyter
nest_asyncio.apply()

print("✅ All libraries imported successfully!")

# Enable nest_asyncio to support async operations in Jupyter
nest_asyncio.apply()

print("✅ All libraries imported successfully!")


## Part 1: Wikipedia Scraper Implementation:
– WikipediaScraper class with all required methods
– Demonstration of scraping your chosen 5+ Wikipedia articles
– Explanation of how your articles relate to your research domain
– Error handling examples with sample outputs

In [None]:
class WikipediaScraper:
    def __init__(self, base_urls: List[str]):
        """
        Initialize Wikipedia scraper

        Args:
            base_urls: List of Wikipedia article URLs to scrape
        """
        self.base_urls = base_urls
        self.scraped_data = []
        self.rate_limit_delay = 2  # Request interval (seconds) to avoid overloading Wikipedia servers

        # Validate URL format
        self._validate_urls()

    def _validate_urls(self):
        """Validate if URLs are valid Wikipedia links"""
        valid_urls = []
        for url in self.base_urls:
            if self._is_valid_wikipedia_url(url):
                valid_urls.append(url)
            else:
                print(f"Warning: Skipping invalid Wikipedia URL: {url}")

        if not valid_urls:
            raise ValueError("No valid Wikipedia URLs provided")

        self.base_urls = valid_urls
        print(f"Validation passed, {len(valid_urls)} valid URLs found")

    def _is_valid_wikipedia_url(self, url: str) -> bool:
        """Check if URL is a valid Wikipedia URL"""
        try:
            return (
                'wikipedia.org' in url and
                '/wiki/' in url
            )
        except:
            return False

    def _looks_like_html(self, content: str) -> bool:
        """Determine whether a string looks like HTML"""
        try:
            return bool(content and content.lstrip().startswith("<"))
        except Exception:
            return False

    def _html_to_text(self, html: str) -> str:
        """Convert Wikipedia HTML to cleaner plain text (keep title and paragraphs)"""
        try:
            soup = BeautifulSoup(html, "lxml")

            # Title
            title_node = soup.select_one("#firstHeading")
            title_text = title_node.get_text(" ", strip=True) if title_node else ""

            # Main content container
            content = soup.select_one("#mw-content-text") or soup

            # Noise selectors to remove
            noise_selectors = [
                "#toc", ".mw-references-wrap", "table.infobox",
                "div.navbox", "table.metadata", "div.hatnote",
                "script", "style", "noscript",
            ]
            for sel in noise_selectors:
                for node in content.select(sel):
                    node.decompose()

            # Paragraph text
            paragraphs = []
            for p in content.select("p"):
                text = p.get_text(" ", strip=True)
                text = re.sub(r"\s*\[\d+\]", "", text)  # remove footnote like [1]
                if text:
                    paragraphs.append(text)

            combined = ((title_text + "\n\n") if title_text else "") + "\n\n".join(paragraphs)
            combined = re.sub(r"\n{3,}", "\n\n", combined).strip()
            return combined
        except Exception:
            return html

    async def scrape_article(self, url: str) -> Dict[str, Any]:
        """
        Scrape a single Wikipedia article

        Args:
            url: Article URL

        Returns:
            Dictionary containing article information
        """
        try:
            print(f"Starting to scrape article: {url}")

            async with AsyncWebCrawler(verbose=False) as crawler:
                # Scrape article content
                result = await crawler.arun(url)

                if result.success:
                    # Extract article information
                    article_data = self._extract_article_info(result, url)
                    print(f"Successfully scraped article: {article_data.get('title', 'Unknown')}")
                    return article_data
                else:
                    error_msg = result.error_message if hasattr(result, 'error_message') else 'Unknown error'
                    print(f"Scraping failed: {error_msg}")
                    return self._create_error_response(url, error_msg)

        except Exception as e:
            print(f"Error occurred while scraping article {url}: {str(e)}")
            return self._create_error_response(url, str(e))

    def _extract_article_info(self, result, url: str) -> Dict[str, Any]:
        """Extract article info from scraping result"""
        try:
            content = result.html

            # Extract title
            title = self._extract_title(content, url)

            # Extract main content
            main_content = self._extract_main_content(content)

            # Extract key sections
            sections = self._extract_sections(content)

            # Clean content
            cleaned_content = self.clean_content(main_content)

            return {
                'url': url,
                'title': title,
                'main_content': cleaned_content,
                'sections': sections,
                'raw_content_length': len(content),
                'cleaned_content_length': len(cleaned_content),
                'scraped_at': time.strftime('%Y-%m-%d %H:%M:%S'),
                'status': 'success'
            }

        except Exception as e:
            print(f"Error extracting article info: {str(e)}")
            return self._create_error_response(url, f"Extraction failed: {str(e)}")

    def _extract_title(self, content: str, url: str) -> str:
        """Extract article title from HTML content"""
        try:
            soup = BeautifulSoup(content, 'html.parser')

            # Prefer title from h1
            h1_title = soup.find('h1')
            if h1_title:
                title_text = h1_title.get_text(strip=True)
                if title_text and len(title_text) > 0:
                    return title_text

            # Fallback: page <title>
            title_tag = soup.find('title')
            if title_tag:
                title_text = title_tag.get_text(strip=True)
                # Remove suffix like " - Wikipedia"
                if ' - Wikipedia' in title_text:
                    title_text = title_text.split(' - Wikipedia')[0]
                if title_text and len(title_text) > 0:
                    return title_text

            return "Unknown Title"

        except Exception as e:
            print(f"Error extracting title: {str(e)}")
            return "Unknown Title"

    def _extract_main_content(self, content: str) -> str:
        """Extract main article content from HTML"""
        try:
            soup = BeautifulSoup(content, 'html.parser')

            # Locate main content area
            main_content = soup.select_one('#mw-content-text')
            if not main_content:
                main_content = soup

            # Remove noisy elements
            noise_selectors = [
                '#toc', '.mw-references-wrap', 'table.infobox',
                'div.navbox', 'table.metadata', 'div.hatnote',
                'script', 'style', 'noscript', 'table', 'nav',
                '.mw-editsection', '.mw-cite-backlink', '.thumb',
                '.image', 'sup.reference', 'span.mw-ref'
            ]

            for selector in noise_selectors:
                for element in main_content.select(selector):
                    element.decompose()

            # Collect all paragraph text
            paragraphs = main_content.find_all('p')
            content_list = []

            for p in paragraphs:
                text = p.get_text(strip=True)
                if text and len(text) > 10:  # filter out very short paragraphs
                    # remove footnote numbers and artifacts
                    text = re.sub(r'\s*\[\d+\]', '', text)
                    text = re.sub(r'\[\s*\]', '', text)
                    content_list.append(text)

            if content_list:
                return '\n\n'.join(content_list)
            else:
                return "Unable to extract valid content"

        except Exception as e:
            print(f"Failed to extract content from HTML: {str(e)}")
            return content[:1000]  # return first 1000 chars as fallback

    def _extract_sections(self, content: str) -> List[Dict[str, str]]:
        """Extract heading structure from HTML (without content)"""
        try:
            soup = BeautifulSoup(content, 'html.parser')

            # Locate main content area
            main_content = soup.select_one('#mw-content-text')
            if not main_content:
                main_content = soup

            sections = []

            # Find all heading tags
            headings = main_content.find_all(['h1', 'h2', 'h3'])

            for heading in headings:
                level = int(heading.name[1])  # h1->1, h2->2, h3->3
                heading_text = heading.get_text(strip=True)

                # Skip some headings
                if self._should_skip_heading(heading_text):
                    continue

                sections.append({
                    'level': level,
                    'heading': heading_text
                })

            return sections

        except Exception as e:
            print(f"Failed to extract sections from HTML: {str(e)}")
            return []

    def _should_skip_heading(self, heading_text: str) -> bool:
        """Decide whether a heading should be skipped"""
        skip_patterns = [
            'Contents', 'References', 'External links', 'Further reading',
            'See also', 'Notes', 'Bibliography', 'Sources', 'Citations', 'Footnotes'
        ]

        for pattern in skip_patterns:
            if pattern.lower() in heading_text.lower():
                return True
        return False

    def clean_content(self, raw_content: str) -> str:
        """Clean and pre-process scraped content"""
        if not raw_content:
            return ""

        try:
            # Remove excessive whitespace and newlines
            cleaned = re.sub(r'\n{3,}', '\n\n', raw_content)
            cleaned = re.sub(r'[ \t]+', ' ', cleaned)

            # Remove footnote references
            cleaned = re.sub(r'\s*\[\d+\]', '', cleaned)

            # Ensure proper blank lines between paragraphs
            cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)

            return cleaned.strip()

        except Exception as e:
            print(f"Failed to clean content: {e}")
            return raw_content

    async def scrape_all_articles(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Scrape all articles"""
        print(f"Starting batch scrape for {len(urls)} articles")

        all_results = []

        for i, url in enumerate(urls):
            try:
                print(f"Progress: {i+1}/{len(urls)}")

                # Scrape single article
                article_data = await self.scrape_article(url)
                all_results.append(article_data)

                # Rate limit: add delay between requests
                if i < len(urls) - 1:  # not the last one
                    print(f"Waiting {self.rate_limit_delay} seconds...")
                    await asyncio.sleep(self.rate_limit_delay)

            except Exception as e:
                print(f"Error while scraping article {url}: {str(e)}")
                all_results.append(self._create_error_response(url, str(e)))

        self.scraped_data = all_results
        print(f"Batch scrape completed, successfully scraped {len([r for r in all_results if r.get('status') == 'success'])} articles")

        return all_results

    def _create_error_response(self, url: str, error_message: str) -> Dict[str, Any]:
        """Create an error response"""
        return {
            'url': url,
            'title': 'Error',
            'main_content': '',
            'sections': [],
            'error': error_message,
            'status': 'error',
            'scraped_at': time.strftime('%Y-%m-%d %H:%M:%S')
        }

    def save_to_json(self, data: List[Dict[str, Any]], filename: str):
        """Save scraped data to a JSON file"""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            print(f"Full data saved to: {filename}")
        except Exception as e:
            print(f"Error saving JSON file: {str(e)}")

    def save_content_to_markdown(self, data: List[Dict[str, Any]], filename: str):
        """Save main contents of the results to a Markdown file"""
        try:
            lines = []
            for i, item in enumerate(data, start=1):
                title = item.get('title', f'Article {i}')
                url = item.get('url', '')
                main = item.get('main_content', '').strip()

                lines.append(f"# {title}")
                if url:
                    lines.append(f"Source: {url}")
                lines.append("")

                if main:
                    lines.append(main)
                    lines.append("")

                lines.append("---\n")

            output = "\n".join(lines).strip() + "\n"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(output)
            print(f"Main content saved to: {filename}")
        except Exception as e:
            print(f"Error saving main content Markdown file: {str(e)}")

    def save_sections_to_markdown(self, data: List[Dict[str, Any]], filename: str):
        """Save section information of the results to a Markdown file"""
        try:
            lines = []
            for i, item in enumerate(data, start=1):
                title = item.get('title', f'Article {i}')
                url = item.get('url', '')
                sections = item.get('sections', [])

                lines.append(f"# {title}")
                if url:
                    lines.append(f"Source: {url}")
                lines.append("")

                if isinstance(sections, list) and sections:
                    for s in sections:
                        if isinstance(s, dict):
                            heading = s.get('heading') or s.get('title') or 'Section'
                            content = s.get('content') or s.get('text') or ''
                            lines.append(f"## {heading}")
                            if content:
                                lines.append(content)
                            lines.append("")
                        else:
                            lines.append(str(s))
                            lines.append("")
                else:
                    lines.append("No sections available")
                    lines.append("")

                lines.append("---\n")

            output = "\n".join(lines).strip() + "\n"
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(output)
            print(f"Section information saved to: {filename}")
        except Exception as e:
            print(f"Error saving section information Markdown file: {str(e)}")

    def get_summary(self) -> Dict[str, Any]:
        """Get scraping summary"""
        if not self.scraped_data:
            return {"message": "No scraped data"}

        successful = [r for r in self.scraped_data if r.get('status') == 'success']
        failed = [r for r in self.scraped_data if r.get('status') == 'error']

        total_content_length = sum(r.get('cleaned_content_length', 0) for r in successful)

        return {
            'total_articles': len(self.scraped_data),
            'successful_scrapes': len(successful),
            'failed_scrapes': len(failed),
            'total_content_length': total_content_length,
            'average_content_length': total_content_length // len(successful) if successful else 0,
            'success_rate': len(successful) / len(self.scraped_data) * 100
        }

print("✅ WikipediaScraper class definition completed")


In [None]:
# Utility function to convert search keywords to Wikipedia URLs
def build_wikipedia_urls(terms):
    """Generate Wikipedia article URL list from keywords (English articles).
    - Rules: Replace spaces with underscores; trim whitespace; deduplicate.
    - Note: If keywords are not English or non-standard titles, additional processing/search may be needed.
    """
    if not terms:
        return []
    base = "https://en.wikipedia.org/wiki/"
    urls = []
    seen = set()
    for t in terms:
        if not isinstance(t, str):
            continue
        title = t.strip().replace(" ", "_")
        if not title:
            continue
        url = base + title
        if url not in seen:
            urls.append(url)
            seen.add(url)
    return urls


In [None]:
async def scrape_scientific_discoveries(search_terms_list: Optional[List[str]] = None):
    """Scrape scientific discovery related Wikipedia articles"""
    print("🚀 Starting to scrape scientific discovery related Wikipedia articles...")
    print("=" * 60)
    if search_terms_list != None:
        scientific_urls = build_wikipedia_urls(search_terms_list)
        print(f"✅ Generated {len(scientific_urls)} Wikipedia URLs")
    else:
        search_terms = [
        "CRISPR",
        "RNA vaccine",
        "Gravitational wave",
        "Higgs boson",
        "Quantum computing",
        "Ancient DNA",
        "Water on Mars",
        "Penicillin"
        ]
        scientific_urls = build_wikipedia_urls(search_terms)

    print(scientific_urls)
    scraper = WikipediaScraper(scientific_urls)
    results = await scraper.scrape_all_articles(scientific_urls)

    # Save data
    scraper.save_to_json(results,"./scientific_discoveries.json")
    scraper.save_content_to_markdown(results,"./content.md")
    scraper.save_sections_to_markdown(results,"./section.md")

    # Display summary
    summary = scraper.get_summary()
    print("\n📊 Scraping Results Summary:")
    print("=" * 40)
    for key, value in summary.items():
        print(f"{key}: {value}")

    print(f"\n🎉 Scraping completed! Retrieved data for {len(results)} articles")
    return results

print("✅ Data scraping function definition completed")


✅ 数据抓取函数定义完成


In [None]:
# Execute data scraping - can directly use await in Jupyter
scraped_info = await scrape_scientific_discoveries()

🚀 开始抓取科学发现相关的维基百科文章...
['https://en.wikipedia.org/wiki/CRISPR', 'https://en.wikipedia.org/wiki/RNA_vaccine', 'https://en.wikipedia.org/wiki/Gravitational_wave', 'https://en.wikipedia.org/wiki/Higgs_boson', 'https://en.wikipedia.org/wiki/Quantum_computing', 'https://en.wikipedia.org/wiki/Ancient_DNA', 'https://en.wikipedia.org/wiki/Water_on_Mars', 'https://en.wikipedia.org/wiki/Penicillin']
验证通过，共8个有效URL
开始批量爬取 8 篇文章
进度: 1/8
开始爬取文章: https://en.wikipedia.org/wiki/CRISPR


成功爬取文章: CRISPR
等待 2 秒...
进度: 2/8
开始爬取文章: https://en.wikipedia.org/wiki/RNA_vaccine


成功爬取文章: mRNA vaccine
等待 2 秒...
进度: 3/8
开始爬取文章: https://en.wikipedia.org/wiki/Gravitational_wave


成功爬取文章: Gravitational wave
等待 2 秒...
进度: 4/8
开始爬取文章: https://en.wikipedia.org/wiki/Higgs_boson


成功爬取文章: Higgs boson
等待 2 秒...
进度: 5/8
开始爬取文章: https://en.wikipedia.org/wiki/Quantum_computing


成功爬取文章: Quantum computing
等待 2 秒...
进度: 6/8
开始爬取文章: https://en.wikipedia.org/wiki/Ancient_DNA


成功爬取文章: Ancient DNA
等待 2 秒...
进度: 7/8
开始爬取文章: https://en.wikipedia.org/wiki/Water_on_Mars


成功爬取文章: Water on Mars
等待 2 秒...
进度: 8/8
开始爬取文章: https://en.wikipedia.org/wiki/Penicillin


成功爬取文章: Penicillin
批量爬取完成，成功爬取 8 篇文章
完整数据已保存到: ./scientific_discoveries.json
主内容已保存到: ./content.md
分节信息已保存到: ./section.md

📊 抓取结果摘要:
total_articles: 8
successful_scrapes: 8
failed_scrapes: 0
total_content_length: 367048
average_content_length: 45881
success_rate: 100.0

🎉 抓取完成！共获取 8 篇文章的数据


## Part2 Structured Data Extraction

In [69]:
# # Simple JSON file reader
# import json

# # Read the saved JSON data
# with open('./scientific_discoveries.json', 'r', encoding='utf-8') as f:
#     data = json.load(f)

# print(f"✅ Successfully read data of {len(data)} articles")

In [75]:
# Pydantic model designed for scientific discovery domain
class ScientificDiscoveryExtraction(BaseModel):
    """Structured data model for scientific discovery information"""

    # Basic information
    primary_name: str = Field(description="Primary name of the scientific discovery or technology")

    # Discoverers and time
    discoverers: List[str] = Field(description="List of names of main discoverers or researchers")
    discovery_years: List[str] = Field(description="Most important discovery year, strongly recommend returning only one year. Must select the single year that best represents the core breakthrough of the technology/discovery from the article. Priority: key technical implementation > important paper publication > initial concept proposal. For example, for CRISPR, choose 2012 (Doudna and Charpentier's key paper) rather than 1987 (first sequence discovery). Avoid returning multiple years.")
    discovery_timeline: List[str] = Field(description="Timeline from initial to final discovery")

    # Technical details
    mechanism: str = Field(description="Basic working principle or mechanism of the technology")
    key_features: List[str] = Field(description="Main features or advantages of the technology")

    # Applications and impact
    applications: List[str] = Field(description="Main application fields or uses")
    significance: str = Field(description="Scientific or social significance of the discovery")

    # Optional additional information
    institutions: Optional[List[str]] = Field(default=None, description="Related research institutions or universities")
    awards: Optional[List[str]] = Field(default=None, description="Important awards or honors received")

    # Metadata
    extracted_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="Data extraction time")

print("✅ Scientific discovery domain Pydantic model definition completed")


✅ Scientific discovery domain Pydantic model definition completed


In [76]:
class StructuredDataExtractor:
    """A class for structured data extraction using the OpenAI API"""

    def __init__(self, api_key: str = None, model: str = "gpt-4o-mini"):
        """
        Initialize the extractor

        Args:
            api_key: OpenAI API key (optional, will use environment variable OPENAI_API_KEY first)
            model: Model name to use
        """
        import os

        self.model = model
        self.client = None

        # Prefer the provided API key, then the environment variable; if not found, ask interactively
        final_api_key = os.getenv('OPENAI_API_KEY')
        if not final_api_key:
            try:
                from getpass import getpass
                final_api_key = getpass('Please enter OPENAI_API_KEY (input will be hidden): ')
            except Exception:
                final_api_key = input('Please enter OPENAI_API_KEY: ')

        if final_api_key:
            self.client = openai.OpenAI(api_key=final_api_key)
            print("✅ OpenAI client initialized successfully")
        else:
            print("⚠️ No OpenAI API key found, related features will be limited")

        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        # Retry configuration
        self.max_retries = 3
        self.retry_delay = 2  # seconds

    def extract_structured_data(self, content: str, model: str = None) -> Optional[ScientificDiscoveryExtraction]:
        """
        Extract structured data using OpenAI structured output

        Args:
            content: Text content to extract
            model: Model to use (optional)

        Returns:
            ScientificDiscoveryExtraction: Extracted structured data
        """
        if not self.client:
            self.logger.error("OpenAI client not initialized, please provide an API key")
            return None

        model_to_use = model or self.model

        # Build system prompt
        system_prompt = """
        You are a professional scientific literature analyst. Please extract structured information 
        from the given scientific article content.

        Extract the following information:
        1. Main name of the scientific discovery or technology
        2. Main discoverer(s) or researcher(s)
        3. Important discovery time points
        4. Timeline of the discovery
        5. Basic working principle of the technology
        6. Main features and advantages
        7. Application areas
        8. Scientific significance
        9. Related institutions (if any)
        10. Awards received (if any)

        **Important extraction rules**:
        - For discovery_years, follow these rules:
          * If a range of years is mentioned (e.g., "2010-2012", "from 2008 to 2011"), 
            determine which year is most relevant to the key discovery in the title
          * Prefer the year associated with the key breakthrough, first publication, or major experiment success
          * If multiple years are mentioned and it's unclear which is more important, return the earliest one
          * Example: If the title is "CRISPR gene-editing technology" and the text says 
            "Research started in 2008, breakthrough in 2012", choose 2012
          * If completely uncertain, return the earliest year in the range

        If some information is not explicitly mentioned, try to infer from the context, 
        or state "unclear" in the corresponding field.
        """

        user_prompt = f"""
        Please extract structured information from the following scientific article content:

        {content[:8000]}  # Limit length to avoid token issues

        Please extract information according to the ScientificDiscoveryExtraction model.

        **Special note on discovery_years**:
        - Carefully read the article title and content to understand the core of the scientific discovery
        - If a year range is present, decide which year best represents the key breakthrough
        - Priority: first major publication, critical technical success, or key experimental result
        - If unclear, choose the earliest year in the range
        """

        # Try extraction with retries
        for attempt in range(self.max_retries):
            try:
                self.logger.info(f"Starting extraction attempt {attempt + 1}...")

                response = self.client.beta.chat.completions.parse(
                    model=model_to_use,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt}
                    ],
                    response_format=ScientificDiscoveryExtraction,
                    temperature=0.1  # Low temperature for consistency
                )

                # Extract structured data
                extracted_data = response.choices[0].message.parsed

                if extracted_data:
                    self.logger.info("✅ Structured data extraction succeeded")
                    return extracted_data
                else:
                    self.logger.warning("⚠️ API returned empty data")

            except openai.RateLimitError as e:
                self.logger.warning(f"Rate limit error (attempt {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (2 ** attempt))  # exponential backoff

            except openai.APIError as e:
                self.logger.error(f"OpenAI API error (attempt {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)

            except Exception as e:
                self.logger.error(f"Unknown error (attempt {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)

        self.logger.error("❌ All extraction attempts failed")
        return None

    def batch_extract(self, articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Process multiple articles in batch

        Args:
            articles: List of article data

        Returns:
            List[Dict]: List of extraction results
        """
        results = []

        self.logger.info(f"Starting batch processing of {len(articles)} articles...")

        for i, article in enumerate(articles):
            try:
                self.logger.info(f"Processing article {i+1}/{len(articles)}: {article.get('title', 'Unknown')}")

                # Check article status
                if article.get('status') != 'success':
                    self.logger.warning(f"Skipping failed article: {article.get('error', 'Unknown error')}")
                    results.append({
                        'article_info': article,
                        'extraction_status': 'skipped',
                        'extraction_error': article.get('error', 'Article scraping failed'),
                        'structured_data': None
                    })
                    continue

                # Extract main content
                content = article.get('main_content', '')
                if not content:
                    self.logger.warning("Article content is empty, skipping")
                    results.append({
                        'article_info': article,
                        'extraction_status': 'failed',
                        'extraction_error': 'Empty content',
                        'structured_data': None
                    })
                    continue

                # Run structured extraction
                extracted_data = self.extract_structured_data(content)

                if extracted_data:
                    results.append({
                        'article_info': article,
                        'extraction_status': 'success',
                        'extraction_error': None,
                        'structured_data': extracted_data.dict()
                    })
                else:
                    results.append({
                        'article_info': article,
                        'extraction_status': 'failed',
                        'extraction_error': 'Extraction failed after retries',
                        'structured_data': None
                    })

                # Delay to avoid rate limit
                if i < len(articles) - 1:
                    time.sleep(1)

            except Exception as e:
                self.logger.error(f"Error processing article: {e}")
                results.append({
                    'article_info': article,
                    'extraction_status': 'error',
                    'extraction_error': str(e),
                    'structured_data': None
                })

        # Summary statistics
        successful = len([r for r in results if r['extraction_status'] == 'success'])
        failed = len([r for r in results if r['extraction_status'] in ['failed', 'error']])
        skipped = len([r for r in results if r['extraction_status'] == 'skipped'])

        self.logger.info(f"Batch processing finished: Success {successful}, Failed {failed}, Skipped {skipped}")

        return results

    def save_extraction_results(self, results: List[Dict[str, Any]], filename: str):
        """
        Save extraction results to a file

        Args:
            results: List of extraction results
            filename: Output file name
        """
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2, default=str)
            self.logger.info(f"✅ Extraction results saved to: {filename}")
        except Exception as e:
            self.logger.error(f"❌ Error saving file: {e}")

print("✅ Structured data extractor class definition completed")


✅ Structured data extractor class definition completed


In [77]:
def perform_structured_extraction(scraper_result):
    # Perform structured data extraction - this was the missing call code!
    print("🚀 Starting structured data extraction...")
    print("=" * 60)

    # Check if API key is set
    if not os.getenv('OPENAI_API_KEY'):
        print("⚠️ Please set the OpenAI API key first:")
        print("   os.environ['OPENAI_API_KEY'] = 'your-api-key-here'")
        print("   Then re-run this cell")
    else:
        try:
            # 1. Initialize the extractor (using the previously defined class)
            extractor = StructuredDataExtractor()

            if scraper_result is not None:
                scraped_articles = scraper_result
                print(f"📖 Using the provided scraping results")
            else:
                print(f"📖 No scraping results provided, trying to load from file")
                # 2. Read previously scraped data
                with open('./scientific_discoveries.json', 'r', encoding='utf-8') as f:
                    scraped_articles = json.load(f)

            print(f"📖 Successfully loaded {len(scraped_articles)} articles")

            # 3. Perform batch structured extraction (call the core method!)
            print("🔄 Starting batch structured extraction...")
            extraction_results = extractor.batch_extract(scraped_articles)

            # 4. Save extraction results (call the save method!)
            extractor.save_extraction_results(extraction_results, './structured_extractions.json')

            # 5. Show extraction statistics
            successful = [r for r in extraction_results if r['extraction_status'] == 'success']
            failed = [r for r in extraction_results if r['extraction_status'] != 'success']

            print(f"\n📊 Extraction statistics:")
            print(f"✅ Successfully extracted: {len(successful)} articles")
            print(f"❌ Failed to extract: {len(failed)} articles")

            # 6. Display the first successful extraction result
            if successful:
                first_result = successful[0]
                print(f"\n📄 Sample extraction result - {first_result['article_info']['title']}:")
                print("-" * 50)
                structured = first_result['structured_data']
                print(f"📝 Primary name: {structured['primary_name']}")
                print(f"👥 Discoverers: {', '.join(structured['discoverers'])}")
                print(f"📅 Discovery years: {', '.join(structured['discovery_years'])}")
                print(f"🔬 Applications: {len(structured['applications'])} items")
                print(f"⭐ Key features: {len(structured['key_features'])} items")

            print(f"\n✅ Structured extraction completed! Results saved to './structured_extractions.json'")

            return extraction_results
        except Exception as e:
            print(f"❌ Execution error: {str(e)}")
            print("Please check the API key settings and ensure the data file exists")


## Part 3: Function Calling Implementation

In [None]:
class OpenAIAnalyzer:
    """OpenAI分析器类，用于智能匹配和数据分析"""
    
    def __init__(self, api_key: Optional[str] = None):
        """初始化OpenAI客户端
        
        Args:
            api_key: OpenAI API密钥，如果为None则从环境变量获取
        """
        if api_key is None:
            api_key = os.getenv('OPENAI_API_KEY')
            if not api_key:
                raise ValueError("请设置OPENAI_API_KEY环境变量或传入api_key参数")
        
        self.client = OpenAI(api_key=api_key)
        print("✅ OpenAI客户端初始化成功")
    
    def analyze_query_match(self, user_query: str, available_names: List[str]) -> Dict[str, Any]:
        """分析用户查询与可用名称的匹配度
        
        Args:
            user_query: 用户输入的查询
            available_names: 可用的名称列表
            
        Returns:
            包含最佳匹配和置信度的字典
        """
        try:
            # 构建提示词
            prompt = f"""
            用户查询: "{user_query}"
            
            可用的科学发现名称列表:
            {json.dumps(available_names, ensure_ascii=False, indent=2)}
            
            请分析用户查询与可用名称的匹配度，返回JSON格式结果:
            {{
                "best_match": "最匹配的名称",
                "confidence": 0.0-1.0的置信度,
                "reasoning": "匹配原因",
                "alternative_matches": ["其他可能的匹配项"]
            }}
            
            注意：
            1. 考虑同义词、缩写、不同表达方式
            2. 如果找不到合适匹配，confidence设为0.0
            3. 只返回JSON，不要其他文字
            """
            
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是一个专业的科学数据匹配助手，擅长分析查询与数据的匹配度。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1
            )
            
            result = json.loads(response.choices[0].message.content)
            return result
            
        except Exception as e:
            print(f"❌ OpenAI分析出错: {e}")
            return {
                "best_match": None,
                "confidence": 0.0,
                "reasoning": f"分析失败: {e}",
                "alternative_matches": []
            }
    
    def suggest_search_strategy(self, user_query: str, data_sample: List[Dict[str, Any]]) -> Dict[str, Any]:
        """建议搜索策略
        
        Args:
            user_query: 用户查询
            data_sample: 数据样本
            
        Returns:
            搜索策略建议
        """
        try:
            prompt = f"""
            用户查询: "{user_query}"
            
            数据样本结构:
            {json.dumps(data_sample[:3], ensure_ascii=False, indent=2)}
            
            请分析用户查询意图，建议最佳搜索策略，返回JSON格式:
            {{
                "search_type": "name|scientist|application|general",
                "search_keywords": ["关键词1", "关键词2"],
                "explanation": "搜索策略说明"
            }}
            """
            
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "你是一个科学数据搜索策略专家。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.1
            )
            
            return json.loads(response.choices[0].message.content)
            
        except Exception as e:
            print(f"❌ 搜索策略分析出错: {e}")
            return {
                "search_type": "general",
                "search_keywords": [user_query],
                "explanation": f"分析失败，使用通用搜索: {e}"
            }

print("✅ OpenAIAnalyzer类定义完成")


***3.1 Scientific Data Query***

In [91]:
class ScientificDataQuery:
    """Scientific discovery data query class - simple data access functions
    
    This class provides basic data access functions. Fuzzy matching is handled by OpenAI.
    """

    def __init__(self, data_file: str = "./structured_extractions.json", data: Optional[List[Dict[str, Any]]] = None):
        """Initialize the query class with data loading
        
        Args:
            data_file: Path to the structured data file (default: "./structured_extractions.json")
            data: Optional in-memory data to use instead of loading from file
        """
        try:
            if data is not None:
                # Use in-memory data directly
                self.raw_data = data
                print(f"✅ Using provided in-memory data with {len(data)} items")
            else:
                # Load from file (default behavior for function calls)
                with open(data_file, 'r', encoding='utf-8') as f:
                    self.raw_data = json.load(f)
                print(f"✅ Loaded data from {data_file}")

            # Extract successful structured data
            self.discoveries = []
            for item in self.raw_data:
                if isinstance(item, dict):
                    if item.get('extraction_status') == 'success' and item.get('extracted_data'):
                        # Merge extracted data with metadata
                        extracted_data = item['extracted_data']
                        if isinstance(extracted_data, dict):
                            self.discoveries.append({**item, **extracted_data})
                        else:
                            print(f"⚠️ Skipping item with malformed 'extracted_data': {item.get('title', 'Unknown Title')}")
                    elif isinstance(item, dict) and 'primary_name' in item:
                        # Already structured data
                        self.discoveries.append(item)

            print(f"✅ Successfully loaded {len(self.discoveries)} scientific discovery data")

        except FileNotFoundError:
            print(f"❌ Error: Data file not found at {data_file}")
            self.discoveries = []
        except json.JSONDecodeError:
            print(f"❌ Error: Could not decode JSON from {data_file}")
            self.discoveries = []
        except Exception as e:
            print(f"❌ An unexpected error occurred during data loading: {e}")
            self.discoveries = []

    def find_discovery_by_name(self, discovery_name: str) -> Optional[Dict[str, Any]]:
        """Find a scientific discovery by exact name match"""
        discovery_name_lower = discovery_name.lower().strip()
        for discovery in self.discoveries:
            if discovery.get('primary_name', '').lower() == discovery_name_lower:
                return discovery
        return None

    def find_discoveries_by_scientist(self, scientist_name: str) -> List[Dict[str, Any]]:
        """Find all discoveries by a scientist (exact name match)"""
        scientist_name_lower = scientist_name.lower().strip()
        matched_discoveries = []
        
        for discovery in self.discoveries:
            for discoverer in discovery.get('discoverers', []):
                if discoverer.lower() == scientist_name_lower:
                    matched_discoveries.append(discovery)
                    break
        
        return matched_discoveries

    def get_all_discoveries(self) -> List[Dict[str, Any]]:
        """Get all available scientific discoveries"""
        return self.discoveries

    def get_discovery_details(self, discovery_name: str) -> Optional[Dict[str, Any]]:
        """Get comprehensive details about a specific scientific discovery"""
        discovery = self.find_discovery_by_name(discovery_name)
        if discovery:
            return {
                "primary_name": discovery.get('primary_name'),
                "discoverers": discovery.get('discoverers', []),
                "discovery_years": discovery.get('discovery_years', []),
                "mechanism": discovery.get('mechanism', ''),
                "key_features": discovery.get('key_features', []),
                "applications": discovery.get('applications', []),
                "significance": discovery.get('significance', ''),
                "institutions": discovery.get('institutions', []),
                "awards": discovery.get('awards', []),
                "url": discovery.get('url', '')
            }
        return None

    def search_by_application(self, application_keyword: str) -> List[Dict[str, Any]]:
        """Search for scientific discoveries related to a specific application keyword"""
        results = []
        keyword_lower = application_keyword.lower()
        for discovery in self.discoveries:
            applications = discovery.get('applications', [])
            if any(keyword_lower in app.lower() for app in applications):
                results.append(discovery)
        return results

print("✅ ScientificDataQuery class definition completed")

✅ ScientificDataQuery class definition completed


***3.2 Functions***

In [80]:
def compare_discoveries(discovery1: str, discovery2: str) -> str:
    """
    Compare two scientific discoveries

    Args:
        discovery1: Name of the first discovery
        discovery2: Name of the second discovery

    Returns:
        Detailed comparison analysis
    """

    query = ScientificDataQuery()

    # Find the two discoveries
    disc1 = query.find_discovery_by_name(discovery1)
    disc2 = query.find_discovery_by_name(discovery2)

    if not disc1:
        return f"❌ Discovery not found: {discovery1}"
    if not disc2:
        return f"❌ Discovery not found: {discovery2}"

    # Build comparison analysis
    comparison = f"""
    🔬 Scientific Discovery Comparison
    {'='*50}

    📍 Discovery 1: {disc1['primary_name']}
    📍 Discovery 2: {disc2['primary_name']}

    👥 Discoverers:
    • {disc1['primary_name']}: {', '.join(disc1['discoverers'])}
    • {disc2['primary_name']}: {', '.join(disc2['discoverers'])}

    📅 Discovery Years:
    • {disc1['primary_name']}: {', '.join(disc1['discovery_years'])}
    • {disc2['primary_name']}: {', '.join(disc2['discovery_years'])}

    🔧 Mechanisms:
    • {disc1['primary_name']}: {disc1['mechanism']}
    • {disc2['primary_name']}: {disc2['mechanism']}

    ⭐ Key Features:
    • {disc1['primary_name']}: {', '.join(disc1['key_features'])}
    • {disc2['primary_name']}: {', '.join(disc2['key_features'])}

    🎯 Applications:
    • {disc1['primary_name']}: {', '.join(disc1['applications'])}
    • {disc2['primary_name']}: {', '.join(disc2['applications'])}

    🏆 Awards:
    • {disc1['primary_name']}: {', '.join(disc1.get('awards', ['No award information']))}
    • {disc2['primary_name']}: {', '.join(disc2.get('awards', ['No award information']))}

    💡 Significance:
    • {disc1['primary_name']}: {disc1['significance']}
    • {disc2['primary_name']}: {disc2['significance']}

    🏛️ Institutions:
    • {disc1['primary_name']}: {', '.join(disc1.get('institutions', ['No institution info']))}
    • {disc2['primary_name']}: {', '.join(disc2.get('institutions', ['No institution info']))}
    """

    return comparison


def get_research_timeline(scientist: str) -> str:
    """
    Get the research timeline of a scientist

    Args:
        scientist: Scientist name

    Returns:
        Timeline of the scientist's discoveries
    """

    query = ScientificDataQuery()
    discoveries = query.find_discoveries_by_scientist(scientist)

    if not discoveries:
        return f"❌ No discoveries found for scientist '{scientist}'"

    # Sort by discovery year
    sorted_discoveries = sorted(discoveries,
                                key=lambda x: int(x['discovery_years'][0]) if x['discovery_years'] else 0)

    timeline = f"""
    👨‍🔬 Research Timeline of {scientist}
    {'='*50}
    """

    for i, discovery in enumerate(sorted_discoveries, 1):
        timeline += f"""
    🔬 Discovery {i}: {discovery['primary_name']}
    📅 Year: {', '.join(discovery['discovery_years'])}
    🎯 Significance: {discovery['significance']}
    🏛️ Institutions: {', '.join(discovery.get('institutions', ['None']))}
    🏆 Awards: {', '.join(discovery.get('awards', ['None']))}
    """

    return timeline


def search_by_application(application: str) -> str:
    """
    Search for discoveries by application area

    Args:
        application: Application keyword

    Returns:
        List of relevant discoveries
    """

    query = ScientificDataQuery()
    all_discoveries = query.get_all_discoveries()

    application_lower = application.lower()
    relevant_discoveries = []

    for discovery in all_discoveries:
        # Search within applications
        for app in discovery.get('applications', []):
            if application_lower in app.lower():
                relevant_discoveries.append(discovery)
                break

    if not relevant_discoveries:
        return f"❌ No discoveries found related to '{application}'"

    result = f"""
    🎯 Discoveries related to '{application}'
    {'='*50}
    """

    for i, discovery in enumerate(relevant_discoveries, 1):
        result += f"""
        🔬 Discovery {i}: {discovery['primary_name']}
        👥 Discoverers: {', '.join(discovery['discoverers'])}
        📅 Year: {', '.join(discovery['discovery_years'])}
        🎯 Applications: {', '.join(discovery['applications'])}
        💡 Significance: {discovery['significance']}
        """

    return result


def get_discovery_details(discovery_name: str) -> str:
    """
    Get detailed information of a specific discovery

    Args:
        discovery_name: Name of the scientific discovery

    Returns:
        Detailed information
    """

    query = ScientificDataQuery()
    discovery = query.find_discovery_by_name(discovery_name)

    if not discovery:
        return f"❌ Discovery not found: {discovery_name}"

    details = f"""
    🔬 {discovery['primary_name']} - Details
    {'='*60}

    👥 Discoverers: {', '.join(discovery['discoverers'])}

    📅 Discovery Years: {', '.join(discovery['discovery_years'])}

    🔧 Mechanism:
    {discovery['mechanism']}

    ⭐ Key Features:
    {chr(10).join([f"  • {feature}" for feature in discovery['key_features']])}

    🎯 Applications:
    {chr(10).join([f"  • {app}" for app in discovery['applications']])}

    💡 Significance:
    {discovery['significance']}

    🏛️ Institutions: {', '.join(discovery.get('institutions', ['None']))}

    🏆 Awards: {', '.join(discovery.get('awards', ['None']))}

    🔗 URL: {discovery.get('url', 'No URL available')}
    """

    return details

print("✅ Part 3 function definitions completed")

✅ Part 3 function definitions completed


***3.3 Function Scheme***

In [81]:
FUNCTION_SCHEMAS = [
    {
        "name": "compare_discoveries",
        "description": "Compare two scientific breakthroughs in detail. Accepts discovery names, keywords, or partial names (e.g., 'CRISPR', '基因编辑', 'mRNA', '疫苗', '引力波'). Supports fuzzy matching in both English and Chinese.",
        "parameters": {
            "type": "object",
            "properties": {
                "discovery1": {
                    "type": "string",
                    "description": "First scientific discovery name or keyword (supports full name, abbreviation, partial match, or Chinese/English keyword)"
                },
                "discovery2": {
                    "type": "string",
                    "description": "Second scientific discovery name or keyword (supports full name, abbreviation, partial match, or Chinese/English keyword)"
                }
            },
            "required": ["discovery1", "discovery2"]
        }
    },
    {
        "name": "get_research_timeline",
        "description": "Get chronological timeline of a scientist's major discoveries. Accepts names in full, partial, or last name only (e.g., 'Jennifer Doudna', 'Doudna', '爱因斯坦', 'Einstein'). Supports fuzzy matching in both English and Chinese.",
        "parameters": {
            "type": "object",
            "properties": {
                "scientist": {
                    "type": "string",
                    "description": "Scientist's name or keyword (supports full name, partial name, or last name in English/Chinese)"
                }
            },
            "required": ["scientist"]
        }
    },
    {
        "name": "search_by_application",
        "description": "Search for scientific discoveries by their application domain or usage field. Accepts keywords in English or Chinese (e.g., 'medicine', 'agriculture', 'biotechnology', '医学', '农业').",
        "parameters": {
            "type": "object",
            "properties": {
                "application": {
                    "type": "string",
                    "description": "Application domain keyword (e.g., 'medicine', 'agriculture', 'biotechnology', '材料科学')"
                }
            },
            "required": ["application"]
        }
    },
    {
        "name": "get_discovery_details",
        "description": "Get comprehensive details about a specific scientific discovery. Accepts discovery names, abbreviations, or keywords (e.g., 'CRISPR', '基因编辑', 'mRNA', '疫苗'). Supports fuzzy matching in both English and Chinese.",
        "parameters": {
            "type": "object",
            "properties": {
                "discovery_name": {
                    "type": "string",
                    "description": "Scientific discovery name or keyword (supports partial name, abbreviation, or Chinese/English keyword)"
                }
            },
            "required": ["discovery_name"]
        }
    }
]


***3.4 Assistant***

In [82]:
class ScientificResearchAssistant:
    """科学发现交互式研究助手"""

    def __init__(self):
        """初始化助手"""
        self.client = None

        # 检查API密钥（支持交互式输入）
        api_key = os.getenv('OPENAI_API_KEY')
        if not api_key:
            try:
                from getpass import getpass
                api_key = getpass('请输入 OPENAI_API_KEY（输入内容不可见）：')
            except Exception:
                api_key = input('请输入 OPENAI_API_KEY：')

        if api_key:
            self.client = openai.OpenAI(api_key=api_key)
            print("✅ 科学研究助手初始化成功")
        else:
            print("⚠️ 未设置OpenAI API密钥，功能将受限")

        # 可用函数映射
        self.available_functions = {
            "compare_discoveries": compare_discoveries,
            "get_research_timeline": get_research_timeline,
            "search_by_application": search_by_application,
            "get_discovery_details": get_discovery_details
        }

    def chat(self, user_message: str) -> str:
        """与助手对话"""

        if not self.client:
            return "❌ OpenAI API未配置，无法进行智能对话"

        try:
            # 调用OpenAI API with Function Calling
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content":"""
                        你是一个专业的科学发现研究助手。你可以帮助用户：
                        1. 比较不同的科学发现
                        2. 查看科学家的研究时间线
                        3. 根据应用领域搜索发现
                        4. 获取特定发现的详细信息
                        请根据用户的问题，选择合适的函数来回答。"""
                    },
                    {"role": "user", "content": user_message}
                ],
                functions=FUNCTION_SCHEMAS,
                function_call="auto"
            )

            message = response.choices[0].message

            # 检查是否需要函数调用
            if message.function_call:
                function_name = message.function_call.name
                function_args = json.loads(message.function_call.arguments)

                print(f"🔧 调用函数: {function_name}")
                print(f"📝 参数: {function_args}")

                # 执行函数
                if function_name in self.available_functions:
                    result = self.available_functions[function_name](**function_args)
                    return result
                else:
                    return f"❌ 未知函数: {function_name}"

            else:
                return message.content

        except Exception as e:
            return f"❌ 处理请求时发生错误: {str(e)}"

    def show_available_data(self) -> str:
        """显示可用的数据"""
        query = ScientificDataQuery()
        discoveries = query.get_all_discoveries()

        if not discoveries:
            return "❌ 暂无可用数据"

        result = "📚 可用的科学发现数据:\n" + "="*40 + "\n\n"

        for i, discovery in enumerate(discoveries, 1):
            result += f"{i}. {discovery['primary_name']}\n"
            result += f"   发现者: {', '.join(discovery['discoverers'])}\n"
            result += f"   年份: {', '.join(discovery['discovery_years'])}\n\n"

        return result


***Instance***

In [83]:
def assistant_chat(example_queries: List[str]):
    """主函数演示"""
    print("🔬 科学发现交互式研究助手")
    print("="*50)

    # 初始化助手
    assistant = ScientificResearchAssistant()

    # 显示可用数据
    print(assistant.show_available_data())

    if example_queries !=[]:
        print("✅ 使用传入的示例对话")
    else:
        print("✅ 未传入示例对话，使用默认示例")
        example_queries = [
            "请比较CRISPR和量子计算两个科学发现",
            "Jennifer Doudna有哪些重要研究发现？",
            "搜索在医学领域应用的科学发现",
            "给我详细介绍一下CRISPR技术"
        ]
        for q in example_queries:
            print("示例问题:", q)

    # 进行对话
    for q in example_queries:
        print("Question:", q)
        print("Answer:", assistant.chat(q))

    return assistant


# Part 4: Integration and Demonstration

## 🎯 目标
创建一个全面的演示，展示所有组件如何协同工作，包括：
- 完整的端到端工作流程
- 所有主要功能演示
- 错误处理示例
- 数据可视化
- 可运行的完整管道


In [87]:
search_terms = [
        "CRISPR",
        "RNA vaccine",
        "Gravitational wave",
        "Higgs boson",
        "Quantum computing",
        "Ancient DNA",
        "Water on Mars",
        "Penicillin"
        ]

example_queries = [
    "Please compare the scientific discoveries of CRISPR and quantum computing.",
    "What are the major research discoveries of Jennifer Doudna?",
    "Search for scientific discoveries applied in the field of medicine.",
    "Give me a detailed introduction to CRISPR technology."
]

In [89]:
class ComprehensiveDemo:
    """综合演示类 - 展示完整的端到端工作流程"""

    def __init__(self):
        """初始化演示系统"""
        self.scraper_result = None
        self.extractor_result = None
        self.assistant = None
        self.query = None

        print("🚀 综合演示系统初始化完成")

    async def run_complete_pipeline(self):
        """运行完整的端到端管道"""
        print("=" * 80)
        print("🔬 科学发现研究系统 - 完整演示")
        print("=" * 80)

        try:
            # Part 1: 执行数据抓取（若传空列表则使用默认搜索词）
            # self.scraper_result = await scrape_scientific_discoveries(search_terms)

            with open('./scientific_discoveries.json', 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.scraper_result = data

            # Part 2: 执行结构化数据提取（基于抓取结果或已存在文件）
            self.extractor_result = perform_structured_extraction(self.scraper_result)

            # 初始化查询器，供可视化与助手使用
            self.query = ScientificDataQuery()

            # Part 3: 初始化交互式助手并进行示例对话
            self.assistant = assistant_chat(example_queries)

            # 可视化演示
            self._demo_visualization()

            print("\n✅ 完整演示完成！")

        except Exception as e:
            print(f"❌ 演示过程中发生错误: {str(e)}")


    def _demo_visualization(self):
        """演示数据可视化"""
        print("-" * 50)

        if not self.query or not self.query.discoveries:
            print("❌ 没有数据可供可视化")
            return

        # 创建数据可视化
        # self._create_discovery_timeline()
        # self._create_discoverer_network()
        # self._create_application_distribution()

    def _create_discovery_timeline(self):
        """创建发现时间线图"""
        print("📅 创建科学发现时间线图...")

        try:
            # 准备数据（来自结构化结果）
            discoveries = self.query.discoveries
            years = []
            names = []

            import re
            def parse_year(value: str):
                if not value:
                    return None
                m = re.search(r"\b(1|2)\d{3}\b", str(value))
                return int(m.group(0)) if m else None

            for discovery in discoveries:
                years_raw = discovery.get('discovery_years') or []
                y = parse_year(years_raw[0] if years_raw else "")
                if y is None:
                    continue
                years.append(y)
                names.append(discovery['primary_name'])

            # 创建时间线图
            plt.figure(figsize=(12, 6))
            plt.scatter(years, range(len(years)), s=100, alpha=0.7, c='skyblue', edgecolors='navy')

            for i, (year, name) in enumerate(zip(years, names)):
                plt.annotate(f"{year}: {name[:20]}...",
                        (year, i),
                        xytext=(10, 0),
                        textcoords='offset points',
                        fontsize=8,
                        ha='left')

            plt.xlabel('发现年份')
            plt.ylabel('科学发现')
            plt.title('科学发现时间线')
            plt.grid(True, alpha=0.3)
            plt.tight_layout()
            plt.show()

            print("✅ 时间线图创建成功")

        except Exception as e:
            print(f"❌ 时间线图创建失败: {str(e)}")

    def _create_discoverer_network(self):
        """创建发现者网络图"""
        print("👥 创建发现者网络图...")

        try:
            # 统计发现者
            discoverer_count = {}
            for discovery in self.query.discoveries:
                for discoverer in discovery.get('discoverers', []):
                    discoverer_count[discoverer] = discoverer_count.get(discoverer, 0) + 1

            # 创建柱状图
            if discoverer_count:
                plt.figure(figsize=(10, 6))
                names = list(discoverer_count.keys())
                counts = list(discoverer_count.values())

                bars = plt.bar(range(len(names)), counts, color='lightcoral', alpha=0.7)
                plt.xlabel('发现者')
                plt.ylabel('发现数量')
                plt.title('发现者贡献统计')
                plt.xticks(range(len(names)), [name.split()[-1] for name in names], rotation=45)

                # 添加数值标签
                for bar, count in zip(bars, counts):
                    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                            str(count), ha='center', va='bottom')

                plt.tight_layout()
                plt.show()

                print("✅ 发现者网络图创建成功")
            else:
                print("⚠️ 没有发现者数据")

        except Exception as e:
            print(f"❌ 发现者网络图创建失败: {str(e)}")

    def _create_application_distribution(self):
        """创建应用领域分布图"""
        print("🎯 创建应用领域分布图...")

        try:
            # 统计应用领域
            application_count = {}
            for discovery in self.query.discoveries:
                for app in discovery.get('applications', []):
                    # 简化应用领域名称
                    app_key = app.lower().strip()
                    if 'research' in app_key:
                        app_key = 'Research'
                    elif 'medicine' in app_key or 'medical' in app_key:
                        app_key = 'Medicine'
                    elif 'biotechnology' in app_key or 'biotech' in app_key:
                        app_key = 'Biotechnology'
                    elif 'treatment' in app_key or 'therapy' in app_key:
                        app_key = 'Treatment'
                    else:
                        app_key = app_key.title()

                    application_count[app_key] = application_count.get(app_key, 0) + 1

            # 创建饼图
            if application_count:
                plt.figure(figsize=(8, 8))
                labels = list(application_count.keys())
                sizes = list(application_count.values())
                colors = plt.cm.Set3(range(len(labels)))

                plt.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
                plt.title('科学发现应用领域分布')
                plt.axis('equal')
                plt.show()

                print("✅ 应用领域分布图创建成功")
            else:
                print("⚠️ 没有应用领域数据")

        except Exception as e:
            print(f"❌ 应用领域分布图创建失败: {str(e)}")

print("✅ 综合演示类定义完成")


✅ 综合演示类定义完成


In [None]:
# 运行完整的Part 4演示
def run_part4_demo():
    """运行Part 4完整演示"""
    print("🎬 开始Part 4: Integration and Demonstration")
    print("=" * 60)

    # 创建演示实例
    demo = ComprehensiveDemo()

    # 运行完整管道（本函数需在 Jupyter 中用 await 调用）
    import nest_asyncio, asyncio
    nest_asyncio.apply()
    asyncio.get_event_loop().run_until_complete(demo.run_complete_pipeline())

    print("\n🎉 Part 4演示完成！")
    print("=" * 60)
    print("📋 演示内容总结:")
    print("✅ 1. 完整端到端工作流程")
    print("✅ 2. 所有主要功能演示")
    print("✅ 3. 错误处理示例")
    print("✅ 4. 数据可视化")
    print("✅ 5. 交互式助手演示")

# 执行演示
run_part4_demo()


🎬 开始Part 4: Integration and Demonstration
🚀 综合演示系统初始化完成
🔬 科学发现研究系统 - 完整演示
🚀 Starting structured data extraction...
✅ OpenAI client initialized successfully
📖 Using the provided scraping results
📖 Successfully loaded 8 articles
🔄 Starting batch structured extraction...


INFO:__main__:Starting batch processing of 8 articles...
INFO:__main__:Processing article 1/8: CRISPR
INFO:__main__:Starting extraction attempt 1...
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:✅ Structured data extraction succeeded
INFO:__main__:Processing article 2/8: mRNA vaccine
INFO:__main__:Starting extraction attempt 1...
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:✅ Structured data extraction succeeded
INFO:__main__:Processing article 3/8: Gravitational wave
INFO:__main__:Starting extraction attempt 1...
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:✅ Structured data extraction succeeded
INFO:__main__:Processing article 4/8: Higgs boson
INFO:__main__:Starting extraction attempt 1...
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:__main__:✅ Structured da


📊 Extraction statistics:
✅ Successfully extracted: 8 articles
❌ Failed to extract: 0 articles

📄 Sample extraction result - CRISPR:
--------------------------------------------------
📝 Primary name: CRISPR-Cas9 genome editing technology
👥 Discoverers: Jennifer Doudna, Emmanuelle Charpentier, Francisco Mojica, Yoshizumi Ishino, Rodolphe Barrangou, Ruud Jansen
📅 Discovery years: 2012
🔬 Applications: 4 items
⭐ Key features: 4 items

✅ Structured extraction completed! Results saved to './structured_extractions.json'
✅ Loaded data from ./structured_extractions.json
✅ Successfully loaded 0 scientific discovery data
🔬 科学发现交互式研究助手
✅ 科学研究助手初始化成功
✅ Loaded data from ./structured_extractions.json
✅ Successfully loaded 0 scientific discovery data
❌ 暂无可用数据
✅ 使用传入的示例对话
Question: Please compare the scientific discoveries of CRISPR and quantum computing.


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


🔧 调用函数: compare_discoveries
📝 参数: {'discovery1': 'CRISPR', 'discovery2': 'quantum computing'}
✅ Loaded data from ./structured_extractions.json
✅ Successfully loaded 0 scientific discovery data
Answer: ❌ Discovery not found: CRISPR
Question: What are the major research discoveries of Jennifer Doudna?


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


🔧 调用函数: get_research_timeline
📝 参数: {'scientist': 'Jennifer Doudna'}
✅ Loaded data from ./structured_extractions.json
✅ Successfully loaded 0 scientific discovery data
Answer: ❌ No discoveries found for scientist 'Jennifer Doudna'
Question: Search for scientific discoveries applied in the field of medicine.


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


🔧 调用函数: search_by_application
📝 参数: {'application': 'medicine'}
✅ Loaded data from ./structured_extractions.json
✅ Successfully loaded 0 scientific discovery data
Answer: ❌ No discoveries found related to 'medicine'
Question: Give me a detailed introduction to CRISPR technology.


INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


🔧 调用函数: get_discovery_details
📝 参数: {'discovery_name': 'CRISPR'}
✅ Loaded data from ./structured_extractions.json
✅ Successfully loaded 0 scientific discovery data
Answer: ❌ Discovery not found: CRISPR
--------------------------------------------------
❌ 没有数据可供可视化

✅ 完整演示完成！

🎉 Part 4演示完成！
📋 演示内容总结:
✅ 1. 完整端到端工作流程
✅ 2. 所有主要功能演示
✅ 3. 错误处理示例
✅ 4. 数据可视化
✅ 5. 交互式助手演示
