# Module 3: Web Fetcher

Retrieve and clean web content for processing by other modules.

**Features:**
- URL fetching with proper headers and timeouts
- HTML → clean text extraction
- Rate limiting (be a good citizen)
- Caching (don't re-fetch unnecessarily)
- Error handling for unreachable sites

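**Why build this?** The Research Agent needs clean text from web pages. Raw HTML is mostly markup, scripts, and navigation boilerplate, which wastes context window and distracts an LLM from the actual content.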

## Install Additional Dependencies

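The fetcher relies on third-party packages for HTTP requests and HTML extraction (`requests`, `trafilatura`, and `beautifulsoup4`); install any that are missing.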

In [None]:
# Install trafilatura - excellent for extracting article text from HTML
!pip install --user trafilatura

## Implementation

In [None]:
import requests
import time
import hashlib
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse

# trafilatura is excellent at extracting main content from web pages
import trafilatura
from bs4 import BeautifulSoup


@dataclass
class FetchResult:
    """Outcome of a single fetch attempt, successful or not."""
    # URL that was requested
    url: str
    # True when the page was retrieved and processed
    success: bool
    # Clean text content
    content: Optional[str] = None
    # Page title
    title: Optional[str] = None
    # Raw HTML (if needed)
    html: Optional[str] = None
    # Error message if failed
    error: Optional[str] = None
    # Seconds spent producing this result
    elapsed_time: float = 0.0
    # True when the result was served from the cache
    from_cache: bool = False

    def __str__(self):
        """Return the title plus a preview of at most 200 characters, or the error."""
        if not self.success:
            return f"Error: {self.error}"
        text = self.content
        if len(text or "") > 200:
            # Only long content is shortened and marked with an ellipsis
            text = text[:200] + "..."
        return f"[{self.title}]\n{text}"

    def word_count(self) -> int:
        """Return the number of whitespace-separated words in ``content`` (0 if empty)."""
        return len(self.content.split()) if self.content else 0


class WebFetcher:
    """Fetch web pages and extract clean text from them.

    Successful results are cached on disk as JSON (one file per URL, raw HTML
    excluded), and consecutive requests to the same domain are spaced at
    least ``rate_limit`` seconds apart.
    """

    # Polite user agent
    USER_AGENT = (
        "Mozilla/5.0 (compatible; ResearchBot/1.0; "
        "+https://github.com/your-username/research-agent)"
    )

    def __init__(
        self,
        cache_dir: Optional[Path] = None,
        timeout: float = 30.0,
        rate_limit: float = 1.0,  # Minimum seconds between requests to same domain
    ):
        """Create a fetcher.

        Args:
            cache_dir: Directory for cached results; defaults to
                ``/tmp/web_cache``. It is created if it does not exist.
            timeout: Timeout in seconds passed to every HTTP request.
            rate_limit: Minimum seconds between requests to the same domain.
        """
        # Path(...) also accepts a plain string path
        self.cache_dir = Path(cache_dir) if cache_dir else Path("/tmp/web_cache")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        self.rate_limit = rate_limit
        self._last_request_time: dict[str, float] = {}  # domain -> monotonic timestamp

        # Session for connection pooling
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": self.USER_AGENT,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
        })

    def fetch(self, url: str, use_cache: bool = True) -> "FetchResult":
        """Fetch a URL and extract clean text.

        Args:
            url: Address of the page to retrieve.
            use_cache: When True, return a cached result if one exists and
                store fresh successful results in the cache.

        Returns:
            A ``FetchResult``. Network, HTTP and extraction errors are
            reported through ``success=False`` and ``error`` rather than
            raised.
        """
        start = time.monotonic()

        # Check cache first
        if use_cache:
            cached = self._get_from_cache(url)
            if cached is not None:
                cached.from_cache = True
                cached.elapsed_time = time.monotonic() - start
                return cached

        # Rate limiting
        self._rate_limit(url)

        try:
            # Fetch the page
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()

            # requests assumes ISO-8859-1 for text/* responses that declare no
            # charset, which garbles UTF-8 pages; sniff the body instead.
            content_type = response.headers.get("Content-Type", "")
            if "charset" not in content_type.lower():
                response.encoding = response.apparent_encoding

            html = response.text

            # Extract clean text using trafilatura (best for articles)
            content = trafilatura.extract(
                html,
                include_comments=False,
                include_tables=True,
                no_fallback=False,
            )

            # Fallback to BeautifulSoup if trafilatura returns nothing
            if not content:
                content = self._extract_with_beautifulsoup(html)

            # Extract title
            title = self._extract_title(html)

        except requests.exceptions.Timeout:
            return self._failure(url, f"Timeout after {self.timeout}s", start)
        except requests.exceptions.HTTPError as e:
            # Compare with None explicitly: a Response for a 4xx/5xx is falsy
            if e.response is not None:
                message = f"HTTP {e.response.status_code}"
            else:
                message = f"HTTP error: {e}"
            return self._failure(url, message, start)
        except Exception as e:
            return self._failure(url, str(e), start)

        result = FetchResult(
            url=url,
            success=True,
            content=content,
            title=title,
            html=html,
            elapsed_time=time.monotonic() - start,
        )

        # Cache the result. Caching is best-effort: a full disk or read-only
        # cache directory must not turn a successful fetch into a failure.
        if use_cache:
            try:
                self._save_to_cache(url, result)
            except OSError:
                pass

        return result

    def fetch_multiple(self, urls: list[str], use_cache: bool = True) -> list["FetchResult"]:
        """Fetch multiple URLs sequentially (respecting rate limits).

        Prints one status line per URL and returns the results in input order.
        """
        results = []
        for url in urls:
            result = self.fetch(url, use_cache=use_cache)
            results.append(result)
            print(f"{'✅' if result.success else '❌'} {url[:60]}...")
        return results

    def _failure(self, url: str, message: str, start: float) -> "FetchResult":
        """Build a failed result carrying ``message`` and the elapsed time."""
        return FetchResult(
            url=url, success=False,
            error=message,
            elapsed_time=time.monotonic() - start,
        )

    def _extract_title(self, html: str) -> Optional[str]:
        """Extract the page title, preferring <title> and falling back to og:title."""
        soup = BeautifulSoup(html, 'html.parser')
        title_tag = soup.find('title')
        if title_tag:
            title = title_tag.get_text().strip()
            # An empty <title> should not block the og:title fallback
            if title:
                return title
        # Try og:title as fallback
        og_title = soup.find('meta', property='og:title')
        if og_title:
            return (og_title.get('content') or '').strip() or None
        return None

    def _extract_with_beautifulsoup(self, html: str) -> str:
        """Fallback text extraction using BeautifulSoup."""
        soup = BeautifulSoup(html, 'html.parser')

        # Remove elements that carry no article text
        for element in soup(['script', 'style', 'nav', 'footer', 'header']):
            element.decompose()

        # Get text
        text = soup.get_text(separator='\n', strip=True)

        # Clean up whitespace: drop blank lines and surrounding spaces
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        return '\n'.join(lines)

    def _rate_limit(self, url: str):
        """Sleep as needed so requests to one domain are ``rate_limit`` seconds apart."""
        # Host names are case-insensitive, so normalise the lookup key
        domain = urlparse(url).netloc.lower()
        last_request = self._last_request_time.get(domain)

        if last_request is not None:
            # The monotonic clock is immune to wall-clock adjustments
            remaining = self.rate_limit - (time.monotonic() - last_request)
            if remaining > 0:
                time.sleep(remaining)

        self._last_request_time[domain] = time.monotonic()

    def _cache_key(self, url: str) -> str:
        """Generate the cache key (MD5 hex digest of the URL) used as file name."""
        return hashlib.md5(url.encode()).hexdigest()

    def _get_from_cache(self, url: str) -> Optional["FetchResult"]:
        """Return the cached result for ``url``, or None if absent or unreadable."""
        cache_file = self.cache_dir / f"{self._cache_key(url)}.json"
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return FetchResult(**data)
        except (OSError, ValueError, TypeError):
            # Missing file, unreadable/corrupt JSON, or fields that no longer
            # match FetchResult: treat all of them as a cache miss.
            return None

    def _save_to_cache(self, url: str, result: "FetchResult"):
        """Write ``result`` to the cache as JSON.

        Raises:
            OSError: If the cache file cannot be written.
        """
        cache_file = self.cache_dir / f"{self._cache_key(url)}.json"
        # Don't cache HTML to save space
        data = {
            'url': result.url,
            'success': result.success,
            'content': result.content,
            'title': result.title,
            'error': result.error,
            'elapsed_time': result.elapsed_time,
        }
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    def clear_cache(self):
        """Delete every cached ``*.json`` result in the cache directory."""
        for f in self.cache_dir.glob('*.json'):
            f.unlink()
        print(f"Cache cleared: {self.cache_dir}")


# Visual confirmation that the cell ran and the class definitions above were executed
print("✅ WebFetcher class defined")

## Test: Basic Fetching

In [None]:
# Build a fetcher that spaces same-domain requests at least one second apart
fetcher = WebFetcher(rate_limit=1.0)

# Retrieve a small page as a smoke test
result = fetcher.fetch("https://example.com")

# Report the outcome; every item is printed on its own line
print(
    f"Success: {result.success}",
    f"Title: {result.title}",
    f"Time: {result.elapsed_time:.2f}s",
    f"Word count: {result.word_count()}",
    f"\nContent preview:",
    result.content[:500] if result.content else "No content",
    sep="\n",
)

## Test: Article Extraction

Trafilatura excels at extracting article content from news sites and blogs.

In [None]:
# Exercise extraction on a long, real-world article (Wikipedia)
result = fetcher.fetch("https://en.wikipedia.org/wiki/Large_language_model")

# Metadata first, then a ruled-off excerpt of the extracted text
print(
    f"Title: {result.title}",
    f"Word count: {result.word_count()}",
    f"Cached: {result.from_cache}",
    f"\nFirst 1000 characters:",
    "-" * 50,
    result.content[:1000] if result.content else "No content",
    sep="\n",
)

## Test: Caching

In [None]:
# Request the article again; this time the result should come from the cache
result2 = fetcher.fetch("https://en.wikipedia.org/wiki/Large_language_model")

print(
    f"From cache: {result2.from_cache}",
    f"Time (should be near-instant): {result2.elapsed_time:.4f}s",
    sep="\n",
)

## Test: Error Handling

In [None]:
# A host that does not resolve should produce a failed result, not an exception
result = fetcher.fetch("https://this-domain-does-not-exist-12345.com/page")

print(f"Success: {result.success}", f"Error: {result.error}", sep="\n")

In [None]:
# An HTTP 404 response should be reported through the result's error field
result = fetcher.fetch("https://httpstat.us/404")

print(f"Success: {result.success}", f"Error: {result.error}", sep="\n")

## Test: Multiple URLs

In [None]:
# Mix of pages that should succeed and one that is expected to fail
urls = [
    "https://example.com",
    "https://en.wikipedia.org/wiki/Python_(programming_language)",
    "https://httpstat.us/500",  # This will fail
]

results = fetcher.fetch_multiple(urls)

# One summary line per result: status icon, title (or error), word count
print("\nSummary:")
for r in results:
    if r.success:
        status = "✅"
    else:
        status = "❌"
    label = r.title or r.error
    print(f"{status} {label} ({r.word_count()} words)")

## Integration: Fetcher + LLM

Combine web fetching with the LLM client to summarize pages.

In [None]:
# First, let's make sure we have the LLM client
# (You can also import from src/llm_client.py if you saved it)

import ollama

def summarize_url(url: str, max_words: int = 100, model: str = 'llama3') -> str:
    """Fetch a URL and summarize its content with a local Ollama model.

    Args:
        url: Page to fetch through the notebook-level ``fetcher``.
        max_words: Word budget stated in the prompt (not enforced on the reply).
        model: Name of the Ollama model to query.

    Returns:
        The model's summary, or a short message when the fetch fails or
        yields no text.
    """
    # Fetch
    result = fetcher.fetch(url)
    if not result.success:
        return f"Failed to fetch: {result.error}"

    # Truncate content if too long (LLMs have context limits)
    content = (result.content or "")[:8000]
    if not content.strip():
        # Nothing to summarize: prompting the model anyway would invite a made-up answer
        return f"No text content extracted from {url}"

    prompt = f"""Summarize this article in {max_words} words or less.

Title: {result.title}

Content:
{content}

Summary:"""

    # Summarize with LLM
    response = ollama.chat(
        model=model,
        messages=[{'role': 'user', 'content': prompt}],
    )

    return response['message']['content']


# Try it on the Wikipedia article, asking for at most 150 words
summary = summarize_url(
    "https://en.wikipedia.org/wiki/Large_language_model",
    max_words=150,
)
print(summary)

## Export as Module

In [None]:
# Source text of the standalone module: a condensed variant of the classes
# defined earlier in this notebook.
# NOTE: backslashes are doubled so the written file contains "\n" escapes
# rather than literal line breaks inside its string literals.
module_code = '''
"""Web Fetcher - Retrieve and clean web content."""

import requests
import time
import hashlib
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse

import trafilatura
from bs4 import BeautifulSoup


@dataclass
class FetchResult:
    """Result of fetching a URL."""
    url: str
    success: bool
    content: Optional[str] = None
    title: Optional[str] = None
    html: Optional[str] = None
    error: Optional[str] = None
    elapsed_time: float = 0.0
    from_cache: bool = False

    def __str__(self):
        if self.success:
            preview = self.content[:200] + "..." if len(self.content or "") > 200 else self.content
            return f"[{self.title}]\\n{preview}"
        return f"Error: {self.error}"

    def word_count(self) -> int:
        """Return the number of whitespace-separated words in the content."""
        return len(self.content.split()) if self.content else 0


class WebFetcher:
    """Fetch and extract clean text from web pages."""

    USER_AGENT = "Mozilla/5.0 (compatible; ResearchBot/1.0)"

    def __init__(
        self,
        cache_dir: Optional[Path] = None,
        timeout: float = 30.0,
        rate_limit: float = 1.0,
    ):
        self.cache_dir = Path(cache_dir) if cache_dir else Path("/tmp/web_cache")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.timeout = timeout
        self.rate_limit = rate_limit
        self._last_request_time: dict[str, float] = {}

        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": self.USER_AGENT,
            "Accept": "text/html,application/xhtml+xml",
        })

    def fetch(self, url: str, use_cache: bool = True) -> FetchResult:
        """Fetch a URL and extract clean text; errors are reported in the result."""
        start = time.monotonic()

        if use_cache:
            cached = self._get_from_cache(url)
            if cached is not None:
                cached.from_cache = True
                cached.elapsed_time = time.monotonic() - start
                return cached

        self._rate_limit(url)

        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            # requests assumes ISO-8859-1 for text/* responses without a
            # declared charset, which garbles UTF-8 pages; sniff the body.
            if "charset" not in response.headers.get("Content-Type", "").lower():
                response.encoding = response.apparent_encoding
            html = response.text

            content = trafilatura.extract(html, include_tables=True)
            if not content:
                content = self._extract_with_beautifulsoup(html)

            title = self._extract_title(html)
        except Exception as e:
            return FetchResult(
                url=url, success=False, error=str(e),
                elapsed_time=time.monotonic() - start,
            )

        result = FetchResult(
            url=url, success=True, content=content,
            title=title, html=html, elapsed_time=time.monotonic() - start,
        )

        if use_cache:
            try:
                self._save_to_cache(url, result)
            except OSError:
                # Caching is best-effort; a failed write must not fail the fetch
                pass
        return result

    def fetch_multiple(self, urls: list[str], use_cache: bool = True) -> list[FetchResult]:
        """Fetch several URLs sequentially, returning results in input order."""
        return [self.fetch(url, use_cache=use_cache) for url in urls]

    def _extract_title(self, html: str) -> Optional[str]:
        soup = BeautifulSoup(html, "html.parser")
        title_tag = soup.find("title")
        return title_tag.get_text().strip() if title_tag else None

    def _extract_with_beautifulsoup(self, html: str) -> str:
        soup = BeautifulSoup(html, "html.parser")
        for element in soup(["script", "style", "nav", "footer"]):
            element.decompose()
        return soup.get_text(separator="\\n", strip=True)

    def _rate_limit(self, url: str):
        """Sleep as needed so same-domain requests are rate_limit seconds apart."""
        domain = urlparse(url).netloc.lower()
        last_request = self._last_request_time.get(domain)
        if last_request is not None:
            remaining = self.rate_limit - (time.monotonic() - last_request)
            if remaining > 0:
                time.sleep(remaining)
        self._last_request_time[domain] = time.monotonic()

    def _cache_key(self, url: str) -> str:
        return hashlib.md5(url.encode()).hexdigest()

    def _get_from_cache(self, url: str) -> Optional[FetchResult]:
        """Return the cached result, or None if it is absent or unreadable."""
        cache_file = self.cache_dir / f"{self._cache_key(url)}.json"
        try:
            with open(cache_file, encoding="utf-8") as f:
                return FetchResult(**json.load(f))
        except (OSError, ValueError, TypeError):
            return None

    def _save_to_cache(self, url: str, result: FetchResult):
        cache_file = self.cache_dir / f"{self._cache_key(url)}.json"
        data = {
            "url": result.url, "success": result.success,
            "content": result.content, "title": result.title,
            "error": result.error, "elapsed_time": result.elapsed_time,
        }
        with open(cache_file, "w", encoding="utf-8") as f:
            json.dump(data, f)

    def clear_cache(self):
        for f in self.cache_dir.glob("*.json"):
            f.unlink()
'''

# Save to src folder
from pathlib import Path  # imported here so this step does not rely on an earlier cell

module_path = Path('/home/developer/projects/sandbox-experiments/src/web_fetcher.py')
# Create the target folder on first export instead of failing with FileNotFoundError
module_path.parent.mkdir(parents=True, exist_ok=True)
# Explicit UTF-8 avoids locale-dependent output; end the file with a newline
module_path.write_text(module_code.strip() + "\n", encoding="utf-8")

print("✅ Saved to src/web_fetcher.py")
print("\nUsage in other notebooks:")
print("  from src.web_fetcher import WebFetcher, FetchResult")

## Next Steps

With the Web Fetcher working, you can:

1. **Build Module 5 (Summarizer)** - Use LLM to summarize fetched content
2. **Build Module 6 (Research Agent)** - Add search capability to find URLs
3. **Combine with LLM Client** - Create pipelines that fetch → process → generate

The `WebFetcher` handles the messy work of getting clean text from the web.