In [None]:
%load_ext lab_black

In [None]:
import httpx
import pandas as pd
from time import time
from pathlib import Path
from asyncio import gather
from collections import Counter
from bs4 import BeautifulSoup as bsoup
from urllib.parse import urlparse, urljoin
from sqlitedict import SqliteDict as sqldict
from IPython.display import display, Markdown

# Note: This crawler will not run out of control because it locks stubbornly onto
# completing each click-depth and is only recording the link graph and no on-page
# content. This is useful for getting the URLs and visualizing the link graph.

# Crawl entry point. It doubles as the on-site filter: onsite_links() only
# keeps URLs that begin with this exact prefix.
homepage = "https://mikelev.in/blog/"
# Folder (relative to the working directory) that holds the crawl databases.
data_folder = "crawl"

# Create the data folder on first run; an existing folder is left untouched.
Path(data_folder).mkdir(exist_ok=True)

# Configuration
# Cap on how many pages the crawl cell visits per run.
max_crawl_per_run = 500
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
# Sent with every HTTP request the notebook makes.
headers = {"user-agent": user_agent}
# url -> set of on-site links found on that page (None until the page is visited).
linkdb = f"{data_folder}/links.db"
# url -> click-depth at which the url was first discovered (homepage is 1).
depthdb = f"{data_folder}/depth.db"
# Wall-clock start time; NOTE(review): not read by any of the visible cells.
start = time()

In [None]:
# Function to get absolute links from a URL
def onsite_links(href):
    """Fetch a page and return the set of on-site URLs it links to.

    Args:
        href: Absolute URL of the page to fetch.

    Returns:
        A set of absolute URLs with any ``#fragment`` removed. Only URLs that
        start with the module-level ``homepage`` prefix are kept; anchors
        without an ``href`` and non-web schemes (``mailto:``, ``tel:``, ...)
        are skipped.
    """
    # Keep the page URL under its own name: it is the base for relative links.
    page_url = href
    response = httpx.get(page_url, headers=headers)
    soup = bsoup(response.text, "html.parser")
    seen = set()
    for anchor in soup.find_all("a"):
        target = anchor.attrs.get("href")
        if target is None:
            continue
        # Skip kooky protocols like email
        if ":" in target and "//" not in target:
            continue
        # Convert relative links to absolute. They are relative to the page
        # they appear on, not to the crawl's homepage. A bare "/" becomes the
        # site root here, so it needs no special case and is filtered below
        # unless the root itself is inside the crawl scope.
        if "://" not in target:
            target = urljoin(page_url, target)
        # Strip stuff after hash (not formal part of URL)
        target = target.split("#", 1)[0]
        # Remove dupes (set) and offsite links (prefix test)
        if target.startswith(homepage):
            seen.add(target)
    return seen


def feedback(i, t=False):
    """Print lightweight progress output for loop iteration ``i``.

    Every 1000th iteration (including 0) starts a new line with a running
    count, followed by the total ``t`` when one is given and truthy. Every
    other 10th iteration prints a single dot without a newline. All remaining
    iterations print nothing.
    """
    if i % 1000 == 0:
        total_part = f" of {t}" if t else ""
        print(f"\nProcessed: {i}{total_part}")
        return
    if i % 10 == 0:
        print(".", end="")


# Let's make some headlines!
def _headline(level):
    """Build a function that displays its argument as a Markdown heading.

    Args:
        level: Heading level; the heading is prefixed with ``level`` hashes.

    Returns:
        A one-argument callable that renders ``"<hashes> <x>"`` via
        ``display(Markdown(...))``.
    """
    prefix = "#" * level

    def show(x):
        display(Markdown(f"{prefix} {x}"))

    return show


# h1(...) through h6(...) are defined explicitly (no exec on generated source)
# so the names are visible to readers, linters and tracebacks.
h1, h2, h3, h4, h5, h6 = (_headline(level) for level in range(1, 7))

# Seed the crawl: the homepage is click-depth 1, the pages it links to are 2.
h1(f"Getting links from {homepage}")
links = onsite_links(homepage)
with sqldict(linkdb) as link_store:
    link_store[homepage] = links
    # Links without an entry yet are queued; a value of None marks "not visited".
    table = [link for link in links if link not in link_store]
    for link in table:
        link_store[link] = None
    link_store.commit()

# Persist click-depths: 1 for the homepage, 2 for each link queued just above.
with sqldict(depthdb) as depth_store:
    depth_store[homepage] = 1
    for link in table:
        depth_store[link] = 2
    depth_store.commit()

def _load_db(path, columns):
    """Read every key/value pair of a SqliteDict file into a DataFrame.

    Args:
        path: Location of the SqliteDict database file.
        columns: Two column names; the first (the key) becomes the index.

    Returns:
        A DataFrame indexed by key with a single value column.
    """
    rows = []
    with sqldict(path) as db:
        total = len(db)  # looked up once instead of once per row
        for i, key in enumerate(db):
            feedback(i, total)
            rows.append((key, db[key]))
    print()
    return pd.DataFrame(rows, columns=columns).set_index(columns[0])


h2("Finding unvisited links.")
df_links = _load_db(linkdb, ["url", "links"])

h2("Analyzing current click-depth.")
df_depth = _load_db(depthdb, ["url", "depth"])
df = df_links.join(df_depth)
# Plain int so later arithmetic and pickled depth values are not numpy scalars.
# Raises ValueError if no depth has been recorded at all.
max_depth = int(df["depth"].max())

# Pages at the deepest known click-depth whose links have not been fetched yet.
unvisited = (df["depth"] == max_depth) & (df["links"].isnull())
to_crawl = list(df[unvisited].index)
len_to_crawl = len(to_crawl)

if len_to_crawl:
    # Use a local batch size: overwriting max_crawl_per_run would shrink the
    # configured limit for every later run of this cell in the same kernel.
    batch_size = min(len_to_crawl, max_crawl_per_run)
    h2(f"Visiting {batch_size} of {len_to_crawl} pages at click-depth {max_depth}:")
    h3(f"Discovering unvisited click-depth {max_depth + 1} links.")
    with sqldict(linkdb) as db:
        # Slicing enforces the cap exactly (no extra page before a break).
        for i, url in enumerate(to_crawl[:batch_size]):
            db[url] = onsite_links(url)
            # Commit per page so an interrupted run keeps its progress.
            db.commit()
            print(f"{batch_size - i} ", end="")
    if batch_size < len_to_crawl:
        h4(f"Another {max_crawl_per_run} urls will be visited each time you run.")
else:
    next_depth = int(max_depth) + 1
    h2(f"Done click-depth {max_depth}. Setting up tables for click-depth {next_depth}.")
    # Every link target recorded so far, de-duplicated.
    discovered = set()
    with sqldict(linkdb) as db:
        for url in db:
            links = db[url]
            if links:
                discovered.update(links)
    with sqldict(linkdb) as db:
        for url in discovered:
            # Only queue URLs that are new. Writing None over an existing entry
            # would erase the link set of a page that was already crawled.
            if url not in db:
                db[url] = None
        db.commit()
    with sqldict(depthdb) as db:
        for url in discovered:
            # Keep the depth at which a URL was first seen.
            if url not in db:
                db[url] = next_depth
        db.commit()
    h3(f"On the next run click-dept {next_depth} will be crawled.")
h3("Done")

In [None]:
# Fetch All Pages All At Once
table = []
with sqldict(depthdb) as db:
    for url in db:
        table.append(url)

responsedb = f"{data_folder}/responses.db"
async with httpx.AsyncClient(headers=headers) as client:
    apromise = gather(*[client.get(url) for url in table], return_exceptions=True)
    with sqldict(responsedb) as db:
        for response in await apromise:
            db[str(response.url)] = response
        db.commit()

In [None]:
columns = ["url", "title", "description", "headlines", "html"]
heading_tags = ["h1", "h2", "h3", "h4", "h5", "h6"]
table = []
with sqldict(responsedb) as db:
    for url in db:
        response = db[url]
        soup = bsoup(response.text, "html.parser")

        # All h1-h6 texts in document order; an empty string when there are none.
        headlines = "\n\n".join(
            f"Headline: {tag.text}" for tag in soup.find_all(heading_tags)
        )

        # Missing pieces become None explicitly rather than via a bare except,
        # so unrelated errors are no longer hidden.
        title_tag = soup.title
        if title_tag is not None and title_tag.string is not None:
            title = title_tag.string.strip()
        else:
            title = None

        meta = soup.find("meta", attrs={"name": "description"})
        description = meta.get("content") if meta is not None else None

        # str(None) would store the literal text "None" for pages without <html>.
        html = str(soup.html) if soup.html is not None else None

        row = (url, title, description, headlines, html)
        table.append(row)
df = pd.DataFrame(table, columns=columns)
df

In [None]:
import sqlite3
from contextlib import closing

# SQL column types, positionally matched to `columns`.
types = ["TEXT PRIMARY KEY", "TEXT", "TEXT", "TEXT", "BLOB"]
sqlcols = ", ".join(f"{name} {sqltype}" for name, sqltype in zip(columns, types))
create_table = f"CREATE TABLE IF NOT EXISTS crawl ({sqlcols}) WITHOUT ROWID;"

# One parameterized statement for all rows. OR REPLACE makes the cell safe to
# re-run: a plain INSERT fails on the url primary key once the table has rows.
placeholders = ", ".join("?" for _ in columns)
insert_record = (
    f"INSERT OR REPLACE INTO crawl ({', '.join(columns)}) VALUES ({placeholders})"
)

# Write next to the other crawl databases instead of a hard-coded folder.
with closing(sqlite3.connect(f"{data_folder}/extract.db")) as conn:
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.execute(create_table)
        # Plain tuples in `columns` order, one per DataFrame row.
        conn.executemany(
            insert_record, df[columns].itertuples(index=False, name=None)
        )