In [1]:
import sqlite3
import os
from datetime import datetime


def extract_and_save_pdf():
    # データベースに接続
    conn = sqlite3.connect("data/crawled_data.db")
    cursor = conn.cursor()

    try:
        # content_type が 'pdf' のレコードを1つ取得
        cursor.execute("SELECT url, content FROM pages WHERE content_type = 'pdf' LIMIT 1")
        result = cursor.fetchone()

        if result:
            url, content = result

            # ファイル名を生成（URLの最後の部分を使用）
            filename = os.path.basename(url)
            if not filename.endswith(".pdf"):
                filename = f"{filename}_{datetime.now().strftime('%Y%m%d%H%M%S')}.pdf"

            # コンテンツをファイルに書き込む
            with open(filename, "wb") as file:
                file.write(content)

            print(f"PDFファイルが正常に保存されました: {filename}")
        else:
            print("PDFコンテンツが見つかりませんでした。")

    except sqlite3.Error as e:
        print(f"SQLiteエラーが発生しました: {e}")
    finally:
        # 接続を閉じる
        conn.close()

In [None]:
extract_and_save_pdf()

In [None]:
import re
from bs4 import BeautifulSoup
import html2text


def html_to_markdown(html_content):
    # BeautifulSoupを使用してHTMLを解析
    soup = BeautifulSoup(html_content, "html.parser")

    # html2textコンバーターを初期化
    h = html2text.HTML2Text()
    h.body_width = 0  # 行の折り返しを無効化
    h.unicode_snob = True  # Unicodeを保持
    h.skip_internal_links = False  # 内部リンクを保持

    # 特定のHTML要素を処理
    for tag in soup(["h1", "h2", "h3", "h4", "h5", "h6"]):
        level = int(tag.name[1])
        tag.replace_with(f"{'#' * level} {tag.text}\n\n")

    for tag in soup(["strong", "b"]):
        tag.replace_with(f"**{tag.text}**")

    for tag in soup(["em", "i"]):
        tag.replace_with(f"*{tag.text}*")

    for tag in soup.find_all("a", href=True):
        tag.replace_with(f"[{tag.text}]({tag['href']})")

    for tag in soup.find_all("img", src=True):
        alt_text = tag.get("alt", "")
        tag.replace_with(f"![{alt_text}]({tag['src']})")

    for tag in soup.find_all("pre"):
        code = tag.find("code")
        if code:
            language = code.get("class", [""])[0].replace("language-", "")
            tag.replace_with(f"```{language}\n{code.text}\n```\n\n")
        else:
            tag.replace_with(f"```\n{tag.text}\n```\n\n")

    for tag in soup.find_all("code"):
        if tag.parent.name != "pre":
            tag.replace_with(f"`{tag.text}`")

    # テーブルの処理
    for table in soup.find_all("table"):
        markdown_table = []
        headers = []

        # ヘッダーの処理
        for th in table.find_all("th"):
            headers.append(th.text.strip())

        if headers:
            markdown_table.append("| " + " | ".join(headers) + " |")
            markdown_table.append("| " + " | ".join(["---" for _ in headers]) + " |")

        # 行の処理
        for row in table.find_all("tr"):
            cells = row.find_all(["td", "th"])
            if cells:
                markdown_row = "| " + " | ".join(cell.text.strip() for cell in cells) + " |"
                markdown_table.append(markdown_row)

        # テーブルをMarkdown形式に置き換え
        table.replace_with("\n".join(markdown_table) + "\n\n")

    # 残りのHTMLをMarkdownに変換
    markdown = h.handle(str(soup))

    # 余分な空白行を削除
    markdown = re.sub(r"\n{3,}", "\n\n", markdown)

    return markdown.strip()


# 使用例
html_content = """
<h1>Welcome to My Website</h1>
<p>This is a <strong>bold</strong> and <em>italic</em> text.</p>
<h2>Product Comparison</h2>
<table>
    <tr>
        <th>Product</th>
        <th>Price</th>
        <th>Rating</th>
    </tr>
    <tr>
        <td>Widget A</td>
        <td>$10.99</td>
        <td>4.5/5</td>
    </tr>
    <tr>
        <td>Gadget B</td>
        <td>$24.99</td>
        <td>4.2/5</td>
    </tr>
</table>
<p>Check out our <a href="https://example.com/products">product page</a> for more information.</p>
"""

markdown_result = html_to_markdown(html_content)
print(markdown_result)

In [13]:
import urllib.parse


import urllib.parse
import posixpath
from typing import List, Tuple, Dict

DEFAULT_PORTS: Dict[str, str] = {
    "http": ":80",
    "https": ":443",
    # 必要に応じて他のスキームも追加
}


def normalize_url(url: str) -> str:
    """
    URLを正規化する関数。

    Args:
        url (str): 正規化したいURL。

    Returns:
        str: 正規化されたURL。
    """
    try:
        # URLをパース
        parsed: urllib.parse.ParseResult = urllib.parse.urlparse(url)

        # スキームとホストを小文字に
        scheme: str = parsed.scheme.lower()
        netloc: str = parsed.netloc.lower()

        # デフォルトポートを削除
        for default_scheme, default_port in DEFAULT_PORTS.items():
            if scheme == default_scheme and netloc.endswith(default_port):
                netloc = netloc[: -len(default_port)]
                break

        # パスの正規化
        path: str = urllib.parse.unquote(parsed.path)
        path = posixpath.normpath(path)
        if parsed.path.endswith("/") and not path.endswith("/"):
            # 元のパスが末尾スラッシュを含んでいた場合、正規化後もスラッシュを保持
            path += "/"
        # ルートパスでない場合、末尾のスラッシュを削除
        if path != "/" and path.endswith("/"):
            path = path.rstrip("/")
        path = urllib.parse.quote(path, safe="/")

        # クエリのソート
        query_list: List[Tuple[str, str]] = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True)
        query_list.sort()
        query: str = urllib.parse.urlencode(query_list)

        # フラグメントを削除
        fragment: str = ""

        # 正規化されたURLを再構築
        normalized: str = urllib.parse.urlunparse((scheme, netloc, path, "", query, fragment))

        return normalized

    except Exception as e:
        raise ValueError(f"Invalid URL provided: {url}") from e

In [14]:
urls = [
    "https://emosta.com/en/about",
    "https://emosta.com/en/about/",
]

for url in urls:
    normalized_url = normalize_url(url)
    print(url, "->", normalized_url)

https://emosta.com/en/about -> https://emosta.com/en/about
https://emosta.com/en/about/ -> https://emosta.com/en/about
