In [None]:
from __future__ import annotations

import csv
import os
from typing import Final, Iterator, Sequence

import polars as pl
import requests
from bs4 import BeautifulSoup, SoupStrainer
from lxml import etree as et
from requests.adapters import HTTPAdapter, Retry

ROOT: Final[str] = "https://www.metacritic.com/game"

In [None]:
def get_last_sitemap_index(s: requests.Session) -> int:
    """Return index value of the last sitemap in the sitemap catalog.

    The catalog is fetched from ``{ROOT}s.xml``; the file stem of the last
    ``<loc>`` URL it lists (the part between the final ``/`` and the first
    ``.``) is converted to an integer.

    Args:
        s: Session object to fetch sitemap catalog over a persistent http connection.

    Returns:
        The integer file stem of the last URL listed in the catalog.

    Raises:
        requests.HTTPError: If the catalog request returns an error status.
        KeyError: If the catalog root element declares no default namespace.
        IndexError: If the catalog contains no ``<loc>`` text.
        ValueError: If the file stem of the last URL is not an integer.
    """
    sitemap_catalog = s.get(f"{ROOT}s.xml")
    sitemap_catalog.raise_for_status()
    # Parse the raw bytes instead of the decoded text: lxml refuses str input
    # that carries an XML encoding declaration, and bytes let the parser apply
    # the document's own declared encoding.
    catalog_etree = et.XML(sitemap_catalog.content)
    # XPath needs an explicit prefix for the document's default namespace.
    nsmap = {"ns": catalog_etree.nsmap[None]}
    # XPath query tested faster compared to ElementPath methods
    last_url = catalog_etree.xpath(
        "(//ns:loc/text())[last()]",
        namespaces=nsmap,
        smart_strings=False,
    )
    if not last_url:
        raise IndexError("No sitemaps found.")
    # ".../<index>.xml" -> "<index>.xml" -> "<index>"
    file_name = last_url[0].rpartition("/")[-1]
    return int(file_name.partition(".")[0])

In [None]:
def extract_url_slugs(loc_elements: Iterator[et._Element], url_slugs: set[str]) -> None:
    """Add the slug of every URL found in a sitemap's loc tags to ``url_slugs``.

    The slug is the last path component of the URL once trailing slashes are
    removed. Tags without text are ignored.

    Args:
        loc_elements: A generator of loc tags containing URL text.
        url_slugs: The set in which game URL slugs are to be stored (updated in place).
    """
    loc_texts = (element.text for element in loc_elements)
    url_slugs.update(
        text.rstrip("/").rpartition("/")[-1]
        for text in loc_texts
        if text is not None
    )

In [None]:
def scrape_sitemaps(s: requests.Session, last_index: int) -> list[str]:
    """Retrieve URL slugs associated with all games from sitemaps.

    Colloquially, a slug is the unique identifying part of a web address,
    typically at the end of the URL. Each sitemap contains approximately 1000 URLs.

    Args:
        s: Session object to fetch sitemaps over a persistent http connection.
        last_index: End value for iteration of all sitemaps. (There appear to
            be more unlisted sitemaps of higher indices in the catalog,
            but these appear to be duplicates or vestigial in nature).

    Returns:
        A list of URL slugs corresponding to every game indexed in metacritic's sitemaps.
        Duplicates are removed; the order of the list is unspecified.

    Raises:
        requests.HTTPError: If any sitemap request returns an error status.
    """
    url_slugs: set[str] = set()
    for i in range(1, last_index + 1):
        sitemap_response = s.get(f"{ROOT}s/{i}.xml")
        sitemap_response.raise_for_status()
        # Parse the raw bytes instead of the decoded text: lxml refuses str
        # input that carries an XML encoding declaration, and bytes let the
        # parser apply the document's own declared encoding.
        sitemap_etree = et.XML(sitemap_response.content)
        loc_elements = sitemap_etree.iterfind(".//loc", namespaces=sitemap_etree.nsmap)
        extract_url_slugs(loc_elements, url_slugs)

    return list(url_slugs)

In [None]:
def scrape_game_details(game_pg: requests.Response) -> None:
    """Extract details from a game page and write them as one TSV row.

    The page is parsed with BeautifulSoup and a fixed group of CSS selectors.
    Each matched tag is processed according to its position in the result list,
    then the description (position 0) is moved to the end of the row before
    writing.

    NOTE(review): the positional handling assumes every selector matches and
    that the tags come back in the order the indices below expect; a missing
    element would shift every later index — confirm against real pages.

    Args:
        game_pg: Response whose ``content`` holds the game page HTML.

    Side effects:
        Overwrites ``../data/game_details.tsv`` with a single tab-separated row.
        List-valued fields are written using their ``str()`` representation.
    """
    soup = BeautifulSoup(game_pg.content, features="lxml")
    detail_tags = soup.select(
        "meta[name='description'], "
        ".c-productHero_title > div, "
        ".c-productHero_scoreInfo .c-siteReviewScore > span, "
        ".c-productionDetailsGame_esrb_title > span:first-child, "
        ".c-gameDetails_Platforms > ul, "
        ".c-gameDetails_ReleaseDate :last-child, "
        ".c-gameDetails_Developer ul, "
        ".c-gameDetails_Distributor :last-child, "
        "ul.c-genreList"
    )
    processed_details: list[str | list[str]] = []
    for i, tag in enumerate(detail_tags):
        # Special case as game description text is only found in full as an attribute value.
        if i == 0:
            processed_details.append(
                str(tag["content"]).replace("\t", "").replace("\n", " ")
            )
        elif i in [1, 2, 3, 4, 6, 8]:
            stripped = tag.get_text(strip=True)
            # Scale Userscore to int range for later memory optimization
            if i == 3:
                processed_details.append(str(int(10 * float(stripped))))
            # Remove unnecessary "Rated " text preceding esrb label
            elif i == 4:
                processed_details.append(stripped.partition(" ")[-1])
            else:
                processed_details.append(stripped)
        # Tags containing multiple children
        elif i in [5, 7, 9]:
            processed_details.append([li.get_text(strip=True) for li in tag])
    # Move the description from the front of the row to the end.
    processed_details.append(processed_details.pop(0))
    # Explicit UTF-8 so scraped non-ASCII text is written the same way on every
    # platform instead of depending on the locale's default encoding.
    with open(
        "../data/game_details.tsv", "w", newline="", encoding="utf-8"
    ) as file:
        writer = csv.writer(file, delimiter="\t")
        writer.writerow(processed_details)

In [None]:
def scrape_critic_info():
    """Placeholder for critic-review scraping; not implemented yet, so it does nothing."""

In [None]:
def scrape_games(s: requests.Session, url_slugs: list[str]):
    """Fetch a single hard-coded game page and hand it to the detail scraper.

    Prototype state: ``url_slugs`` is accepted but not read yet; only the
    ``elden-ring`` page and its critic-reviews page are requested.

    Args:
        s: Session used for both HTTP requests.
        url_slugs: Game URL slugs (currently unused).
    """
    game_url = f"{ROOT}/elden-ring"
    game_pg = s.get(game_url)
    # A 429 status is let through; any other non-OK status aborts with a message.
    if game_pg.ok or game_pg.status_code == 429:
        # Fetched but not consumed yet: scrape_critic_info takes no arguments.
        critic_pg = s.get(f"{game_url}/critic-reviews")
        scrape_game_details(game_pg)
        scrape_critic_info()
    else:
        print("error fetching game page")


# Ad-hoc run: call scrape_games with a plain session (no retry adapter is
# mounted here) and an empty slug list.
with requests.Session() as s:
    # Assigning a dict replaces the session's headers mapping rather than updating it.
    s.headers = {"User-Agent": "Edge"}
    scrape_games(s, [])

In [None]:
def scraper(
    user_agent: str = "Edge",
    num_retries: int = 5,
    backoff_factor: float = 0.5,
    backoff_jitter: float = 0.5,
    status_forcelist: list[int] = [429],
    respect_retry_after_header: bool = False,
) -> None:
    """Run the scraping pipeline over a single retry-enabled session.

    The list of game URL slugs is cached in ``../data/games.tsv``. When that
    file does not exist the sitemaps are scraped and the result is written to
    it; otherwise the slugs are loaded from the file. The slugs are then passed
    to ``scrape_games``.

    Args:
        user_agent: Value of the ``User-Agent`` header sent with every request.
        num_retries: Forwarded to ``Retry`` as ``total``.
        backoff_factor: Forwarded to ``Retry`` as ``backoff_factor``.
        backoff_jitter: Forwarded to ``Retry`` as ``backoff_jitter``.
        status_forcelist: Forwarded to ``Retry`` as ``status_forcelist``.
            The default list is only read, never mutated.
        respect_retry_after_header: Forwarded to ``Retry`` under the same name.
    """
    games_path = "../data/games.tsv"
    with requests.Session() as s:
        s.headers = {"User-Agent": user_agent}
        retry_config = Retry(
            total=num_retries,
            backoff_factor=backoff_factor,
            backoff_jitter=backoff_jitter,
            status_forcelist=status_forcelist,
            respect_retry_after_header=respect_retry_after_header,
        )
        # The retry policy applies to every https:// request made through s.
        s.mount("https://", HTTPAdapter(max_retries=retry_config))
        if not os.path.isfile(games_path):
            last_index = get_last_sitemap_index(s)
            url_slugs = scrape_sitemaps(s, last_index)
            pl.DataFrame({"url_slugs": url_slugs}).write_csv(
                file=games_path,
                separator="\t",
            )
        else:
            # Read with the same separator the file was written with, so a
            # slug containing a comma is not split into extra columns.
            url_slugs = (
                pl.read_csv(games_path, separator="\t").to_series().to_list()
            )
        scrape_games(s, url_slugs)


# Run the full pipeline with the default user agent and retry settings.
scraper()

In [None]:
def create_template(
    url_slugs: Sequence[str | int] = (), *, is_index: bool = False
) -> str:
    """Return a string adhering to the XML schema for the Sitemap protocol
    containing user defined dummy urls.

    Utility function to construct body content for mock http responses.
    Any type of string is accepted, but only ints or int-like strings are
    expected within the list if is_index is true (constructing a sitemap index).

    Args:
        url_slugs: Corresponds to game titles when constructing a sitemap, while
            represents ordinal values when constructing a sitemap index.
        is_index: Determines whether to construct a template for a sitemap
            or a sitemap index.

    Returns:
        The container element wrapping one item per slug, items separated by a
        single space. Sitemap URLs end in ``/``; sitemap index URLs end in ``.xml``.
    """
    ns = 'xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"'
    if is_index:
        container_start_tag = f"<sitemapindex {ns}>"
        item_start_tag = "<sitemap><loc>"
        ext = ".xml"
        item_end_tag = "</loc></sitemap>"
        container_end_tag = "</sitemapindex>"
    else:
        container_start_tag = f"<urlset {ns}>"
        item_start_tag = "<url><loc>"
        ext = "/"
        item_end_tag = "</loc></url>"
        container_end_tag = "</urlset>"
    # The empty string element is a special case where the url root is also omitted.
    url_generator = (
        (
            f"{item_start_tag}{item_end_tag}"
            if slug == ""
            else f"{item_start_tag}{ROOT}/{slug}{ext}{item_end_tag}"
        )
        for slug in url_slugs
    )
    # Joined outside the f-string: reusing the enclosing quote character inside
    # a replacement field is only valid syntax from Python 3.12 onwards.
    items = " ".join(url_generator)
    return f"{container_start_tag}{items}{container_end_tag}"

In [None]:
import unittest

import responses


class TestPrototype(unittest.TestCase):
    """Tests for the sitemap helpers, run against mocked HTTP responses.

    Several responses are registered for the same URL in each test; the
    assertions assume they are served in registration order, one per request.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Enter one Session and one RequestsMock shared by every test in the class."""
        cls.s = cls.enterClassContext(requests.Session())
        cls.r = cls.enterClassContext(responses.RequestsMock())

    def test_get_last_sitemap_index(self) -> None:
        """Check the returned index and the three error paths of get_last_sitemap_index."""
        # One status per catalog body below; only the last request fails with 404.
        statuses = [200] * 5 + [404]
        slugs_to_test = [
            [7],
            ["a", 20],
            [11, 3, 94],
            [],
            [20, "a"],
            [0, 0],
        ]
        for status, slugs in zip(statuses, slugs_to_test):
            self.r.get(
                url=f"{ROOT}s.xml",
                status=status,
                body=create_template(slugs, is_index=True),
            )

        # First three catalogs: the last listed entry is an integer and is returned.
        for i in range(3):
            self.assertEqual(get_last_sitemap_index(self.s), slugs_to_test[i][-1])
        # Fourth catalog is empty, so no loc text is found.
        self.assertRaisesRegex(
            IndexError, "No sitemaps found.", get_last_sitemap_index, self.s
        )
        # Fifth catalog ends with "a", which cannot be converted to int.
        self.assertRaises(ValueError, get_last_sitemap_index, self.s)
        # Sixth response carries status 404.
        self.assertRaises(requests.HTTPError, get_last_sitemap_index, self.s)

    def test_scrape_sitemaps(self) -> None:
        """Check slug de-duplication, empty loc handling and HTTP errors in scrape_sitemaps."""
        # Three successive responses for sitemap 1, one per scrape_sitemaps call below.
        statuses = [200, 200, 404]
        slugs_to_test = [
            ["dark-summit", "warhawk", "dark-summit"],
            [""],
            ["bioshock"],
        ]
        for status, slugs in zip(statuses, slugs_to_test):
            self.r.get(
                url=f"{ROOT}s/1.xml",
                status=status,
                body=create_template(slugs),
            )
        # Single response registered for sitemap 2.
        self.r.get(
            url=f"{ROOT}s/2.xml",
            status=200,
            body=create_template(["warhawk", "metal-slug-2"]),
        )

        TEST_INDEX = 2
        # First call: duplicates within and across the two sitemaps collapse.
        self.assertCountEqual(
            scrape_sitemaps(self.s, TEST_INDEX),
            ["dark-summit", "warhawk", "metal-slug-2"],
        )
        # Second call: sitemap 1 holds only an empty loc tag, which contributes nothing.
        self.assertCountEqual(
            scrape_sitemaps(self.s, TEST_INDEX),
            ["warhawk", "metal-slug-2"],
        )
        # Third call: sitemap 1 answers with 404.
        self.assertRaises(requests.HTTPError, scrape_sitemaps, self.s, TEST_INDEX)


# Run the tests from this module. argv=[""] supplies a placeholder program name
# instead of the process's own command line, and exit=False stops unittest from
# calling sys.exit() once the run finishes.
unittest.main(argv=[""], verbosity=2, exit=False)