## Imports


In [73]:
import pprint
import random
import re
import time
from datetime import UTC, datetime
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup
from rich import print as rprint
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    TaskProgressColumn,
    TextColumn,
    TimeRemainingColumn,
)
from pygments import highlight
from pygments.formatters import TerminalFormatter
from pygments.lexers import HtmlLexer


## Config (params)


In [2]:
# Shared rich console for terminal output.
console = Console()
# One HTTP session reused by the fetching code (scrap() passes it to
# fetch_page), so the default headers below go out with every request.
session = requests.Session()
session.headers.update(
    {
        "User-Agent": "Mozilla/5.0",
        "Accept": "text/html,application/json",
        # NOTE(review): placeholder credential that is sent as-is with every
        # request; presumably it should be dropped or replaced by a real
        # token loaded from configuration - confirm.
        "Authorization": "Bearer your-token",  # if needed
    },
)

# Listing page used as the scraping entry point (fetched first by scrap()).
url = "https://myanimelist.net/topanime.php"

## Functions (methods)


### Fetch page: Send a request, and extract the raw HTML


In [3]:
def fetch_page(
    session: requests.Session | None,
    url: str,
    *,
    timeout: float = 30.0,
    delay_range: tuple[float, float] = (3.0, 5.0),
) -> str:
    """Fetch a page and return its body as text.

    Args:
        session: Session used to send the request. When ``None``, a temporary
            session is created for this single call and closed afterwards.
        url: Address of the page to download.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot block the caller indefinitely.
        delay_range: ``(low, high)`` bounds, in seconds, of the random pause
            taken before the request is sent.

    Returns:
        The response body decoded as text.

    Raises:
        RuntimeError: If the server answers with any status other than 200.
    """
    # Random pause before every request; presumably a politeness delay so the
    # target site is not hit in rapid succession.
    time.sleep(random.uniform(*delay_range))  # noqa: S311

    if session is None:
        # The body is fully read before the context exits (no streaming), so
        # the temporary session can be closed right away.
        with requests.Session() as temporary_session:
            response = temporary_session.get(url, timeout=timeout)
    else:
        response = session.get(url, timeout=timeout)

    if response.status_code == 200:
        return response.text

    msg = f"Failed to fetch {url}: [bold red]{response.status_code}[/bold red]"
    raise RuntimeError(msg)

### Get media items: Extract the items listed in the raw HTML


In [4]:
# TODO(kyoumas):
# 1. Change the `raw` parameter to an enum class
# 2. Add start/end functionality


def get_items_list(raw: str, max_items: int = 3) -> list[tuple[str, str]]:
    """Collect ``(name, url)`` pairs for the media entries listed in ``raw``.

    Args:
        raw: HTML markup of the listing page.
        max_items: Upper bound on the number of entries returned; zero or a
            negative number yields an empty list.

    Returns:
        One ``(link text, href)`` tuple per matching anchor, in document
        order.
    """
    document = BeautifulSoup(raw, "html.parser")
    anchors = document.select("a[class=hoverinfo_trigger]")

    # Clamp at zero so a non-positive limit selects nothing instead of being
    # interpreted as a "from the end" slice bound.
    selected = anchors[: max(max_items, 0)]

    return [(anchor.text.strip(), anchor["href"]) for anchor in selected]

In [90]:
def _text_after(label) -> str | None:
    """Return the stripped text node directly following ``label``, if any."""
    sibling = label.next_sibling
    # Text nodes are ``str`` subclasses; a missing sibling (None) or a nested
    # tag carries no plain text to strip, so report "no value" instead.
    return sibling.strip() if isinstance(sibling, str) else None


# TODO: add read pandas DataFrame functionality
def get_item_detail(
    raw: str,
    *,
    verbose: bool = True,
) -> dict[str, int | str | list[str] | None]:
    """Extract the detailed data for an item.

    Every ``div`` whose class contains ``spaceit_pad`` and that holds a
    ``span.dark_text`` label becomes one entry. The key is the label text,
    lower-cased and without the trailing colon.

    Args:
        raw: HTML markup of the item's detail page.
        verbose: When true, print each extracted ``key: value`` pair followed
            by the syntax-highlighted HTML of the row it came from.

    Returns:
        Mapping from label to value. A value is the text of the
        ``score-label`` span for ``score``, the text right after the label
        for ``ranked`` and ``popularity``, a list of link texts when the row
        has several links, the single link text when it has one, and
        otherwise the text right after the label (``None`` when no text node
        follows the label).
    """
    # TODO: replace with Media class and their get_stats_url method
    stats: dict[str, int | str | list[str] | None] = {}
    soup = BeautifulSoup(raw, "html.parser")

    for div in soup.select("div[class*='spaceit_pad']"):
        label = div.find("span", class_="dark_text")
        if label is None:
            continue

        key = label.text.strip().rstrip(":").lower()
        value: str | list[str] | None
        if key == "score":
            score_span = div.find("span", class_="score-label")
            if score_span is None:
                # A score row without its value span has nothing to record.
                continue
            value = score_span.text.strip()
        elif key in {"ranked", "popularity"}:
            # Read the text right after the label and skip the link handling
            # below (presumably these rows embed unrelated links - confirm).
            value = _text_after(label)
        else:
            links = div.find_all("a")
            if len(links) > 1:
                value = [link.text.strip() for link in links]
            elif links:
                value = links[0].text.strip()
            else:
                value = _text_after(label)

        stats[key] = value

        if verbose:
            print(f"{key}: {value}")
            # Dump the source row so the extraction can be checked by eye.
            colored_html = highlight(div.prettify(), HtmlLexer(), TerminalFormatter())
            print(colored_html)

    return stats

In [22]:
class Storage:
    """Handle all file operations for the scraper."""

    def __init__(self, base_path: str | Path) -> None:
        self.base_path = Path(base_path)
        self.raw_path = self.base_path / "data" / "raw"
        self.scraped_path = self.base_path / "data" / "scraped"
        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create necessary directories if they don't exist."""
        self.raw_path.mkdir(parents=True, exist_ok=True)

    def save_html(self, content: str, filename: str) -> Path:
        """Save raw HTML content to a file."""
        if not filename.endswith(".html"):
            filename += ".html"

        file_path = self.raw_path / filename
        with file_path.open("w", encoding="utf-8") as f:
            f.write(content)
        return file_path

    def read_html(self, filename: str) -> str:
        """Read raw HTML content from a file."""
        file_path = self.raw_path / filename
        if not file_path.exists():
            raise FileNotFoundError(file_path)
        with file_path.open("r", encoding="utf-8") as f:
            return f.read()

    def save_csv(self, content: list[dict], filename: str) -> Path:
        """Export the extracted data into a csv file."""
        df = pd.DataFrame(content)
        file_path = self.scraped_path / filename
        df.to_csv(file_path, index=False)

        return file_path

In [7]:
def name_formatter(input_string: str) -> str:
    """Convert string to arbitrary name convention."""
    # Convert the string to lowercase
    lower_case_string = input_string.lower()

    # Replace spaces and special characters with underscores
    snake_case_string = re.sub(r"[^a-z0-9]+", "_", lower_case_string).strip("_")

    # Get the current date
    current_date = datetime.now(tz=UTC).strftime("%Y-%m-%d")

    # Combine the snake_case string with the date
    return f"{snake_case_string}_{current_date}"


In [91]:
def demo_scrap() -> list[dict]:
    """Parse the first listed item from previously saved HTML files.

    Reads ``index.html`` from the raw storage directory, takes the first
    entry of the listing, and parses that entry's saved detail page.

    Returns:
        A list holding the parsed detail dictionary of that first item
        (empty when the index lists no items).

    Raises:
        FileNotFoundError: If the index or the item's detail page has not
            been saved yet.
    """
    storage = Storage("../")
    raw_index = storage.read_html(filename="index.html")

    media_items: list[dict] = []

    # The demo only inspects the first entry of the listing.
    for name, _item_url in get_items_list(raw=raw_index, max_items=1):
        filename = name_formatter(name) + ".html"
        raw_detail = storage.read_html(filename=filename)
        media_items.append(get_item_detail(raw=raw_detail))

    return media_items


demo_dict = demo_scrap()

synonyms: Frieren at the Funeral, Frieren The Slayer
<[94mdiv[39;49;00m [36mclass[39;49;00m=[33m"spaceit_pad"[39;49;00m>
 <[94mspan[39;49;00m [36mclass[39;49;00m=[33m"dark_text"[39;49;00m>
  Synonyms:
 </[94mspan[39;49;00m>
 Frieren at the Funeral, Frieren The Slayer
</[94mdiv[39;49;00m>

japanese: 葬送のフリーレン
<[94mdiv[39;49;00m [36mclass[39;49;00m=[33m"spaceit_pad"[39;49;00m>
 <[94mspan[39;49;00m [36mclass[39;49;00m=[33m"dark_text"[39;49;00m>
  Japanese:
 </[94mspan[39;49;00m>
 葬送のフリーレン
</[94mdiv[39;49;00m>

english: Frieren: Beyond Journey's End
<[94mdiv[39;49;00m [36mclass[39;49;00m=[33m"spaceit_pad"[39;49;00m>
 <[94mspan[39;49;00m [36mclass[39;49;00m=[33m"dark_text"[39;49;00m>
  English:
 </[94mspan[39;49;00m>
 Frieren: Beyond Journey's End
</[94mdiv[39;49;00m>

type: TV
<[94mdiv[39;49;00m [36mclass[39;49;00m=[33m"spaceit_pad"[39;49;00m>
 <[94mspan[39;49;00m [36mclass[39;49;00m=[33m"dark_text"[39;49;00m>
  Type:
 </[94mspan

In [51]:
# Notebook cell expression: display the value bound to demo_dict by the
# demo_scrap() call.
demo_dict

In [26]:
def scrap(fetch: bool = False) -> list[dict[str, int | str | list[str] | None]]:
    """Build the detail records for up to 20 entries of the index page.

    Args:
        fetch: When true, download the index and every detail page with the
            module-level ``session`` and save each page through ``Storage``;
            otherwise read the previously saved HTML files.

    Returns:
        One parsed detail dictionary per listed item, in listing order.
    """
    storage = Storage("..")
    console = Console()

    if not fetch:
        raw_index = storage.read_html(filename="index.html")
    else:
        raw_index = fetch_page(session=session, url=url)
        storage.save_html(content=raw_index, filename="index.html")

    items_list = get_items_list(raw=raw_index, max_items=20)
    total = len(items_list)
    media_items = []

    # Progress bar layout: description, bar, percentage, remaining time.
    columns = (
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        TimeRemainingColumn(),
    )

    with Progress(*columns, console=console) as progress:
        main_task = progress.add_task(
            f"[cyan]Processing {total} items...",
            total=total,
        )

        for position, (name, item_url) in enumerate(items_list, start=1):
            filename = name_formatter(name) + ".html"

            # Show which item is being handled right now.
            progress.update(
                main_task,
                description=f"[cyan]Processing {position}/{total}: {name[:30]}...",
            )

            if fetch:
                raw_detail = fetch_page(session=session, url=item_url)
                save_path = storage.save_html(content=raw_detail, filename=filename)
                console.print(f"[green]Saved in {save_path}")
            else:
                raw_detail = storage.read_html(filename=filename)

            media_items.append(get_item_detail(raw=raw_detail))
            progress.advance(main_task)

    return media_items

In [None]:
if __name__ == "__main__":


Output()