In [None]:
!pip install beautifulsoup4
!pip install lxml
!pip install requests
!pip install langchain
!pip install langchain-text-splitters
!pip install tiktoken
!pip install docling

!mkdir input
!mkdir output
!mkdir in_progress


In [None]:
from pathlib import Path
import json
import re
from datetime import datetime
from bs4 import BeautifulSoup, Comment
from langchain.text_splitter import MarkdownHeaderTextSplitter

import tiktoken

# Working directories, all absolute paths under /content.
input_dir = Path('/content', 'input')              # HTML sources are read from here
output_dir = Path('/content', 'output')
in_progress_dir = Path('/content', 'in_progress')  # intermediate/debug artifacts are written here

In [None]:
"""inputディレクトリ内のすべてのHTMLファイルを読み込む"""

# Collect the *.html matches first, followed by the *.htm matches.
input_html_files = [*input_dir.glob('*.html'), *input_dir.glob('*.htm')]
input_html_contents = {}
for input_html_file in input_html_files:
    print(f"processing... {input_html_file.name}")
    # Read the whole file as UTF-8 text, keyed by its bare file name.
    input_html_contents[input_html_file.name] = input_html_file.read_text(encoding='utf-8')
    # Log the on-disk size in kilobytes.
    file_size = input_html_file.stat().st_size / 1024
    print(f"{input_html_file.name} : {file_size:.1f} kb")

print(f"html files : {len(input_html_files)}")

In [None]:
"""HTMLファイルのコンテンツによっては除外する"""

# Exclusion patterns (real values to be configured later; two placeholders for now).
exclude_patterns = [
    "これは除外パターンの例1",  # e.g. "広告" (advertisement)
    "これは除外パターンの例2",  # e.g. "サンプルテキスト" (sample text)
]

filtered_html_contents = {}
excluded_files = []

for filename, html_content in input_html_contents.items():
    # First pattern occurring as a plain substring, or None when nothing matches.
    pattern = next(
        (candidate for candidate in exclude_patterns if candidate in html_content),
        None,
    )
    if pattern is None:
        filtered_html_contents[filename] = html_content
        continue
    print(f"{filename} is excluded for [{pattern}]")
    excluded_files.append(filename)

print(f"excluded files : {len(excluded_files)}")
print(f"not excluded files : {len(filtered_html_contents)}")


In [None]:
"""SourcePageURLを抽出する（後で使う）"""

def extract_canonical_url(html_content):
    """Return the page URL declared in the HTML, or "" when none is declared.

    The href of <link rel="canonical"> is preferred; when it is absent or
    empty, the content of <meta property="og:url"> is used instead.

    Args:
        html_content: HTML markup as a string.

    Returns:
        The first non-empty URL found, otherwise an empty string.
    """
    parsed = BeautifulSoup(html_content, 'html.parser')

    # Preferred source: the canonical link.
    link_tag = parsed.find('link', rel='canonical')
    href = link_tag.get('href') if link_tag else None
    if href:
        return href

    # Fallback: the Open Graph URL.
    meta_tag = parsed.find('meta', property='og:url')
    content = meta_tag.get('content') if meta_tag else None
    return content if content else ""

# Extract the source page URL of every HTML file that survived filtering.
source_page_urls = {}

for filename, html_content in filtered_html_contents.items():
    print(f"processing... {filename}")
    canonical_url = extract_canonical_url(html_content)
    source_page_urls[filename] = canonical_url
    print(f"{filename} : {canonical_url}" if canonical_url else f"{filename} : No canonical_url")

# A file counts as "found" when its extracted URL is non-empty.
canonical_found_count = sum(1 for found_url in source_page_urls.values() if found_url)
canonical_not_found_count = len(source_page_urls) - canonical_found_count

print(f"SourcePageURL extracted : total = {len(source_page_urls)}, founded = {canonical_found_count}, missing = {canonical_not_found_count}")

# Dump the mapping to a timestamped file so intermediate state is easy to inspect.
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
in_progress_json_path = in_progress_dir / f"source_page_urls_{timestamp}.json"
# The file is written under the in-progress directory.
with in_progress_json_path.open('w', encoding='utf-8') as json_file:
    json.dump(source_page_urls, json_file, ensure_ascii=False, indent=2)

print(f"SourcePageURLs are saved as {in_progress_json_path}")

In [None]:
"""メジャーノイズを除去する"""
major_noise_tags = ['script', 'style', 'noscript', 'iframe', 'object', 'embed']


def _strip_major_noise(html_content):
    """Return html_content re-serialized without the major-noise tags and HTML comments."""
    soup = BeautifulSoup(html_content, 'html.parser')
    # Delete each noise tag together with everything nested inside it.
    for tag_name in major_noise_tags:
        for element in soup.find_all(tag_name):
            element.decompose()
    # Detach every comment node from the tree.
    for comment_node in soup.find_all(string=lambda text: isinstance(text, Comment)):
        comment_node.extract()
    return str(soup)


# Cleaned HTML keyed by file name.
major_noise_removed_html_contents = {
    filename: _strip_major_noise(html_content)
    for filename, html_content in filtered_html_contents.items()
}

# Write each cleaned document to in_progress/major_noise_removed for easier debugging.
major_noise_removed_dir = in_progress_dir / "major_noise_removed"
major_noise_removed_dir.mkdir(parents=True, exist_ok=True)
for filename, major_noise_removed_html in major_noise_removed_html_contents.items():
    output_path = major_noise_removed_dir / filename
    output_path.write_text(major_noise_removed_html, encoding='utf-8')
    print(f"{filename} is saved as {output_path}")

print(f"completed : {len(major_noise_removed_html_contents)}")

In [None]:
"""マイナーノイズを除去する"""

# Load the patterns from minor_noise_pattern.txt: one regular expression per
# non-blank line, with surrounding whitespace trimmed.
minor_noise_patterns = []
minor_noise_pattern_file = Path("minor_noise_pattern.txt")
with minor_noise_pattern_file.open("r", encoding="utf-8") as pattern_source:
    minor_noise_patterns = [trimmed for trimmed in map(str.strip, pattern_source) if trimmed]
print(f"minor_noise_pattern.txtから{len(minor_noise_patterns)}件のパターンを読み込みました")

regex_flags = re.MULTILINE | re.DOTALL
minor_noise_removed_html_contents = {}

for filename, html_content in major_noise_removed_html_contents.items():
    processed_content = html_content
    # Patterns are applied in file order, each one to the output of the previous one.
    for pattern in minor_noise_patterns:
        # subn returns the replacement count, so one call both detects and removes matches.
        processed_content, hit_count = re.subn(pattern, '', processed_content, flags=regex_flags)
        if hit_count:
            print(f"ファイル: {filename} にパターン: {pattern} がヒットしました")
    minor_noise_removed_html_contents[filename] = processed_content

# Write each cleaned document to in_progress/minor_noise_removed for easier debugging.
minor_noise_removed_dir = in_progress_dir / "minor_noise_removed"
minor_noise_removed_dir.mkdir(parents=True, exist_ok=True)
for filename, minor_noise_removed_html in minor_noise_removed_html_contents.items():
    output_path = minor_noise_removed_dir / filename
    output_path.write_text(minor_noise_removed_html, encoding='utf-8')
    print(f"{filename} is saved as {output_path}")

print(f"completed : {len(minor_noise_removed_html_contents)}")


In [None]:
"""ノイズ除去済みのHTMLをMarkdownに変換する（doclingを使用）"""

from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import InputFormat

markdown_contents = {}
# Create the markdown folder inside the in-progress directory.
markdown_dir = in_progress_dir / "markdown"
markdown_dir.mkdir(parents=True, exist_ok=True)

# Build the converter once and reuse it for every file: its construction does
# not depend on the file being converted, so rebuilding it per iteration is
# wasted work.
document_converter = DocumentConverter()

for filename, html_content in minor_noise_removed_html_contents.items():
    # Convert the in-memory HTML string to Markdown.
    result = document_converter.convert_string(content=html_content, format=InputFormat.HTML, name=filename)
    markdown = result.document.export_to_markdown()

    print(f"{filename} のHTMLをMarkdownに変換しました")
    markdown_contents[filename] = markdown

    # Also write the Markdown to in_progress/markdown/<stem>.md for easier debugging.
    output_md_path = markdown_dir / (Path(filename).stem + ".md")
    with open(output_md_path, 'w', encoding='utf-8') as f:
        f.write(markdown)
    print(f"{filename} is saved as {output_md_path}")


In [None]:
"""Markdownをセクション分割する"""

# Every section of every file, accumulated across the whole run.
splitted_jsons = []
encoder = tiktoken.encoding_for_model("gpt-4.1")

# Split on heading levels 1-4; the second item of each pair is the metadata key
# used for that heading level. The splitter does not vary per file, so it is
# built once here.
headers_to_split_on = [
    ("#", "Header1"),
    ("##", "Header2"),
    ("###", "Header3"),
    ("####", "Header4"),
]
splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    strip_headers=True,
    return_each_line=False,
)

# Create the json directory under in_progress_dir before any file is written.
json_dir = in_progress_dir / "json"
json_dir.mkdir(parents=True, exist_ok=True)

for filename, markdown_content in markdown_contents.items():
    splitted_docs = splitter.split_text(markdown_content)
    print(f"{filename} を {len(splitted_docs)} セクションに分割しました")

    # Look up the source page URL recorded for this file ("" when none was recorded).
    url = source_page_urls.get(filename, "")

    # Convert each section to a JSON-serializable record.
    file_sections = []
    for i, doc in enumerate(splitted_docs):
        splitted_json = {
            "id": f"{filename}_{i}",
            "content": doc.page_content.strip(),
            "metadata": {
                "source": filename,
                "url": url,
                "section_id": i,
                **doc.metadata
            }
        }
        # Token count is logged only. disallowed_special=() makes tiktoken encode
        # special-token literals (e.g. "<|endoftext|>") that appear in page text
        # as ordinary text instead of raising ValueError.
        tokens = encoder.encode(
            json.dumps(splitted_json, ensure_ascii=False),
            disallowed_special=(),
        )
        print(f"{filename}_{i} : ({len(tokens)} tokens)")
        file_sections.append(splitted_json)
    splitted_jsons.extend(file_sections)

    # Write this file's sections to <stem>.json. The write lives inside the loop
    # so that each source file gets its own output; relying on `filename` after
    # the loop would name a single file after the last input only and would
    # raise NameError when there are no inputs.
    output_json_path = json_dir / (Path(filename).stem + ".json")
    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(file_sections, f, ensure_ascii=False, indent=2)
