# Chunking 101

> Paired with Github Copilot

RAG技术的探讨大多从向量数据库中检索出文档开始，即已经**分好片**（chunked），还带有**元数据**（metadata）的文本，然后作为和LLM交流的提示词上文，再和Agent结合，如推理、使用工具、函数调用等等。

我们似乎默认向量数据库中的文档是已经分好片的，但是实际上，文档的分片是一个很复杂的问题，需要根据具体的应用场景来决定。这个过程叫做Chunking，一般来说分为两步：
- 第一步整理原始文档，去除各种噪音；
- 第二步拆分原书文档，变成合理的片段。

现实中我们面对的文档（尤其是长文档）可能是：
- 各种格式的文档，例如：Word, PDF, HTML, Markdown等等
- 长度不一的文档，有的文档可能只有几十字，有的文档可能有几十万字
- 文档内容有不同的语言，有的文档是中文，有的文档是英文

在拆分片段的时候我们要考虑：
- 文档作者自己的内容组织逻辑，可能是按照章节划分，也可能是按照主题划分
- 向量数据库和LLM的交互，需要考虑LLM的输入长度限制，以及LLM的理解能力
- 最后文档片段的使用场景，如提供答案、推理、使用工具、函数调用等等
- 元信息的提取，如标题、作者、时间、来源等等

上述这些都是需要根据具体的应用场景来决定的，因此需要频繁的调整和优化，也就意味着在实现RAG的时候需要不断的迭代和优化Chunking的过程。

在这个Notebook中，我们将由浅入深讨论如何对文档进行Chunking，以及如何使用LangChain进行Chunking。

> 我们可能要根据实际应用的场景不断调整分片，因此我们也期望尽量使用类似LangChain这样的库提供的开箱即用的函数，不需要太多自己的定制。


In [None]:
!pip install -q rich langchain

def print_docs_in_online(docs):
    for doc in docs:
        print(str(len(doc.page_content)) + ": " + doc.page_content[:35]+ "..."+doc.page_content[-35:])
            
# from rich import print # 让输出更好看一点:)

以一篇线上博客为为例：https://www.qinyu.info/post/wardley-maps/ch1/ （使用Hugo搭建）

Web文档通常是html格式，内容有结构组织，都是通过标签表示。Web文档内容有这样一些特点：
- 标记语言，Html格式本身就是文本，文本处理的效率较高
- 文字内容有分段落，段落一般是p标签
- 除了文本内容外还有标题，标题一般是h1, h2, h3等标签（作为meta信息，供后续加工）
- 在团队/组织/公司内，Web文档都是批量存放（Wiki/Confluence...），html 结构相对固定，可以批量处理（例如上面博客网站中所有博客的Html结构都是一致的）
- 不同的Web文档，html结构可能不同，需要根据**实际情况**（？）来解析

Web结构化内容的解析在自动化测试和爬虫中间已经有了有很多成熟的应用，Python也有不少封装好的库可以使用，比如BeautifulSoup4, selenium，playwright等等。
LangChain也有提供基本的[WebBaseLoader](https://python.langchain.com/docs/integrations/document_loaders/web_base/)，安装好LangChain就可以直接使用。

> 除了WebBaseLoader，LangChain还提供了功能接近的[*URLLoader](https://python.langchain.com/docs/integrations/document_loaders/url/)，包括了：PlaywrightURLLoader和SeleniumURLLoader。区别是WebBaseLoader直接使用http client请求Web页面，而Playwright和Selenium则需要通过浏览器（headless）操作。从效率上来说WebBaseLoader更高，但是如果是动态内容（例如JavaScript渲染的页面或是依赖操作）就需要使用Playwright或Selenium。

## 简单加载Web文档

下面是一个简单的例子，展示如何使用WebBaseLoader加载Web文档。

In [None]:
from langchain_community.document_loaders import WebBaseLoader
base_loader = WebBaseLoader("https://www.qinyu.info/post/wardley-maps/ch1/")
docs = base_loader.load()
# print(docs)
# print_docs_len(docs)
print_docs_in_online(docs)

## 自定义WebLoader处理特定的Web页面

我们可以看到，WebBaseLoader提供的基本功能把Web页面上的所有文字全部无差别地转换成了一个Document，这会带来很多不便。
- 长度太长
- 丢失了作者对内容的组织逻辑（章节）
- 很多无价值干扰信息，如banner，侧边栏等等

理解文章内容并结合Inspect后发现：
- 文章的主题内容全部在页面右侧class为post__content的div标签下，banner和侧边栏的内容不是我们需要的
- 文章的标题是是第一个h1标签，这是文章加载出来的所有Document都需要的的meta信息（最基本的摘要）
- 每个段落是p标签，这些标签的文字内容是我们需要的
- 段落中间还有h1标签，其后一组段落组成一个章节，h1标签是章节的小标题，这是章节需要的meta信息

因此对于这个特定博客的解析不能简单地使用WebBaseLoader，需要自定义一个Loader，来解析这个特定的博客页面。

In [None]:
from langchain_core.documents import Document
from typing import Iterator
from bs4 import BeautifulSoup

class SectionWebBaseLoader(WebBaseLoader):
    
    def lazy_load(self) -> Iterator[Document]:
        for path in self.web_paths:
            soup: BeautifulSoup = self._scrape(path, bs_kwargs=self.bs_kwargs) 
            # 通过观察页面结构，找到文章内容的div
            post = soup.find(name="div", attrs={"class":"post__content"})
            
            title = None
            section_content = ""
            for child in post.children:
                text = child.get_text()
                if child.name == "h1":
                    # 如果是h1标签，那么这个是一个新的章节
                    section = text
                    # 如果是第一个h1标签，那么这个是文章的标题
                    if title is None:
                        title = text
                        continue
                    
                    if section_content is not None:
                        # 如果不是第一个章节，把上一个章节的内容提取成Document
                        yield Document(page_content=section_content.strip(),
                                       metadata={"source": path, 
                                                 "title": title,
                                                 "section": section})
                    section_content = ""
                elif child.name == "p":
                    # 如果是p标签，那么这个是章节内容的一段
                    section_content = "".join([section_content, text])

base_loader = SectionWebBaseLoader("https://www.qinyu.info/post/wardley-maps/ch1/")
docs = base_loader.load()
# print(docs)
# print_docs_len(docs)
print_docs_in_online(docs)

## 对整理后的文档进行分片

我们可以看到，通过自定义的Loader，我们已经把文章内容按照章节（即作者的内容组织逻辑）进行了整理，但是这个时候的文档一部分还是太长，而且文档长度不一，需要进一步对文档继续分片。
我们期望进一步分片后每个片段的长度差不多是均匀的，且片段的完整性没有破坏（即至少是一句话）。


接下来我们就来看看LangChain提供的文本分片器，如何对文档进行分片。
- 基于自然语言处理的分片器（NLTK、SPACY）
- 基于句子的分片器（sentence-transformers）
- 基于字符的分片器


In [None]:
!pip install -q nltk spacy sentence-transformers
# !python -m spacy download en_core_web_sm # 下载spacy的模型

In [None]:
def split_docs(documents, splitter):
    _docs = []
    for doc in documents:
        chunked_text = splitter.split_text(doc.page_content)
        for text in chunked_text:
            _docs.append(Document(page_content=text,
                                  metadata=doc.metadata))
    return _docs


# import nltk
# nltk.download('punkt')
# 
# import spacy
# spacy.load("en_core_web_sm")

from langchain.text_splitter import NLTKTextSplitter, SpacyTextSplitter, SentenceTransformersTokenTextSplitter, RecursiveCharacterTextSplitter
nltk_splitter = NLTKTextSplitter()
spacy_splitter = SpacyTextSplitter(max_length=3000)
sentence_transformers_splitter = SentenceTransformersTokenTextSplitter()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, 
                                               chunk_overlap=50,
                                               # keep_separator=True,
                                               is_separator_regex=True,
                                               separators=["\n", "\r\n", "\r", "。", "？", "！", "；", ";", "!", "?"])

chunked_docs = split_docs(docs, text_splitter)
# print(chunked_docs)
print_docs_in_online(chunked_docs)

# 第三方中文分片器

上面我们看到用LangChain提供的开箱即用的分片器：
- 处理中文文本非常地拉胯
- 按照句子拆分不必NLP这样的牛刀

> 但如果想从分片中提取实体或者关键字作为metadata，仍然需要利用NLP，但需要选择合适中文的NLP库，如jieba、THULAC、HanLP等等。（TODO：下一步调研）

我们可以使用第三方的中文分片器，例如：https://github.com/chatchat-space/Langchain-Chatchat/blob/master/text_splitter/chinese_recursive_text_splitter.py

In [None]:
import re
from typing import List, Optional, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
import logging

logger = logging.getLogger(__name__)


def _split_text_with_regex_from_end(
        text: str, separator: str, keep_separator: bool
) -> List[str]:
    # Now that we have the separator, split the text
    if separator:
        if keep_separator:
            # The parentheses in the pattern keep the delimiters in the result.
            _splits = re.split(f"({separator})", text)
            splits = ["".join(i) for i in zip(_splits[0::2], _splits[1::2])]
            if len(_splits) % 2 == 1:
                splits += _splits[-1:]
            # splits = [_splits[0]] + splits
        else:
            splits = re.split(separator, text)
    else:
        splits = list(text)
    return [s for s in splits if s != ""]


class ChineseRecursiveTextSplitter(RecursiveCharacterTextSplitter):
    def __init__(
            self,
            separators: Optional[List[str]] = None,
            keep_separator: bool = True,
            is_separator_regex: bool = True,
            **kwargs: Any,
    ) -> None:
        """Create a new TextSplitter."""
        super().__init__(keep_separator=keep_separator, **kwargs)
        self._separators = separators or [
            "\n\n",
            "\n",
            "。|！|？",
            "\.\s|\!\s|\?\s",
            "；|;\s",
            "，|,\s"
        ]
        self._is_separator_regex = is_separator_regex

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """Split incoming text and return chunks."""
        final_chunks = []
        # Get appropriate separator to use
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1:]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)
        splits = _split_text_with_regex_from_end(text, _separator, self._keep_separator)

        # Now go merging things, recursively splitting longer texts.
        _good_splits = []
        _separator = "" if self._keep_separator else separator
        for s in splits:
            if self._length_function(s) < self._chunk_size:
                _good_splits.append(s)
            else:
                if _good_splits:
                    merged_text = self._merge_splits(_good_splits, _separator)
                    final_chunks.extend(merged_text)
                    _good_splits = []
                if not new_separators:
                    final_chunks.append(s)
                else:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
        if _good_splits:
            merged_text = self._merge_splits(_good_splits, _separator)
            final_chunks.extend(merged_text)
        return [re.sub(r"\n{2,}", "\n", chunk.strip()) for chunk in final_chunks if chunk.strip()!=""]


In [None]:
chunked_docs = split_docs(docs, ChineseRecursiveTextSplitter(chunk_size=500, chunk_overlap=50))
# print(chunked_docs)
print_docs_in_online(chunked_docs)

最终，我们得到了一篇Web文档的分片：
- 没有任何与主题无关的干扰信息
- 分片遵循文档原有的章节组织
- 分片的长度差不多均匀
- 分片没有破坏句子的完整性
- 分片除了source还有其他有价值的元信息（章节）

整个处理过程没有用到NLP或者LLM，仅是文本处理。处理过程的效率还不错。

## 总结

- 标记语言格式的文档的干扰信息容易排除（排除/保留特定标签）
- 标记语言格式的文档容易提取元信息
- 文档格式（标记）和文档内容组织（章节）有关系，一般需要分析并自定义处理
- 文档的分片一般使用基于字符的分片器就够了。（TODO：Embedding和检索时间效果待验证）
- 对LangChain提供的开箱即用的DocumentLoader和TextSplitter的能力有一个初步的了解

下一步工作（Chunking 102）：
- 了解更多的文档格式，如Word, PDF, Markdown等等
- 表格以及非结构化内容的处理，如图片等等
- 文本分片必要的语义处理（提取实体或者关键字、摘要等等）

文本分片以及元信息在具体RAG场景中使用的效果

## 参考
https://github.com/FullStackRetrieval-com/RetrievalTutorials/blob/8a30b5710b3dd99ef2239fb60c7b54bc38d3613d/tutorials/LevelsOfTextSplitting/5_Levels_Of_Text_Splitting.ipynb
