In [None]:
import requests
import re
from bs4 import BeautifulSoup
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urlparse, urlunparse
import os
import time
from boilerpy3 import extractors


# Seed URLs. Each entry is passed to crawl() in turn, and the whole list is
# fed to get_common_path_prefix() to derive one path prefix per domain.
urls = [
    "https://docs.trychroma.com",
    "https://docs.langchain.com/docs",
    "https://js.langchain.com/docs",
    "https://python.langchain.com/en/latest",
    # Individual python.langchain.com pages, currently disabled. Note that
    # enabling any of them also changes the prefix computed for that domain.
    # "https://python.langchain.com/en/latest/getting_started/getting_started.html#",
    # "https://python.langchain.com/en/latest/modules/models.html",
    # "https://python.langchain.com/en/latest/modules/prompts.html",
    # "https://python.langchain.com/en/latest/modules/indexes.html",
    # "https://python.langchain.com/en/latest/modules/memory.html",
    # "https://python.langchain.com/en/latest/modules/chains.html",
    # "https://python.langchain.com/en/latest/modules/agents.html",
    # "https://python.langchain.com/en/latest/modules/callbacks/getting_started.html",
    # "https://python.langchain.com/en/latest/use_cases/personal_assistants.html",
    # "https://python.langchain.com/en/latest/use_cases/autonomous_agents.html",
    # "https://python.langchain.com/en/latest/use_cases/question_answering.html",
    # "https://python.langchain.com/en/latest/use_cases/chatbots.html",
    # "https://python.langchain.com/en/latest/use_cases/tabular.html",
    # "https://python.langchain.com/en/latest/use_cases/code.html",
    # "https://python.langchain.com/en/latest/use_cases/apis.html",
    # "https://python.langchain.com/en/latest/use_cases/summarization.html",
    # "https://python.langchain.com/en/latest/use_cases/extraction.html",
    # "https://python.langchain.com/en/latest/use_cases/evaluation.html",
    ]

# Matches absolute http:// or https:// URLs. The "s" is optional but may
# appear at most once, so malformed schemes such as "httpss://" are rejected.
HTTP_URL_PATTERN = r'^https?://.+'

class HyperlinkParser(HTMLParser):
    """Collect the ``href`` value of every ``<a>`` start tag fed to the parser.

    After ``feed()`` has been called, ``self.hyperlinks`` holds the raw href
    strings exactly as written in the markup, in document order and with
    duplicates preserved. No URL resolution or filtering is done here.
    """

    def __init__(self):
        super().__init__()
        # Raw href strings gathered so far.
        self.hyperlinks = []

    def handle_starttag(self, tag, attrs):
        """Record the href of an anchor tag when it carries a value."""
        if tag != "a":
            return

        href = dict(attrs).get("href")
        # HTMLParser reports a valueless attribute (``<a href>``) as None.
        # Storing it would hand callers a non-string and break the string
        # operations they apply to every collected link.
        if href is not None:
            self.hyperlinks.append(href)

def fetch_url(url, headers, retries=5, backoff_factor=2, timeout=30):
    """GET ``url`` and return the response, or ``None`` when it cannot be fetched.

    Args:
        url: Address to request.
        headers: Mapping of HTTP headers sent with the request.
        retries: Number of additional attempts made after an HTTP 429 reply.
        backoff_factor: Base of the exponential wait between 429 retries;
            the n-th retry waits ``backoff_factor ** n`` seconds.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the whole crawl.

    Returns:
        The ``requests`` response for a successful (non-error) status, or
        ``None`` for a 404, an exhausted 429 retry budget, any other HTTP
        error status, or any connection-level failure.
    """
    for attempt in range(retries + 1):
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            # requests.get() does not raise on 4xx/5xx by itself; without
            # this call the HTTPError handler below can never run.
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status == 429:
                if attempt == retries:
                    # Out of attempts: sleeping again would only waste time.
                    break
                wait_time = backoff_factor ** attempt
                print(f"HTTP Error 429: Too Many Requests. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            elif status == 404:
                print(f"HTTP Error 404: Not Found. Ignoring URL: {url}")
                return None
            else:
                print(e)
                break
        except Exception as e:
            # Best-effort crawl: report the failure and skip this URL rather
            # than aborting the whole run.
            print(e)
            break
    return None

def get_hyperlinks(url, headers):
    """Return the raw href values of all anchors found on the page at ``url``.

    An empty list is returned when the page could not be fetched or when its
    Content-Type header does not start with ``text/html``.
    """
    response = fetch_url(url, headers)
    if response is None:
        return []

    content_type = response.headers.get('Content-Type', '')
    if not content_type.startswith("text/html"):
        return []

    link_collector = HyperlinkParser()
    link_collector.feed(response.text)
    return link_collector.hyperlinks

def get_common_path_prefix(urls):
    """Map each domain in ``urls`` to the path prefix shared by its URLs.

    A domain that appears once keeps that URL's path unchanged. For every
    further URL of the same domain, the stored value is replaced by the
    character-wise common prefix of both paths, cut back to just before its
    last ``/`` and then terminated with a single ``/``.
    """
    prefix_by_domain = {}
    for parsed in map(urlparse, urls):
        domain = parsed.netloc
        path = parsed.path

        known = prefix_by_domain.get(domain)
        if known is None:
            # First URL seen for this domain: its path is the prefix so far.
            prefix_by_domain[domain] = path
            continue

        shared = os.path.commonprefix([known, path])
        # commonprefix works per character and may stop mid-segment, so trim
        # back to the last slash before normalising the trailing slash.
        cut = shared.rfind('/')
        trimmed = shared[:cut]
        prefix_by_domain[domain] = trimmed.rstrip('/') + '/'

    return prefix_by_domain




# Domain -> path prefix computed once from the seed URLs; looked up by
# get_domain_hyperlinks() when it rebuilds non-absolute links.
COMMON_PATH_PREFIXES = get_common_path_prefix(urls)


def get_domain_hyperlinks(local_domain, url, headers):
    """Return the de-duplicated links on ``url`` that stay on ``local_domain``.

    Absolute http(s) links are kept only when their host equals
    ``local_domain``. Every other link is rebuilt as
    ``https://<local_domain>/<prefix>/<link>``, where the prefix comes from
    ``COMMON_PATH_PREFIXES`` and is added only when the link does not already
    start with it. Fragment-only links and ``mailto:``, ``tel:`` and
    ``javascript:`` links are dropped. A single trailing ``/`` is removed
    from each result.

    Args:
        local_domain: Host name the crawl is restricted to.
        url: Page whose anchors are inspected.
        headers: HTTP headers forwarded to the fetch.

    Returns:
        A list of unique URL strings, in no particular order.
    """
    # Compare without surrounding slashes: links lose their leading slash
    # below, so a prefix such as "/docs" would otherwise never match and
    # would be prepended a second time ("/docs/docs/...").
    prefix = COMMON_PATH_PREFIXES.get(local_domain, "").strip("/")
    non_page_markers = ("#", "mailto:", "tel:", "javascript:")

    clean_links = set()
    for link in set(get_hyperlinks(url, headers)):
        if link is None:
            # Valueless href attribute; nothing to follow.
            continue

        if re.search(HTTP_URL_PATTERN, link):
            if urlparse(link).netloc != local_domain:
                continue
            clean_link = link
        else:
            if link.startswith("/"):
                link = link.lstrip("/")
            elif link.startswith(non_page_markers):
                continue

            # Match whole path segments so "docsify/x" is not mistaken for
            # something already under the "docs" prefix.
            already_prefixed = (
                not prefix
                or link == prefix
                or link.startswith(prefix + "/")
            )
            path = link if already_prefixed else f"{prefix}/{link}"
            clean_link = f"https://{local_domain}/{path}"

        if clean_link.endswith("/"):
            clean_link = clean_link[:-1]
        clean_links.add(clean_link)

    return list(clean_links)

def get_text_from_url(url, headers):
    """Fetch ``url`` and return its extracted text, or ``None`` to skip it.

    ``None`` is returned when the fetch fails or when the extracted text
    contains any of the "page not found" markers listed below.
    """
    response = fetch_url(url, headers)
    if response is None:
        return None

    # Use boilerpy3 to extract main content
    text = extractors.DefaultExtractor().get_content(response.text)

    # Substrings that mark a page as an error page to be ignored.
    error_markers = (
        "NOT FOUND",
        "404",
        "  404 Not Found",
        "Page Not Found",
        "We could not find what you were looking for.",
    )
    for marker in error_markers:
        if marker in text:
            print(f"Ignoring URL (matched ignore pattern): {url}")
            return None

    return text





In [None]:
def save_text_to_file(local_domain, url, text):
    """Write ``text`` to ``text/<local_domain>/<url-derived name>.txt``.

    The file name is the URL without its scheme, with every ``/`` replaced
    by ``_``. Nothing is written when ``text`` is empty or whitespace only.
    The target directory must already exist.

    Args:
        local_domain: Sub-directory of ``text/`` to write into.
        url: Source URL, used only to derive the file name.
        text: Content to store.
    """
    if not text.strip():  # Nothing worth saving
        return

    # Strip the scheme by its separator instead of a fixed 8-character slice,
    # which was only correct for "https://" and mangled "http://" URLs.
    without_scheme = url.split("://", 1)[-1]
    file_path = f"text/{local_domain}/{without_scheme.replace('/', '_')}.txt"

    # Explicit UTF-8 so scraped non-ASCII text does not depend on the
    # platform's default encoding.
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(text)


def create_directories(local_domain):
    """Ensure the output directory ``text/<local_domain>/`` exists.

    Missing parent directories are created as well, and calling this for a
    directory that already exists is a no-op.
    """
    # makedirs with exist_ok avoids the gap between an exists() check and
    # the mkdir() that follows it.
    os.makedirs(f"text/{local_domain}/", exist_ok=True)

def crawl(url):
    """Crawl the site reachable from ``url`` and save each page's text.

    Starting at ``url``, every page is fetched, its extracted text is written
    under ``text/<domain>/``, and its same-domain links are queued. A link is
    queued only once, and only if no other link with the same URL path has
    been queued before. Pages whose text cannot be obtained are skipped and
    their links are not followed.

    Args:
        url: Seed URL; its host defines the domain the crawl stays on.
    """
    local_domain = urlparse(url).netloc
    queue = deque([url])
    seen = {url}
    seen_paths = {urlparse(url).path}

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36"
    }

    create_directories(local_domain)

    while queue:
        # pop() takes from the same end append() adds to, so the most
        # recently discovered link is visited first.
        url = queue.pop()
        print(url)

        text = get_text_from_url(url, headers)
        if text is None:
            continue

        save_text_to_file(local_domain, url, text)

        # get_domain_hyperlinks takes (local_domain, url, headers); passing
        # the domain twice raised TypeError on the very first page.
        for link in get_domain_hyperlinks(local_domain, url, headers):
            link_path = urlparse(link).path
            if link not in seen and link_path not in seen_paths:
                queue.append(link)
                seen.add(link)
                seen_paths.add(link_path)




# Run the crawler once per seed URL, one after another.
for url in urls:
    crawl(url)
