# LLM-Based Web Scraping of Clark University Websites

## Import Libraries

In [29]:
import nest_asyncio
nest_asyncio.apply()  # Patch asyncio for Jupyter

import collections
import requests
import json
from urllib.parse import urlparse
import os
from dotenv import load_dotenv
from bs4 import BeautifulSoup
from typing import Optional  # Added missing import

from scrapegraphai.graphs import DepthSearchGraph

# Read environment variables (API keys etc.) via python-dotenv
load_dotenv()
# Force TOKENIZERS_PARALLELISM to "false" up front so no warning is raised about it
os.environ.update({"TOKENIZERS_PARALLELISM": "false"})

## LLM Setup

In [30]:
# Alternative provider credentials, currently disabled:
# gemini_key = os.getenv('GEMINI_KEY')
# hf_api_token = os.getenv('HUGGINGFACEHUB_API_TOKEN')
openai_key = os.getenv('OPENAI_API_KEY')

# Settings handed to the scraping graph: the nested "llm" mapping selects the
# model, the remaining keys control the crawl itself.
graph_config = dict(
    llm=dict(
        api_key=openai_key,
        model="openai/gpt-4o-mini",
        temperature=0,
    ),
    verbose=True,
    headless=True,
    depth=200,
    only_inside_links=True,  # Only follow links inside the same domain
)

## Define Helper Functions


In [None]:
def chunk_text(text, chunk_size=300):
    """
    Break text into consecutive pieces of at most 'chunk_size' words.

    Words are whatever str.split() yields (whitespace-separated); inside each
    piece they are re-joined with single spaces. Text without any words gives
    an empty list.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        window = tokens[start:start + chunk_size]
        pieces.append(" ".join(window))
    return pieces

def process_page(page, counter):
    """
    Turn one LLM-extracted page into a list of chunked JSON entries.

    Args:
        page: dict with the optional keys 'title', 'url' and 'content'.
        counter: numeric ID given to the first entry; each entry uses one ID.

    Returns:
        A tuple (entries, counter): the list of entry dicts (keys 'id',
        'title', 'url', 'text') and the next unused ID.

    Content that is None is treated as empty, any other non-string content is
    converted with str(), and content that looks like a full HTML document is
    reduced to its visible text. A page without usable text still yields one
    placeholder entry as long as it has a title or a URL.
    """
    title = page.get("title", "")
    url = page.get("url", "")
    content = page.get("content", "")

    if content is None:
        # str(None) would produce the literal text "None" and end up as a chunk
        content = ""
    elif not isinstance(content, str):
        try:
            content = str(content)
        except Exception as e:
            print(f"Warning: Could not convert content to string for {url}: {e}")
            content = ""

    # The LLM usually returns already-cleaned text, so HTML parsing is only
    # attempted when the content looks like a whole HTML document.
    clean_text = content
    lowered = content.lower()
    looks_like_html = "<" in content and ">" in content and ("<html" in lowered or "<body" in lowered)
    if looks_like_html:
        try:
            soup = BeautifulSoup(content, "html.parser")
            clean_text = soup.get_text(separator=" ", strip=True)
        except Exception as e:
            print(f"Error parsing content with BeautifulSoup for {url}: {e}")
            print("Using original content instead")

    chunks = chunk_text(clean_text, chunk_size=300)

    # chunk_text() returns nothing only when the text holds no words at all
    # (empty or whitespace-only), so the placeholder is always the right text here.
    if not chunks and (title or url):
        chunks = ["No content extracted"]

    entries = [
        {
            "id": str(entry_id),
            "title": title,
            "url": url,
            "text": chunk,
        }
        for entry_id, chunk in enumerate(chunks, start=counter)
    ]
    counter += len(entries)

    # Print a summary
    print(f"Processed URL: {url} - Created {len(entries)} chunks")

    return entries, counter

In [None]:
def run_depth_scrape(source_url, config):
    """
    Build a DepthSearchGraph for the given source URL and return what its run() produces.

    The prompt asks the LLM for each page's title and textual content, returned
    as JSON objects with the keys 'title', 'url' and 'content'.
    """
    instructions = [
        "Extract all useful textual content from the webpage, including its title. ",
        "Extract specific metadata (e.g., faculty names, research interests, course details) if present. ",
        "Preserve sections like headers or subtitles that can help structure the content. ",
        "Identify and include citation-like metadata (e.g., source URLs) for later reference. ",
        "Recursively follow internal links up to the specified depth. ",
        "Return each page as a JSON object with keys 'title', 'url', and 'content'.",
    ]
    crawler = DepthSearchGraph(
        prompt="".join(instructions),
        source=source_url,
        config=config,
    )
    return crawler.run()

## Enhanced URL Collection and Processing

In [33]:
def get_domain(url):
    """Return the network-location component (netloc) of a URL"""
    return urlparse(url).netloc

def extract_links(html, base_url):
    """
    Extract all same-domain links from HTML content.

    Args:
        html: HTML source of the page.
        base_url: URL the page was fetched from; hrefs are resolved against it
            and only links on its domain are kept.

    Returns:
        A list of absolute http(s) URLs without duplicates, in the order they
        first appear in the document.

    Every href is resolved with urljoin, so root-relative ("/a"),
    document-relative ("a.html", "../a") and protocol-relative ("//host/a")
    links are all handled. Pure in-page anchors and non-http(s) schemes
    (javascript:, mailto:, tel:, ...) are skipped.
    """
    # Local import: only urlparse is imported at file level.
    from urllib.parse import urljoin

    soup = BeautifulSoup(html, 'html.parser')
    domain = get_domain(base_url)
    links = {}  # dict used as an insertion-ordered set

    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href'].strip()
        # Empty hrefs and "#fragment" links only point back at the current page
        if not href or href.startswith('#'):
            continue

        absolute = urljoin(base_url, href)
        parsed = urlparse(absolute)

        # Skip javascript:, mailto:, tel: and similar non-web links
        if parsed.scheme not in ('http', 'https'):
            continue

        # Only include links from the same domain
        if parsed.netloc == domain:
            links[absolute] = None

    return list(links)

def fetch_url(url):
    """
    Download a URL and return the response body as text.

    Any failure (including the error raised by raise_for_status) is printed
    and reported to the caller as None.
    """
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        reply = requests.get(url, headers=browser_headers, timeout=10)
        reply.raise_for_status()
        body = reply.text
    except Exception as err:
        print(f"Error fetching {url}: {err}")
        return None
    return body

# Tags treated as section headings when splitting a page into sections.
_SECTION_HEADER_TAGS = ['h1', 'h2', 'h3', 'h4']


def _find_main_content(soup):
    """Return the most likely main-content element of the soup, or None if no candidate matches."""
    # (tag, attribute name or None, accepted attribute values), in priority order
    candidates = [
        ('div', 'id', ['content', 'main-content', 'mainContent', 'main']),
        ('main', None, []),
        ('article', None, []),
        ('div', 'class', ['content', 'main-content', 'main', 'article', 'post']),
    ]

    def largest(elements):
        # Prefer the element carrying the most text
        return max(elements, key=lambda e: len(e.get_text(strip=True)))

    for tag, attr_name, attr_values in candidates:
        if attr_name is None:
            # Plain tag selector (<main>, <article>): no attribute to filter on
            matches = soup.find_all(tag)
            if matches:
                return largest(matches)
            continue

        for value in attr_values:
            # 'wanted=value' binds the current value to the matcher explicitly
            matches = soup.find_all(
                tag,
                {attr_name: lambda x, wanted=value: bool(x) and wanted in x.split()},
            )
            if matches:
                return largest(matches)

    return None


def _collect_sections(container):
    """Split the container into heading sections; return (sections, whether any section has real text)."""
    sections = []
    found_text = False

    for header in container.find_all(_SECTION_HEADER_TAGS):
        header_text = header.get_text(strip=True)
        if not header_text:
            continue

        # Gather the text of every following sibling up to the next heading
        parts = []
        for sibling in header.next_siblings:
            if getattr(sibling, 'name', None) in _SECTION_HEADER_TAGS:
                break
            if hasattr(sibling, 'get_text'):
                text = sibling.get_text(strip=True)
                if text:
                    parts.append(text)

        body = " ".join(parts)
        if body:
            found_text = True
        sections.append({
            'header': header_text,
            'text': body if body else "No content in this section"
        })

    return sections, found_text


def extract_content(html, url):
    """
    Extract title and content from HTML with improved handling.

    Args:
        html: HTML source of the page; a falsy value yields (None, None).
        url: URL of the page. Currently unused; kept for interface compatibility.

    Returns:
        A tuple (title, content). 'title' is the text of <title>, falling back
        to the first <h1>, or "" when neither has text. 'content' is
        str({'sections': [...]}), i.e. a Python dict literal (not JSON). Each
        section is a dict with a 'text' key and, for heading-based sections,
        a 'header' key.
    """
    if not html:
        return None, None

    soup = BeautifulSoup(html, 'html.parser')

    # Title: prefer <title>, fall back to the first <h1> when it is missing or empty
    title = ""
    for tag_name in ('title', 'h1'):
        tag = soup.find(tag_name)
        if tag is not None:
            title = tag.get_text(strip=True)
            if title:
                break

    # Remove navigation, headers, footers, scripts and other non-content elements
    for element in soup.find_all(['nav', 'header', 'footer', 'script', 'style', 'noscript',
                                  'iframe', 'svg', 'aside', 'form']):
        element.decompose()

    # Try common content containers; fall back to <body>, then the whole document
    main_content = _find_main_content(soup)
    if main_content is None:
        main_content = soup.body or soup

    sections, found_text = _collect_sections(main_content)

    # If no headers were found or none of them had content, use paragraphs.
    # 'found_text' is tracked separately because empty sections carry a
    # (truthy) placeholder string in their 'text' field.
    if not found_text:
        paragraph_texts = (p.get_text(strip=True) for p in main_content.find_all('p'))
        all_text = " ".join(text for text in paragraph_texts if text)
        if not all_text:
            # Last resort: get all text
            all_text = main_content.get_text(" ", strip=True)
        sections.append({'text': all_text})

    # Serialise as the str() of the dict. This is a Python literal rather than
    # JSON; the format is kept unchanged for existing callers.
    content_dict = {'sections': sections}
    return title, str(content_dict)

## Main Execution with Enhanced Link Collection

In [None]:
# Text that marks a heading section for which no content was found.
_EMPTY_SECTION_PLACEHOLDER = "No content in this section"


def _content_has_text(content):
    """
    Report whether serialized page content holds any real section text.

    Returns True or False when the content parses as a dict literal with a
    'sections' list, and None when it cannot be parsed at all.
    """
    # Local import: ast is not part of the file-level imports.
    import ast

    try:
        # literal_eval only accepts Python literals, so scraped text can never
        # be executed as code here (unlike eval()).
        content_dict = ast.literal_eval(content)
        return any(
            section.get('text') and section.get('text') != _EMPTY_SECTION_PLACEHOLDER
            for section in content_dict.get('sections', [])
        )
    except (ValueError, SyntaxError, TypeError, AttributeError, MemoryError, RecursionError):
        return None


def main(root_url="https://www.clarku.edu/", max_urls=50, output_file="scraped_clark_data.json"):
    """
    Crawl a site breadth-first and save one JSON entry per fetched page.

    Args:
        root_url: Page the crawl starts from; only links on its domain are followed.
        max_urls: Upper bound on the number of URLs visited (failed fetches count too).
        output_file: Path of the JSON file the collected entries are written to.

    Progress and a final summary are printed; nothing is returned.
    """
    # Initialize data structures
    all_entries = []
    visited_urls = set()  # Fragment-less URLs already processed
    urls_to_visit = collections.deque([root_url])  # Queue of URLs to process
    queued_urls = {root_url}  # Every URL ever enqueued, for O(1) duplicate checks
    prefetched = {}  # HTML already downloaded, so a page is not fetched twice
    counter = 1  # For generating numerical IDs
    skipped_urls = 0
    empty_content_urls = 0

    # First get initial set of links from the root page
    print(f"Starting with root URL: {root_url}")
    html = fetch_url(root_url)
    if html:
        prefetched[root_url] = html
        # Extract links from the homepage
        first_level_links = extract_links(html, root_url)
        print(f"Found {len(first_level_links)} links on homepage")
        urls_to_visit.extend(first_level_links)
        queued_urls.update(first_level_links)

    # Use a breadth-first approach to visit URLs
    while urls_to_visit and len(visited_urls) < max_urls:
        current_url = urls_to_visit.popleft()

        # Track visits without the #fragment, but keep the original URL for fetching
        base_url = current_url.split('#')[0]
        if base_url in visited_urls:
            continue

        print(f"Processing URL {len(visited_urls)+1}/{max_urls}: {current_url}")
        visited_urls.add(base_url)

        # Fetch page content (reusing the copy downloaded before the loop, if any)
        html = prefetched.pop(current_url, None) or fetch_url(current_url)
        if not html:
            print(f"  - Failed to fetch content for {current_url}")
            skipped_urls += 1
            continue

        # Extract content
        try:
            title, content = extract_content(html, current_url)
        except Exception as e:
            print(f"  ! Error processing content for {current_url}: {e}")
            skipped_urls += 1
            continue

        # Add to entries - only require title OR content, not both
        if title or content:
            # None means "could not parse"; the entry is still added in that case
            if _content_has_text(content) is False:
                print(f"  - Warning: No text content found for {current_url}")
                empty_content_urls += 1

            shown_title = title or ""
            all_entries.append({
                "id": str(counter),
                "title": title if title else "Untitled Page",
                "url": current_url,
                "text": content if content else "Unable to extract content"
            })
            print(f"  + Added entry #{counter}: {shown_title[:50]}{'...' if len(shown_title) > 50 else ''}")
            counter += 1
        else:
            print(f"  - No content or title found for {current_url}")
            skipped_urls += 1

        # Extract more links if we need them
        if len(visited_urls) < max_urls:
            try:
                new_links = extract_links(html, current_url)
                # Only add links we haven't visited or queued yet
                added_links = 0
                for link in new_links:
                    base_link = link.split('#')[0]
                    if base_link not in visited_urls and link not in queued_urls:
                        urls_to_visit.append(link)
                        queued_urls.add(link)
                        added_links += 1

                if added_links:
                    print(f"  > Found {added_links} new links, queue size: {len(urls_to_visit)}")
            except Exception as e:
                print(f"  ! Error extracting links from {current_url}: {e}")

    print(f"Processed {len(visited_urls)} URLs, extracted {len(all_entries)} content entries")
    print(f"Skipped {skipped_urls} URLs due to errors or no content")
    print(f"Found {empty_content_urls} URLs with empty content sections")

    # Save results
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_entries, f, indent=4, ensure_ascii=False)
    print(f"Scraping complete. Data saved to {output_file}")

# Run the crawl when executed as a script or notebook cell, but not when this
# code is imported as a module.
if __name__ == "__main__":
    main()

Starting with root URL: https://www.clarku.edu/
Found 105 links on homepage
Processing URL 1/50: https://www.clarku.edu/
  + Added entry #1: Clark University | Challenge Convention. Change Ou...
Processing URL 2/50: https://www.clarku.edu/graduate-education/international-students/
  + Added entry #2: International Students | Graduate Education
  > Found 5 new links, queue size: 109
Processing URL 3/50: https://www.clarku.edu/academics/our-faculty/
  + Added entry #3: Our Faculty | Clark University
  > Found 5 new links, queue size: 113
Processing URL 4/50: https://www.clarku.edu/academics/research/science-facilities-and-labs/
  + Added entry #4: Science Facilities and Labs | Clark University
  > Found 7 new links, queue size: 119
Processing URL 5/50: https://www.clarku.edu/academics/undergraduate-curriculum/
  + Added entry #5: Undergraduate Curriculum | Clark University
  > Found 7 new links, queue size: 125
Processing URL 6/50: https://www.clarku.edu/give25/
Error fetching https://ww