In [None]:
## Set notebook to auto reload updated modules. Put this at the top of your Jupyter
#  notebooks, and if you import code from local .py files, you won't have to restart
#  the kernel when you make changes in the code.
%load_ext autoreload
%autoreload 2

---

# Book Scraper

This notebook scrapes the [books.toscrape.com site](https://books.toscrape.com).


---

## Imports

Import packages to use in the notebook.


In [None]:
from pathlib import Path
import random
import json

In [None]:
import httpx
from hishel import SyncSqliteStorage
from hishel.httpx import SyncCacheTransport

In [None]:
from bs4 import BeautifulSoup

## Constants

Define constants for the notebook (values that are set at the beginning and don't change).

In [None]:
## Path to the SQLite database file used as the HTTP response cache
#  (its parent directory is created on demand by get_cache_transport)
CACHE_PATH: str = ".cache/http.cache"

## Base URL for target site; all requests in this notebook are sent to it
BASE_URL: str = "https://books.toscrape.com"

## Functions

Re-usable code for the notebook to call.

In [None]:
def get_cache_transport(
    cache_file: str | Path = CACHE_PATH, ttl: int = 900
) -> SyncCacheTransport:
    """Return a Hishel SyncCacheTransport backed by a SQLite cache file.

    Params:
        cache_file (str | Path): Path to a SQLite file where responses will be cached.
            Missing parent directories are created automatically.
        ttl (int): Time in seconds an object should live in the cache before being
            cleared/refreshed (default=900, which is 15 minutes).

    Returns:
        (SyncCacheTransport): A caching transport that wraps a plain httpx.HTTPTransport,
            suitable for passing to httpx.Client(transport=...).
    """
    ## Ensure cache parent directory exists. If it already exists, nothing will happen
    Path(cache_file).parent.mkdir(parents=True, exist_ok=True)

    ## Prepare a Hishel SQLite database to use as a cache.
    #  refresh_ttl_on_access=True means reading an entry restarts its TTL countdown
    storage = SyncSqliteStorage(
        database_path=str(cache_file),
        default_ttl=ttl,
        refresh_ttl_on_access=True,
    )

    ## Create the cache transport; real network I/O is delegated to next_transport
    return SyncCacheTransport(
        next_transport=httpx.HTTPTransport(),
        storage=storage,
    )

In [None]:
def get_client(
    use_cache: bool = False,
    cache_file: str | Path = CACHE_PATH,
    ttl: int = 900,
    timeout: float = 10.0,
) -> httpx.Client:
    """Return a reusable HTTPX client, optionally with a cache transport.

    Params:
        use_cache (bool): Whether to enable Hishel SQLite caching (default=False).
        cache_file (str | Path): Path to a SQLite file where responses will be cached.
            Only used when use_cache=True.
        ttl (int): Time in seconds an object should live in the cache before being
            cleared/refreshed (default=900, which is 15 minutes). Only used when use_cache=True.
        timeout (float): Timeout in seconds applied to the client's requests (default=10.0).

    Returns:
        (httpx.Client): A client the caller is responsible for closing when finished.
    """
    ## Creates a cache transport for HTTPX if use_cache=True, otherwise value is None
    #  (passing transport=None tells httpx.Client to use its default transport)
    transport: SyncCacheTransport | None = (
        get_cache_transport(cache_file, ttl) if use_cache else None
    )

    ## Create an HTTPX client with optional cache transport
    return httpx.Client(transport=transport, timeout=timeout)

## Request page HTML & create BeautifulSoup object

In [None]:
## Create HTTP client to use for requests. Add a cache so we're not repeatedly sending live requests.
#  The same client is reused by every request below and is closed in the Cleanup section.
http_client: httpx.Client = get_client(use_cache=True)

In [None]:
## Do a HEAD request to check if the site is online (should get a 200 response).
#  A HEAD request only asks for the response headers, which keeps this check lightweight.
#  Raise an exception for any non-successful response, i.e. 400, 404, etc
ping: httpx.Response = http_client.head(BASE_URL)
ping.raise_for_status()

display(ping.status_code)

In [None]:
## Instead of calling the client's .get() directly, you can also pre-create a Request object
#  and have the client send it later. Nothing is sent over the network in this cell.
homepage_request: httpx.Request = httpx.Request(method="GET", url=BASE_URL)
display(f"Homepage request URL: {homepage_request.url}")

In [None]:
## Instead of a .get() call, use the client's .send() method and give it the homepage_request object.
#  Then raise an exception if the response status is not successful.

homepage_res: httpx.Response = http_client.send(homepage_request)
homepage_res.raise_for_status()

In [None]:
## Decode the raw response bytes (as UTF-8) into a string containing the page's HTML
homepage_html: str = homepage_res.content.decode("utf-8")

In [None]:
## Create a 'soup' (a searchable parse tree) from the HTML response.
#  "html.parser" selects Python's built-in HTML parser, so no extra parser package is needed.
homepage_soup: BeautifulSoup = BeautifulSoup(homepage_html, "html.parser")

In [None]:
"""You can use .prettify() to format the HTML, which is useful for saving the HTML to a file.

You can open this file to find the tags you want to scrape, or use it as a cache, only sending
a request if this file does not exist (otherwise reading from the saved file).

The code below just saves the file for demonstration purposes, the response is already cached in the
path defined in the CACHE_PATH constant.
"""

## Write as UTF-8 explicitly: without encoding=, open() falls back to the platform's
#  locale encoding, which can fail on (or mangle) non-ASCII characters in the page
with open("homepage.html", "w", encoding="utf-8") as f:
    f.write(homepage_soup.prettify())

## Scrape the HTML

In [None]:
## Attribute access on the soup returns the first tag with that name, here the page's <title>
page_title = homepage_soup.title

## Display the <title> tag & text
display(page_title)

In [None]:
## Display just the text inside the <title> tag.
#  Note: page_title.name would give the tag's name ("title"), not its contents.
display(page_title.get_text(strip=True))

In [None]:
## Get the sidebar contents (i.e. genre links).
#  .find() returns the first matching element, or None when nothing matches,
#  so displaying the type is a quick check that the <div> was actually found.
sidebar_genres_div = homepage_soup.find("div", class_="side_categories")

display(type(sidebar_genres_div))

In [None]:
## Locate the <ul> navigation list that lives inside the sidebar div
sidebar_links_ul = sidebar_genres_div.find("ul", class_="nav nav-list")

## Build one {"text", "href"} dict for every <li> that contains an <a> tag.
#  get_text(strip=True) trims surrounding whitespace such as \n characters.
all_links = [
    {"text": anchor.get_text(strip=True), "href": anchor.get("href")}
    for anchor in (item.find("a") for item in sidebar_links_ul.find_all("li"))
    if anchor
]

display(f"Found [{len(all_links)}] link(s) in sidebar")

In [None]:
## Get a random link from the list.
#  random.choice() picks an element directly, so no manual index arithmetic is needed.
rand_link = random.choice(all_links)

display(f"Example link: {rand_link}")

In [None]:
## Find all books by getting <article class="product_pod"> tags.
#  find_all() returns a list-like collection of every match on the page.
products = homepage_soup.find_all("article", class_="product_pod")

display(f"Found [{len(products)}] product(s) on the page")

In [None]:
## List to hold scraped books (one dict per product)
books_data = []

## Iterate over products found on page
for product in products:
    ## The <h3> wraps an <a> that carries both the book title and the link to the book
    title_link = product.find("h3").find("a")

    ## The visible link text can be a shortened title ending in "...", so prefer the
    #  anchor's title= attribute and fall back to the link text if it is missing
    title = title_link.get("title") or title_link.get_text(strip=True)

    ## Extract link to book (as written in the page, i.e. not joined with BASE_URL)
    book_url = title_link.get("href")

    ## Extract book price as raw text, currency symbol included
    price = product.find("p", class_="price_color").get_text(strip=True)

    ## Extract rating: <p class="star-rating X"> stores the rating word as its 2nd class
    rating_elem = product.find("p", class_="star-rating")
    rating = rating_elem.get("class")[1] if rating_elem else "Unknown"

    ## Extract stock availability text
    availability = product.find("p", class_="instock availability").get_text(strip=True)

    ## Add book data to list
    books_data.append(
        {
            "title": title,
            "url": book_url,
            "price": price,
            "rating": rating,
            "availability": availability,
        }
    )

display(f"Found [{len(books_data)}] book(s) on page")

In [None]:
## Show a random scraped book as a quick sanity check of the extracted fields
rand_book = random.choice(books_data)
display(f"Example book: {rand_book}")

In [None]:
## Lookup table from currency symbol to currency code; anything else maps to "Unknown"
currency_codes = {"£": "GBP", "$": "USD", "€": "EUR", "¥": "JPY"}

cleaned_books = []

## Clean book data, i.e. extract currency symbol & amount.
#  Each cleaned book is a NEW dict, so the raw dicts in books_data are left untouched
#  and re-running this cell always starts from the same input.
for book in books_data:
    ## Normalise surrounding whitespace once and reuse the result for every field
    price_text = book["price"].strip()

    ## First character is the currency symbol (slicing gives "" for an empty price
    #  instead of raising IndexError)
    currency_symbol = price_text[:1]

    ## Remainder is the amount, i.e. "51.77"; drop thousands separators before parsing
    amount_text = price_text[1:].replace(",", "").strip()

    cleaned_books.append(
        {
            **book,
            "currency_symbol": currency_symbol,
            "currency": currency_codes.get(currency_symbol, "Unknown"),
            ## Numeric price value, i.e. 51.77 (None when there is no amount to parse)
            "price_numeric": float(amount_text) if amount_text else None,
        }
    )

display(f"Cleaned [{len(cleaned_books)}] book(s)")

In [None]:
## Get a random cleaned book to check the added currency & numeric price fields
rand_cleaned_book = random.choice(cleaned_books)
display(f"Example cleaned book: {rand_cleaned_book}")

In [None]:
## Serialise the Python list of dicts to a JSON string.
#  Note that currency symbols will be encoded, i.e. £ becomes \u00a3
#  default=str stringifies any value json cannot serialise natively.
_data = json.dumps(cleaned_books, indent=2, default=str)

## Save the JSON string to disk
with open("books_data.json", "w") as f:
    f.write(_data)

In [None]:
## Read the saved file back and parse it, to demonstrate currency symbol decoding
#  (escapes such as \u00a3 become the original characters again)
with open("books_data.json", "r") as f:
    _json = json.loads(f.read())

## Should be a list, matching the structure that was saved
display(type(_json))

In [None]:
## Get a random book from the loaded JSON to show currency symbol decoding.
#  Sample from _json (the data read back from disk) rather than the in-memory
#  cleaned_books list, otherwise the round-trip through the file is never shown.
rand_loaded_book = random.choice(_json)
display("Currency symbol (decoded from being stored as unicode):")
display(f"{rand_loaded_book.get('currency_symbol')}")

## Cleanup

In [None]:
## Close HTTP client now that all requests are done.
#  The client was created once near the top of the notebook, so it must be closed explicitly.
http_client.close()