## Collecting the website urls

In [2]:
import requests
from urllib.parse import urljoin, urlparse, urlunparse
from bs4 import BeautifulSoup
from collections import deque
import time
import re

def is_valid_url(url):
    """Check if URL is valid and belongs to the target domain/subdomain."""
    parsed = urlparse(url)
    return bool(parsed.netloc) and bool(parsed.scheme)

def get_base_domain(url):
    """Extract base domain (e.g., 'langchain.com' from 'blog.langchain.com')."""
    parsed = urlparse(url)
    return ".".join(parsed.netloc.split(".")[-2:])  # Handles subdomains [[6]][[9]]

def normalize_url(url):
    """Normalize URL to avoid duplicates."""
    parsed = urlparse(url)
    return urlunparse((
        parsed.scheme,
        parsed.netloc,
        parsed.path.rstrip('/'),  # Remove trailing slash
        parsed.params,
        parsed.query,
        ''  # Remove fragments
    ))

def recursive_crawl(start_url, delay=0.5):
    """Recursively crawl all internal links within the domain/subdomain."""
    base_domain = get_base_domain(start_url)
    visited = set()
    queue = deque([normalize_url(start_url)])
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }

    while queue:
        current_url = queue.popleft()
        
        if current_url in visited:
            continue
            
        visited.add(current_url)
        time.sleep(delay)  # Politeness delay [[1]][[7]]

        try:
            response = requests.get(current_url, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f"Skipping {current_url} (Status {response.status_code})")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            print(f"Crawling: {current_url} (Found {len(soup.find_all('a'))} links)")

            for a_tag in soup.find_all('a', href=True):
                raw_href = a_tag['href']
                absolute_url = urljoin(current_url, raw_href)
                normalized_url = normalize_url(absolute_url)

                # Filter criteria [[2]][[4]]
                if (
                    is_valid_url(normalized_url) and
                    get_base_domain(normalized_url) == base_domain and
                    not re.search(r"\.(pdf|jpg|png|zip|gz|svg)$", normalized_url, re.IGNORECASE) and
                    not re.match(r".*\s.*", normalized_url)  # Avoid URLs with spaces
                ):
                    if normalized_url not in visited and normalized_url not in queue:
                        queue.append(normalized_url)
                        print(f"Discovered: {normalized_url}")

        except Exception as e:
            print(f"Error fetching {current_url}: {str(e)}")

    return visited

if __name__ == "__main__":
    start_url = "https://www.langchain.com/"
    print(f"Starting recursive crawl on {start_url}...\n")
    all_links = recursive_crawl(start_url, delay=0.5)

    # Save to file
    with open("langchain_all_links.txt", "w") as f:
        for link in sorted(all_links):
            f.write(link + "\n")

    print(f"\nTotal links discovered: {len(all_links)}")

Starting recursive crawl on https://www.langchain.com/...

Crawling: https://www.langchain.com (Found 83 links)
Discovered: https://www.langchain.com/langgraph
Discovered: https://www.langchain.com/langsmith
Discovered: https://www.langchain.com/langchain
Discovered: https://www.langchain.com/resources
Discovered: https://www.langchain.com/customers
Discovered: https://academy.langchain.com
Discovered: https://www.langchain.com/community
Discovered: https://www.langchain.com/experts
Discovered: https://changelog.langchain.com
Discovered: https://docs.smith.langchain.com
Discovered: https://python.langchain.com/docs/introduction
Discovered: https://js.langchain.com/docs/introduction
Discovered: https://www.langchain.com/about
Discovered: https://www.langchain.com/careers
Discovered: https://www.langchain.com/pricing-langsmith
Discovered: https://www.langchain.com/pricing-langgraph-platform
Discovered: https://www.langchain.com/contact-sales
Discovered: https://smith.langchain.com
Discov


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  soup = BeautifulSoup(response.text, 'html.parser')


Crawling: https://status.smith.langchain.com/history.atom (Found 0 links)
Crawling: https://status.smith.langchain.com/history.rss (Found 0 links)
Crawling: https://status.smith.langchain.com/uptime (Found 21 links)
Crawling: https://status.smith.langchain.com/incidents/tslql6lfbjrj (Found 7 links)
Crawling: https://status.smith.langchain.com/incidents/z8fll0v0l1c1 (Found 7 links)
Crawling: https://status.smith.langchain.com/history (Found 21 links)
Error fetching https://journal.langchain.com: HTTPSConnectionPool(host='journal.langchain.com', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate has expired (_ssl.c:1010)')))
Crawling: https://academy.langchain.com/enroll/3033117?et=free (Found 21 links)
Crawling: https://www.langchain.com/breakoutagents/replit (Found 39 links)
Crawling: https://docs.smith.langchain.com/old (Found 79 links)
Discovered: https://smith.langchain.

KeyboardInterrupt: 

### Improved performance

In [None]:
import requests
from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
from bs4 import BeautifulSoup
from collections import deque
import time
import re
import logging
import concurrent.futures

# Set up logging
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def is_valid_url(url):
    """Check if URL is valid and belongs to the target domain/subdomain."""
    parsed = urlparse(url)
    return bool(parsed.netloc) and bool(parsed.scheme)

def get_base_domain(url):
    """Extract base domain (e.g., 'langchain.com' from 'blog.langchain.com')."""
    parsed = urlparse(url)
    return ".".join(parsed.netloc.split(".")[-2:])

def normalize_url(url):
    """Enhanced URL normalization to avoid duplicates."""
    parsed = urlparse(url.lower())  # Convert to lowercase
    
    # Remove trailing slash from path
    path = parsed.path.rstrip('/')
    if not path:
        path = '/'
        
    # Remove tracking parameters (utm_, ref, etc.)
    if parsed.query:
        query_params = parse_qs(parsed.query)
        filtered_params = {k: v for k, v in query_params.items() 
                         if not k.startswith('utm_') and k not in ('ref', 'source')}
        query = urlencode(filtered_params, doseq=True) if filtered_params else ''
    else:
        query = ''
    
    # Handle www vs non-www
    netloc = parsed.netloc
    if netloc.startswith('www.'):
        netloc = netloc[4:]
    
    return urlunparse((parsed.scheme, netloc, path, parsed.params, query, ''))

def fetch_url(url, headers, visited):
    """Process a single URL and return new links to visit."""
    if url in visited:
        return []
        
    visited.add(url)
    new_links = []
    
    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            logger.warning(f"Skipping {url} (Status {response.status_code})")
            return []

        soup = BeautifulSoup(response.text, 'html.parser')
        logger.info(f"Crawled: {url} (Found {len(soup.find_all('a'))} links)")
        
        return soup.find_all('a', href=True)
        
    except Exception as e:
        logger.error(f"Error fetching {url}: {str(e)}")
        return []

def recursive_crawl(start_url, delay=0.5, max_pages=None, concurrency=1):
    """Recursively crawl all internal links within the domain/subdomain."""
    base_domain = get_base_domain(start_url)
    visited = set()
    discovered = set()  # Track all URLs we've seen (visited + queued)
    queue = deque([normalize_url(start_url)])
    discovered.add(queue[0])
    
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }

    page_count = 0
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        while queue and (max_pages is None or page_count < max_pages):
            # Get a batch of URLs to process
            batch_size = min(concurrency, len(queue))
            current_batch = [queue.popleft() for _ in range(batch_size)]
            
            # Wait between batches to be polite
            time.sleep(delay)
            
            # Process the batch concurrently
            future_to_url = {executor.submit(fetch_url, url, headers, visited): url 
                           for url in current_batch}
            
            for future in concurrent.futures.as_completed(future_to_url):
                current_url = future_to_url[future]
                page_count += 1
                
                for a_tag in future.result():
                    raw_href = a_tag['href']
                    absolute_url = urljoin(current_url, raw_href)
                    normalized_url = normalize_url(absolute_url)

                    # Filter criteria
                    if (
                        is_valid_url(normalized_url) and
                        get_base_domain(normalized_url) == base_domain and
                        not re.search(r"\.(pdf|jpg|jpeg|png|gif|zip|gz|svg|mp4|mp3)$", normalized_url, re.IGNORECASE) and
                        not re.match(r".*\s.*", normalized_url) and
                        normalized_url not in discovered
                    ):
                        queue.append(normalized_url)
                        discovered.add(normalized_url)
                        logger.debug(f"Discovered: {normalized_url}")

    return visited

if __name__ == "__main__":
    start_url = "https://www.langchain.com/"
    logger.info(f"Starting recursive crawl on {start_url}...")
    all_links = recursive_crawl(start_url, delay=0.5, max_pages=None, concurrency=3)

    # Save to file
    with open("langchain_all_links.txt", "w") as f:
        for link in sorted(all_links):
            f.write(link + "\n")

    logger.info(f"Total links crawled: {len(all_links)}")

# Create CSV

In [None]:
import requests
from urllib.parse import urljoin, urlparse, urlunparse
from bs4 import BeautifulSoup
from collections import deque
import time
import re
import csv

def is_valid_url(url):
    """Check if URL is valid and belongs to the target domain/subdomain."""
    parsed = urlparse(url)
    return bool(parsed.netloc) and bool(parsed.scheme)

def get_base_domain(url):
    """Extract base domain (e.g., 'langchain.com' from 'blog.langchain.com')."""
    parsed = urlparse(url)
    return ".".join(parsed.netloc.split(".")[-2:])  # Handles subdomains

def normalize_url(url):
    """Normalize URL to avoid duplicates."""
    parsed = urlparse(url)
    return urlunparse((
        parsed.scheme,
        parsed.netloc,
        parsed.path.rstrip('/'),  # Remove trailing slash
        parsed.params, 
        parsed.query,
        ''  # Remove fragments
    ))

def categorize_url(url):
    """Categorize URL by its file type."""
    lower_url = url.lower()
    if re.search(r"\.pdf$", lower_url):
        return "pdf"
    elif re.search(r"\.(jpg|jpeg|png|gif|svg|webp|bmp|ico)$", lower_url):
        return "image"
    elif re.search(r"\.(zip|gz|tar|rar|7z)$", lower_url):
        return "archive"
    elif re.search(r"\.(mp4|avi|mov|wmv|flv|mkv)$", lower_url):
        return "video"
    elif re.search(r"\.(mp3|wav|ogg|flac|aac)$", lower_url):
        return "audio"
    elif re.search(r"\.(doc|docx|xls|xlsx|ppt|pptx)$", lower_url):
        return "document"
    else:
        return "page"

def recursive_crawl(start_url, delay=0.5):
    """Recursively crawl all internal links and categorize all link types."""
    base_domain = get_base_domain(start_url)
    
    # Track visited pages (we'll only crawl HTML pages)
    visited_pages = set()
    
    # Track all discovered links by category
    all_links = {
        "page": set(),
        "image": set(),
        "pdf": set(),
        "archive": set(),
        "video": set(),
        "audio": set(),
        "document": set(),
        "other": set()
    }
    
    queue = deque([normalize_url(start_url)])
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }

    while queue:
        current_url = queue.popleft()
        
        if current_url in visited_pages:
            continue
            
        visited_pages.add(current_url)
        time.sleep(delay)  # Politeness delay

        try:
            response = requests.get(current_url, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f"Skipping {current_url} (Status {response.status_code})")
                continue

            # Only parse HTML content
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' not in content_type:
                link_type = categorize_url(current_url)
                all_links[link_type].add(current_url)
                print(f"Added non-HTML content: {current_url} (Type: {link_type})")
                continue

            soup = BeautifulSoup(response.text, 'html.parser')
            print(f"Crawling: {current_url} (Found {len(soup.find_all('a'))} links)")
            
            # Add current page to page links
            category = categorize_url(current_url)
            all_links[category].add(current_url)

            # Extract all links from the page
            for a_tag in soup.find_all('a', href=True):
                raw_href = a_tag['href']
                absolute_url = urljoin(current_url, raw_href)
                normalized_url = normalize_url(absolute_url)
                
                if not is_valid_url(normalized_url) or re.match(r".*\s.*", normalized_url):
                    continue
                
                # Categorize the link
                link_category = categorize_url(normalized_url)
                all_links[link_category].add(normalized_url)
                
                # Only queue HTML pages from the same domain for crawling
                if (get_base_domain(normalized_url) == base_domain and 
                    link_category == "page" and
                    normalized_url not in visited_pages and 
                    normalized_url not in queue):
                    queue.append(normalized_url)
                    print(f"Discovered page: {normalized_url}")
            
            # Extract image links
            for img_tag in soup.find_all('img', src=True):
                src = img_tag['src'] 
                if src and not src.startswith('data:'):
                    absolute_url = urljoin(current_url, src)
                    normalized_url = normalize_url(absolute_url)
                    if is_valid_url(normalized_url):
                        all_links["image"].add(normalized_url)
            
            # Extract other embedded content
            for tag in soup.find_all(['source', 'video', 'audio', 'iframe', 'embed'], src=True):
                src = tag['src']
                if src and not src.startswith('data:'):
                    absolute_url = urljoin(current_url, src)
                    normalized_url = normalize_url(absolute_url)
                    if is_valid_url(normalized_url):
                        link_type = categorize_url(normalized_url)
                        all_links[link_type].add(normalized_url)

        except Exception as e:
            print(f"Error fetching {current_url}: {str(e)}")

    return all_links

def export_links_to_csv(links, filename):
    """Export all links to a CSV file."""
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Type', 'URL'])
        
        for link_type, urls in links.items():
            for url in urls:
                writer.writerow([link_type, url])
                
    print(f"Links exported to {filename}")

if __name__ == "__main__":
    start_url = "https://www.langchain.com/"
    print(f"Starting recursive crawl on {start_url}...\n")
    all_links = recursive_crawl(start_url, delay=0.5)

    # Save all links to CSV
    export_links_to_csv(all_links, "langchain_all_links.csv")
    
    # Print summary
    total_links = sum(len(links) for links in all_links.values())
    print("\nCrawl Summary:")
    print(f"Total links discovered: {total_links}")
    for link_type, urls in all_links.items():
        print(f"  {link_type}: {len(urls)}")

## Improved version

In [1]:
%%writefile deep_crawler.py
#!/usr/bin/env python3
import asyncio
import csv
import logging
import re
import time
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Set, Union
from urllib.parse import urljoin, urlparse, urlunparse
from urllib.robotparser import RobotFileParser

import aiohttp
from aiohttp import ClientTimeout
from bs4 import BeautifulSoup
import argparse


@dataclass
class CrawlConfig:
    """Configuration for the crawler."""
    start_url: str
    output_file: str = "crawl_results.csv"
    delay: float = 0.5
    max_depth: int = 5
    timeout: int = 10
    max_retries: int = 3
    concurrency: int = 5
    respect_robots_txt: bool = True
    user_agent: str = "DeepCrawler/1.0"
    log_level: str = "INFO"


class LinkCategory:
    """Link categories enum."""
    PAGE = "page"
    IMAGE = "image"
    PDF = "pdf"
    ARCHIVE = "archive"
    VIDEO = "video"
    AUDIO = "audio"
    DOCUMENT = "document"
    OTHER = "other"


class UrlUtils:
    """URL utility functions."""
    
    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Check if URL is valid."""
        parsed = urlparse(url)
        return bool(parsed.netloc) and bool(parsed.scheme)
    
    @staticmethod
    def get_base_domain(url: str) -> str:
        """Extract base domain from URL."""
        parsed = urlparse(url)
        return ".".join(parsed.netloc.split(".")[-2:])
    
    @staticmethod
    def normalize_url(url: str) -> str:
        """Normalize URL to avoid duplicates."""
        parsed = urlparse(url)
        path = parsed.path.rstrip('/') or '/'
        return urlunparse((
            parsed.scheme,
            parsed.netloc,
            path,
            parsed.params, 
            parsed.query,
            ''  # Remove fragments
        ))
    
    @staticmethod
    def categorize_url(url: str) -> str:
        """Categorize URL by its file type."""
        lower_url = url.lower()
        if re.search(r"\.pdf$", lower_url):
            return LinkCategory.PDF
        elif re.search(r"\.(jpg|jpeg|png|gif|svg|webp|bmp|ico)$", lower_url):
            return LinkCategory.IMAGE
        elif re.search(r"\.(zip|gz|tar|rar|7z)$", lower_url):
            return LinkCategory.ARCHIVE
        elif re.search(r"\.(mp4|avi|mov|wmv|flv|mkv)$", lower_url):
            return LinkCategory.VIDEO
        elif re.search(r"\.(mp3|wav|ogg|flac|aac)$", lower_url):
            return LinkCategory.AUDIO
        elif re.search(r"\.(doc|docx|xls|xlsx|ppt|pptx)$", lower_url):
            return LinkCategory.DOCUMENT
        else:
            return LinkCategory.PAGE


class RobotsTxtManager:
    """Manages robots.txt parsing and checking."""
    
    def __init__(self, user_agent: str):
        self.user_agent = user_agent
        self.parsers: Dict[str, RobotFileParser] = {}
        self.checked_domains: Set[str] = set()
        
    async def can_fetch(self, session: aiohttp.ClientSession, url: str) -> bool:
        """Check if a URL can be fetched according to robots.txt rules."""
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        
        if base_url not in self.parsers:
            robots_url = f"{base_url}/robots.txt"
            parser = RobotFileParser(robots_url)
            
            try:
                async with session.get(robots_url, timeout=ClientTimeout(total=5)) as response:
                    if response.status == 200:
                        content = await response.text()
                        parser.parse(content.splitlines())
                    else:
                        # No robots.txt or can't access it, assume all is allowed
                        parser.allow_all = True
            except Exception as e:
                logging.debug(f"Error fetching robots.txt from {base_url}: {e}")
                parser.allow_all = True
                
            self.parsers[base_url] = parser
            
        return self.parsers[base_url].can_fetch(self.user_agent, url)


class WebCrawler:
    """Asynchronous web crawler with improved features."""
    
    def __init__(self, config: CrawlConfig):
        self.config = config
        self.base_domain = UrlUtils.get_base_domain(config.start_url)
        self.visited_urls: Set[str] = set()
        self.urls_to_visit: List[tuple] = [(UrlUtils.normalize_url(config.start_url), 0)]  # (url, depth)
        self.discovered_links: Dict[str, Set[str]] = defaultdict(set)
        self.robots_manager = RobotsTxtManager(config.user_agent)
        
        # Configure logging
        logging.basicConfig(
            level=getattr(logging, config.log_level),
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        
    async def crawl(self) -> Dict[str, Set[str]]:
        """Main crawling method."""
        logging.info(f"Starting crawl at {self.config.start_url} with max depth {self.config.max_depth}")
        
        async with aiohttp.ClientSession(
            headers={"User-Agent": self.config.user_agent},
            timeout=ClientTimeout(total=self.config.timeout)
        ) as session:
            semaphore = asyncio.Semaphore(self.config.concurrency)
            tasks = []
            
            # Process URLs until we've visited them all or reached limits
            while self.urls_to_visit:
                url, depth = self.urls_to_visit.pop(0)
                
                if url in self.visited_urls or depth > self.config.max_depth:
                    continue
                
                if self.config.respect_robots_txt:
                    can_fetch = await self.robots_manager.can_fetch(session, url)
                    if not can_fetch:
                        logging.info(f"Skipping {url} (disallowed by robots.txt)")
                        continue
                
                # Add task to process the URL
                task = asyncio.create_task(self.process_url(url, depth, session, semaphore))
                tasks.append(task)
                
                # Add a small delay between starting tasks
                if self.config.delay > 0:
                    await asyncio.sleep(self.config.delay)
            
            # Wait for all tasks to complete
            if tasks:
                await asyncio.gather(*tasks)
        
        return dict(self.discovered_links)
    
    async def process_url(self, url: str, depth: int, session: aiohttp.ClientSession, semaphore: asyncio.Semaphore):
        """Process a single URL."""
        if url in self.visited_urls:
            return
        
        self.visited_urls.add(url)
        
        for attempt in range(self.config.max_retries):
            try:
                async with semaphore:
                    logging.debug(f"Crawling: {url} (depth: {depth}, attempt: {attempt+1})")
                    
                    async with session.get(url) as response:
                        if response.status != 200:
                            logging.warning(f"Failed to fetch {url} - Status {response.status}")
                            continue
                        
                        # Add URL to appropriate category
                        category = UrlUtils.categorize_url(url)
                        self.discovered_links[category].add(url)
                        
                        # Only parse HTML content for extraction
                        content_type = response.headers.get('Content-Type', '').lower()
                        if 'text/html' not in content_type:
                            logging.debug(f"Skipping non-HTML content: {url}")
                            return
                        
                        html_content = await response.text()
                        await self.extract_links(url, html_content, depth)
                        return
                        
            except asyncio.TimeoutError:
                logging.warning(f"Timeout while fetching {url} (attempt {attempt+1})")
            except Exception as e:
                logging.error(f"Error processing {url}: {str(e)}")
        
        logging.error(f"Failed to process {url} after {self.config.max_retries} attempts")
    
    async def extract_links(self, url: str, html_content: str, depth: int):
        """Extract links from HTML content."""
        soup = BeautifulSoup(html_content, 'html.parser')
        logging.info(f"Processing: {url} (Found {len(soup.find_all('a'))} links)")
        
        # Extract regular links
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            self.process_extracted_url(url, href, depth)
        
        # Extract image links
        for img_tag in soup.find_all('img', src=True):
            src = img_tag['src']
            if src and not src.startswith('data:'):
                absolute_url = urljoin(url, src)
                normalized_url = UrlUtils.normalize_url(absolute_url)
                if UrlUtils.is_valid_url(normalized_url):
                    self.discovered_links[LinkCategory.IMAGE].add(normalized_url)
        
        # Extract other embedded content
        for tag in soup.find_all(['source', 'video', 'audio', 'iframe', 'embed'], src=True):
            src = tag['src']
            if src and not src.startswith('data:'):
                absolute_url = urljoin(url, src)
                normalized_url = UrlUtils.normalize_url(absolute_url)
                if UrlUtils.is_valid_url(normalized_url):
                    category = UrlUtils.categorize_url(normalized_url)
                    self.discovered_links[category].add(normalized_url)
    
    def process_extracted_url(self, source_url: str, href: str, depth: int):
        """Process an extracted URL."""
        if not href or href.startswith(('javascript:', 'mailto:', 'tel:')):
            return
            
        absolute_url = urljoin(source_url, href)
        normalized_url = UrlUtils.normalize_url(absolute_url)
        
        if not UrlUtils.is_valid_url(normalized_url) or re.match(r".*\s.*", normalized_url):
            return
        
        # Categorize the link
        category = UrlUtils.categorize_url(normalized_url)
        self.discovered_links[category].add(normalized_url)
        
        # Queue HTML pages from same domain for crawling
        next_depth = depth + 1
        if (UrlUtils.get_base_domain(normalized_url) == self.base_domain and 
            category == LinkCategory.PAGE and
            next_depth <= self.config.max_depth and
            normalized_url not in self.visited_urls and
            (normalized_url, next_depth) not in self.urls_to_visit):
            
            self.urls_to_visit.append((normalized_url, next_depth))
    
    def export_links_to_csv(self, filename: Optional[str] = None):
        """Export discovered links to a CSV file."""
        output_path = filename or self.config.output_file
        
        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Type', 'URL'])
            
            for link_type, urls in self.discovered_links.items():
                for url in urls:
                    writer.writerow([link_type, url])
        
        logging.info(f"Links exported to {output_path}")
    
    def print_summary(self):
        """Print a summary of the crawl results."""
        total_links = sum(len(links) for links in self.discovered_links.values())
        
        logging.info("\nCrawl Summary:")
        logging.info(f"Total links discovered: {total_links}")
        for link_type, urls in self.discovered_links.items():
            logging.info(f"  {link_type}: {len(urls)}")


async def main(args):
    """Main entry point for the crawler."""
    config = CrawlConfig(
        start_url=args.url,
        output_file=args.output,
        delay=args.delay,
        max_depth=args.depth,
        timeout=args.timeout,
        max_retries=args.retries,
        concurrency=args.concurrency,
        respect_robots_txt=not args.ignore_robots,
        user_agent=args.user_agent,
        log_level=args.log_level
    )
    
    crawler = WebCrawler(config)
    await crawler.crawl()
    crawler.export_links_to_csv()
    crawler.print_summary()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Deep Web Crawler')
    parser.add_argument('url', default="https://www.langchain.com/",help='Starting URL to crawl')
    parser.add_argument('-o', '--output', default='crawl_results.csv', help='Output CSV file')
    parser.add_argument('-d', '--delay', type=float, default=0.5, help='Delay between requests')
    parser.add_argument('--depth', type=int, default=5, help='Maximum crawl depth')
    parser.add_argument('--timeout', type=int, default=10, help='Request timeout in seconds')
    parser.add_argument('--retries', type=int, default=3, help='Maximum number of retries per URL')
    parser.add_argument('--concurrency', type=int, default=5, help='Number of concurrent requests')
    parser.add_argument('--ignore-robots', action='store_true', help='Ignore robots.txt restrictions')
    parser.add_argument('--user-agent', default='DeepCrawler/1.0', help='User-Agent string')
    parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], 
                        help='Logging level')
    
    args = parser.parse_args()
    
    asyncio.run(main(args))

Overwriting deep_crawler.py


In [1]:
!python deep_crawler.py https://www.langchain.com/ -o langchain_crawl_results.csv

2025-03-26 10:23:12 - INFO - Starting crawl at https://www.langchain.com/ with max depth 5
2025-03-26 10:23:12 - INFO - Processing: https://www.langchain.com/ (Found 83 links)
2025-03-26 10:23:13 - INFO - Processing: https://www.langchain.com/langgraph (Found 65 links)
2025-03-26 10:23:13 - INFO - Processing: https://www.langchain.com/langsmith (Found 67 links)
2025-03-26 10:23:14 - INFO - Processing: https://www.langchain.com/langchain (Found 60 links)
2025-03-26 10:23:14 - INFO - Processing: https://www.langchain.com/resources (Found 61 links)
2025-03-26 10:23:15 - INFO - Processing: https://www.langchain.com/customers (Found 137 links)
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    ret