In [None]:
%pip install nltk beautifulsoup4 sentence-transformers transformers scikit-learn html2text

In [None]:
import re
import json
from bs4 import BeautifulSoup
import html2text
from nltk.tokenize import sent_tokenize
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
from transformers import AutoTokenizer


def clean_html(html: str) -> str:
    """Strip non-content elements from an HTML string and convert it to text.

    Every ``script``, ``style``, ``nav``, ``footer``, ``header`` and ``aside``
    element is removed from the parsed tree; the remaining markup is then
    handed to ``html2text`` for conversion.

    Args:
        html: Raw HTML markup.

    Returns:
        The text rendering produced by ``html2text.html2text``.
    """
    boilerplate_tags = ["script", "style", "nav", "footer", "header", "aside"]
    document = BeautifulSoup(html, "html.parser")
    for element in document.find_all(boilerplate_tags):
        # Detach the element (and everything nested in it) from the tree.
        element.extract()
    remaining_markup = str(document)
    return html2text.html2text(remaining_markup)


def extract_sentences(text: str):
    """Split text into sentences with NLTK's ``sent_tokenize``.

    Leading and trailing whitespace is stripped, and every run of two or more
    newlines is collapsed to exactly one blank line before tokenizing.

    Args:
        text: Plain text to split.

    Returns:
        The list of sentences returned by ``sent_tokenize``.
    """
    trimmed = text.strip()
    # Normalize paragraph breaks: any run of blank lines becomes a single one.
    normalized = re.sub(r'\n{2,}', '\n\n', trimmed)
    sentences = sent_tokenize(normalized)
    return sentences


def cluster_sentences(sentences, model_name='all-MiniLM-L6-v2', threshold=1.0):
    """Merge consecutive sentences that land in the same embedding cluster.

    Sentences are embedded with a ``SentenceTransformer`` model and clustered
    with ``AgglomerativeClustering`` using ``distance_threshold=threshold``
    (the number of clusters is not fixed). The original sentence order is
    kept: each returned segment is one maximal run of *adjacent* sentences
    sharing a cluster label, joined with single spaces. Sentences with the
    same label that are not adjacent end up in different segments.

    Args:
        sentences: Iterable of sentence strings, in document order.
        model_name: Name passed to ``SentenceTransformer``.
        threshold: Distance threshold passed to ``AgglomerativeClustering``.

    Returns:
        A list of segment strings. Empty input yields ``[]``; a single
        sentence is returned unchanged as the only segment.
    """
    from itertools import groupby

    sentences = list(sentences)
    # Clustering needs at least two samples; with zero or one sentence there
    # is nothing to group, so skip loading the model entirely.
    if len(sentences) < 2:
        return sentences

    model = SentenceTransformer(model_name)
    embeddings = model.encode(sentences)
    clustering = AgglomerativeClustering(n_clusters=None, distance_threshold=threshold)
    labels = clustering.fit_predict(embeddings)

    # groupby only merges neighbouring items with equal keys, which is exactly
    # the "start a new segment whenever the label changes" rule.
    labelled = zip(sentences, labels)
    return [
        " ".join(sentence for sentence, _ in run)
        for _, run in groupby(labelled, key=lambda pair: pair[1])
    ]


def enforce_token_limit(segments, max_tokens=2048, tokenizer_name='mistralai/Mistral-7B-Instruct-v0.1', tokenizer=None):
    """Pack segments into chunks whose token count stays within ``max_tokens``.

    Segments are appended to the current chunk (separated by a newline) while
    the combined text still fits. A segment that does not fit starts a new
    chunk. A segment that is too long on its own is split into sentences with
    ``sent_tokenize`` and those sentences are packed the same way (separated
    by a space). A single sentence that alone exceeds ``max_tokens`` cannot be
    split further here and is emitted as its own, over-limit chunk.

    Token counts are ``len(tokenizer.encode(text))``, measured on the exact
    text (including separators) that would end up in the chunk.

    Args:
        segments: Iterable of text segments, in document order.
        max_tokens: Maximum number of tokens allowed per chunk.
        tokenizer_name: Name passed to ``AutoTokenizer.from_pretrained`` when
            no ``tokenizer`` is supplied.
        tokenizer: Optional pre-loaded object exposing ``encode(text)``; pass
            one to avoid reloading the tokenizer on every call.

    Returns:
        A list of stripped, non-empty chunk strings.
    """
    segments = list(segments)
    # Nothing to pack: avoid loading a tokenizer just to return an empty list.
    if not segments:
        return []
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    final_chunks = []
    current_chunk = ""

    def count_tokens(text):
        return len(tokenizer.encode(text))

    def emit(chunk):
        # Never emit empty chunks (e.g. from blank segments or from flushing
        # before anything was accumulated).
        chunk = chunk.strip()
        if chunk:
            final_chunks.append(chunk)

    for segment in segments:
        candidate = f"{current_chunk}\n{segment}" if current_chunk else segment
        if count_tokens(candidate) <= max_tokens:
            current_chunk = candidate
            continue

        # The segment does not fit: close the chunk built so far.
        emit(current_chunk)
        current_chunk = ""

        if count_tokens(segment) <= max_tokens:
            current_chunk = segment
            continue

        # The segment alone is too long, so pack it sentence by sentence.
        for sentence in sent_tokenize(segment):
            candidate = f"{current_chunk} {sentence}" if current_chunk else sentence
            if count_tokens(candidate) <= max_tokens:
                current_chunk = candidate
            else:
                emit(current_chunk)
                current_chunk = sentence

    emit(current_chunk)
    return final_chunks


def semantic_paragraph_split(text, is_html=False, max_tokens=2048):
    """Split a document into token-limited chunks of related sentences.

    Pipeline: optionally clean HTML with ``clean_html``, split into sentences
    with ``extract_sentences``, group adjacent related sentences with
    ``cluster_sentences``, then pack the groups with ``enforce_token_limit``.

    Args:
        text: Document text, or raw HTML when ``is_html`` is true.
        is_html: Whether ``text`` must be passed through ``clean_html`` first.
        max_tokens: Per-chunk token limit forwarded to ``enforce_token_limit``.

    Returns:
        A list of ``{"id": <index>, "content": <chunk text>}`` dicts, with ids
        numbered from 0 in chunk order. Text without any sentence yields ``[]``.
    """
    if is_html:
        text = clean_html(text)

    sentences = extract_sentences(text)
    if not sentences:
        # Empty or whitespace-only input: nothing to cluster or chunk.
        return []

    # Clustering needs at least two samples, so a lone sentence is used as
    # the only segment directly.
    if len(sentences) < 2:
        segments = sentences
    else:
        segments = cluster_sentences(sentences)

    chunks = enforce_token_limit(segments, max_tokens=max_tokens)
    return [{"id": index, "content": chunk} for index, chunk in enumerate(chunks)]


In [None]:
# Demo cell: split one local file into chunks and save the result as JSON.
import os

path = os.path.expanduser("~/Downloads/示例html.html")
with open(path, "r", encoding="utf-8") as f:
    raw_text = f.read()

# HTML input must be cleaned before sentence splitting, otherwise the markup
# ends up inside the chunks; decide from the file extension so that pointing
# `path` at a plain-text or Markdown file keeps working unchanged.
is_html = os.path.splitext(path)[1].lower() in (".html", ".htm")
results = semantic_paragraph_split(raw_text, is_html=is_html, max_tokens=2048)

with open("split_output.json", "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)

print("段落拆分完成，共分得 {} 段。".format(len(results)))
