# MarkdownHeaderTextSplitter

The objective is to split a markdown file by aribrary delimiters.

Each text split is associated with its associated delimiters as metadata.
 
**Given this example:**

# Foo

## Bar

Hi this is Jim

Hi this is Joe

## Baz

Hi this is Molly

**And these delimiters:**

```
[("#", "Header 1"), ("##", "Header 2"), ("\n", None)]
```

**We expect:** 

```
{"content": "Hi this is Jim", metadata={"Header 1": "Foo", "Header 2": "Bar"}},
{"content": "Hi this is Joe", metadata={"Header 1": "Foo", "Header 2": "Bar"}},
{"content": "Hi this is Molly", metadata={"Header 1": "Foo", "Header 2": "Baz"}},
```

In [3]:
from langchain.text_splitter import MarkdownHeaderTextSplitter

In [4]:
# Doc
markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ## Baz\n\n Hi this is Molly' 
    
# Test case 1
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("\n", None)
]
 
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Joe', 'metadata': {'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Molly', 'metadata': {'Header 2': 'Baz', 'Header 1': ''}}


In [None]:
{"content": "Hi this is Jim", metadata={"Header 1": "Foo", "Header 2": "Bar"}},
{"content": "Hi this is Joe", metadata={"Header 1": "Foo", "Header 2": "Bar"}},
{"content": "Hi this is Molly", metadata={"Header 1": "Foo", "Header 2": "Baz"}},

`Test case 2`

In [3]:
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("\n", None)
]
markdown_document = '# Introduction\n\n## Overview\n\nThis is an introductory paragraph.\
\n\n## Details\n\nThis is a detailed paragraph.\n\n# Conclusion\n\nThis is the conclusion.'

markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'This is an introductory paragraph.', 'metadata': {'Header 2': 'Overview', 'Header 1': ''}}
{'content': 'This is a detailed paragraph.', 'metadata': {'Header 2': 'Details', 'Header 1': ''}}
{'content': 'This is the conclusion.', 'metadata': {'Header 2': 'Details', 'Header 1': ''}}


In [None]:
# Expected
{'content': 'This is an introductory paragraph.', 'metadata': {'Header 2': 'Overview', 'Header 1': 'Introduction'}}
{'content': 'This is a detailed paragraph.', 'metadata': {'Header 2': 'Details', 'Header 1': 'Introduction'}}
{'content': 'This is the conclusion.', 'metadata': {'Header 1': 'Conclusion'}}

`Test case 3`

In [7]:
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("\n", None)
]
markdown_document = '# H1\n\n## H2\n\n### H3\n\nText under H3.\n\n# H1_2\n\n## H2_2\n\nText under H2_2.'
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Text under H3.', 'metadata': {'Header 3': 'H3', 'Header 2': 'H2', 'Header 1': 'H1'}}
{'content': 'Text under H2_2.', 'metadata': {'Header 3': 'H3', 'Header 2': 'H2_2', 'Header 1': 'H1_2'}}


In [None]:
{'content': 'Text under H3.', 'metadata': {'Header 3': 'H3', 'Header 2': 'H2', 'Header 1': 'H1'}}
{'content': 'Text under H2_2.', 'metadata': {'Header 2': 'H2_2', 'Header 1': 'H1_2'}}

`Test case 4`

In [8]:
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("\n", None)
]
markdown_document = '# Heading 1\n\n## Heading 2\n\nParagraph under Heading 2.\
\n\nParagraph 2 under Heading 2.\n\n# New Heading 1\n\nParagraph under new Heading 1.'
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Paragraph under Heading 2.', 'metadata': {'Header 2': 'Heading 2', 'Header 1': 'Heading 1'}}
{'content': 'Paragraph 2 under Heading 2.', 'metadata': {'Header 2': 'Heading 2', 'Header 1': 'Heading 1'}}
{'content': 'Paragraph under new Heading 1.', 'metadata': {'Header 2': 'Heading 2', 'Header 1': 'New Heading 1'}}


In [None]:
{'content': 'Paragraph under Heading 2.', 'metadata': {'Header 2': 'Heading 2', 'Header 1': 'Heading 1'}}
{'content': 'Paragraph 2 under Heading 2.', 'metadata': {'Header 2': 'Heading 2', 'Header 1': 'Heading 1'}}
{'content': 'Paragraph under new Heading 1.', 'metadata': {'Header 1': 'New Heading 1'}}

In [8]:
chunked_docs

[{'content': 'Hi this is Jim',
  'metadata': {'Header 2': 'Bar', 'Header 1': 'Foo'}},
 {'content': 'Hi this is Joe',
  'metadata': {'Header 2': 'Bar', 'Header 1': 'Foo'}},
 {'content': 'Hi this is Molly',
  'metadata': {'Header 2': 'Baz', 'Header 1': 'Foo'}}]

In [6]:
# Test case 2
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
]

markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is JimHi this is Joe', 'metadata': {'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Molly', 'metadata': {'Header 2': 'Baz', 'Header 1': 'Foo'}}


In [7]:
# Three levels
markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly'

# Test case 3
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("\n", None)
]

# TODO: Reset header 3 Boo as empty
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Joe', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Lance', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Molly', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Baz', 'Header 1': 'Foo'}}


In [20]:
import re
from langchain.text_splitter import TextSplitter
from typing import Dict, Any, List, Optional, Tuple, Union

class MarkdownHeaderTextSplitter(TextSplitter):
    """Implementation of splitting markdown based on user-supplied delimiters.
    The text associated with each delimiter is returned w/ delimier as metadata."""

    def __init__(self, splits: List[Tuple[str, Optional[str]]], **kwargs: Any):
        super().__init__(**kwargs)
        self.splits = sorted(splits, key=lambda x: (-len(x[0]), -x[0].count("#")))

    def split_text(self, text: str) -> List[Dict[str, Union[str, str]]]:
        pattern = "|".join(
            "(%s\\s*(.*))" % re.escape(sep) for sep, _ in self.splits if sep != "\n"
        )

        chunks = []
        current_metadata = {name: "" for sep, name in self.splits if name}
        current_content = []

        for line in text.splitlines(keepends=True):
            stripped_line = line.lstrip()  # strip leading spaces
            match = re.match(pattern, stripped_line)

            if match:
                if current_content:
                    chunks.append(
                        {
                            "content": "".join(current_content).strip(),
                            "metadata": dict(current_metadata),
                        }
                    )
                    current_content = []

                for sep, name in self.splits:
                    if stripped_line.startswith(sep):
                        if name is not None:
                            # Update the rest of the line (after the separator) as the value for that header
                            current_metadata[name] = stripped_line[len(sep):].strip()
                            
                            # Clear out metadata for all headers lower in hierarchy
                            current_header_index = [index for index, (sep_, _) in enumerate(self.splits) if sep_ == sep][0]
                            for header, index in [(name_, index) for index, (sep_, name_) in enumerate(self.splits) if name_]:
                                if index > current_header_index:
                                    current_metadata[header] = ''

                        # Remove matched separator from the line to avoid confusing next header levels
                        stripped_line = stripped_line[len(sep):].lstrip()
                        break

            elif not stripped_line and ("\n", None) in self.splits:
                if current_content:
                    chunks.append(
                        {
                            "content": "".join(current_content).strip(),
                            "metadata": dict(current_metadata),
                        }
                    )
                    current_content = []

            elif stripped_line:
                current_content.append(line)  # append the original line (with leading spaces if any)

        if current_content:
            chunks.append(
                {
                    "content": "".join(current_content).strip(),
                    "metadata": dict(current_metadata),
                }
            )
        return [chunk for chunk in chunks if chunk["content"]]

In [21]:
# Doc
markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ## Baz\n\n Hi this is Molly' 
    
# Test case 1
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("\n", None)
]
 
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Joe', 'metadata': {'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Molly', 'metadata': {'Header 2': 'Baz', 'Header 1': ''}}


`Modified Code`

In [36]:
class MarkdownHeaderTextSplitter(TextSplitter):
    
    def __init__(self, splits: List[Tuple[str, Optional[str]]], **kwargs: Any):
        super().__init__(**kwargs)
        self.splits = sorted(splits, key=lambda x: (-len(x[0]), -x[0].count('#')))

    def split_text(self, text: str) -> List[Dict[str, Union[str, str]]]:
        pattern = '|'.join('(%s\\s*(.*))' % re.escape(sep) for sep, _ in self.splits if sep != "\n")
        chunks = []
        current_metadata = {name: '' for sep, name in self.splits if name}
        current_content = []
        
        for line in text.splitlines(keepends=True):
            stripped_line = line.strip()
            match = re.match(pattern, stripped_line)

            if match:
                if current_content:
                    chunks.append({
                        'content': ''.join(current_content).strip(),
                        'metadata': dict(current_metadata)
                    })
                    current_content = []

                for sep, name in self.splits:
                    if stripped_line.startswith(sep):
                        if name is not None:
                            current_metadata[name] = stripped_line.lstrip(sep).strip()
                            current_header_index = [index for index, (sep_, _) in enumerate(self.splits) if sep_ == sep][0]
                            for header_index, (sep_, name) in enumerate(self.splits):
                                if header_index > current_header_index:
                                    if name:
                                        current_metadata[name] = ''
                        break
            elif not stripped_line and ("\n", None) in self.splits:
                if current_content:
                    chunks.append({
                        'content': ''.join(current_content).strip(),
                        'metadata': dict(current_metadata)
                    })
                    current_content = []
            elif stripped_line:
                current_content.append(line)
        
        if current_content:
            chunks.append({
                'content': ''.join(current_content).strip(),
                'metadata': dict(current_metadata)
            })
        return [chunk for chunk in chunks if chunk['content']]


markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly' 
    
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("\n", None)
]
 
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Joe', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Lance', 'metadata': {'Header 3': 'Boo', 'Header 2': '', 'Header 1': ''}}
{'content': 'Hi this is Molly', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Baz', 'Header 1': ''}}


In [39]:
class MarkdownHeaderTextSplitter(TextSplitter):
    
    def __init__(self, splits: List[Tuple[str, Optional[str]]], **kwargs: Any):
        super().__init__(**kwargs)
        self.splits = sorted(splits, key=lambda x: (-len(x[0]), -x[0].count('#')))

    def split_text(self, text: str) -> List[Dict[str, Union[str, str]]]:
        pattern = '|'.join('(%s\\s*(.*))' % re.escape(sep) for sep, _ in self.splits if sep != "\n")
        chunks = []
        current_metadata = {name: '' for sep, name in self.splits if name}
        current_content = []

        def remove_prefix(text, prefix):
            if text.startswith(prefix):
                return text[len(prefix):]
            return text
        
        for line in text.splitlines(keepends=True):
            stripped_line = line.strip()
            match = re.match(pattern, stripped_line)

            if match:
                if current_content:
                    chunks.append({
                        'content': ''.join(current_content).strip(),
                        'metadata': dict(current_metadata)
                    })
                    current_content = []

                for sep, name in self.splits:
                    if stripped_line.startswith(sep):
                        if name is not None:
                            current_metadata[name] = remove_prefix(stripped_line, sep).strip()
                            current_header_index = [index for index, (sep_, _) in enumerate(self.splits) if sep_ == sep][0]
                            for header_index, (sep_, name) in enumerate(self.splits):
                                if header_index > current_header_index:
                                    if name:
                                        current_metadata[name] = ''
                        break
            elif not stripped_line and ("\n", None) in self.splits:
                if current_content:
                    chunks.append({
                        'content': ''.join(current_content).strip(),
                        'metadata': dict(current_metadata)
                    })
                    current_content = []
            elif stripped_line:
                current_content.append(line)
        
        if current_content:
            chunks.append({
                'content': ''.join(current_content).strip(),
                'metadata': dict(current_metadata)
            })
        return [chunk for chunk in chunks if chunk['content']]


markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly' 
    
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("\n", None)
]
 
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Joe', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': ''}}
{'content': 'Hi this is Lance', 'metadata': {'Header 3': 'Boo', 'Header 2': '', 'Header 1': ''}}
{'content': 'Hi this is Molly', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Baz', 'Header 1': ''}}


In [33]:
# Three levels
markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly'

# Test case 3
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("\n", None)
]

# TODO: Reset header 3 Boo as empty
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Joe', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Lance', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Molly', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Baz', 'Header 1': 'Foo'}}


`Original Code`

In [26]:
class MarkdownHeaderTextSplitter(TextSplitter):
    
    def __init__(self, splits: List[Tuple[str, Optional[str]]], **kwargs: Any):
        
        # Sort by seperator length and the number of "#"
        # E.g.,  Ensure "###" will come before "##" and "#"
        # TODO: Both means of sorting may not be required 
        """Create a new TextSplitter."""
        super().__init__(**kwargs)
        self.splits = sorted(splits, key=lambda x: (-len(x[0]), -x[0].count('#')))

    def split_text(self, text: str) -> List[Dict[str, Union[str, str]]]:
        
        # Regex matches any of the separators in self.splits (except for the newline)
        # (Newlines create line breaks, which we may want to split, or separate MD headers and their contents)
        pattern = '|'.join('(%s\\s*(.*))' % re.escape(sep) for sep, _ in self.splits if sep != "\n")
        
        # Text chunk with metadata
        chunks = []
        
        # Keys are names of metadata 
        #current_metadata = {name: '' for sep, name in self.splits if name is not None}
        current_metadata = {name: '' for sep, name in self.splits if name}
        current_content = []
        
        # Split by newlines, but preserve the exact formatting of the original text
        for line in text.splitlines(keepends=True):
            
            # Removes any leading or trailing whitespace
            stripped_line = line.strip()  
            
            # Match current line of text against the regular expression pattern
            match = re.match(pattern, stripped_line)
            
            # If the line starts w/ a separator defined in splits (like "#" or "##"), it will be a match
            # Start of a new chunk of content
            if match:
                
                # See if we have accumulated content from previous lines 
                if current_content:  
                    
                    # If so, append it as a chunk and write the chunk since we hit a new seperator
                    chunks.append({
                        'content': ''.join(current_content).strip(), 
                        'metadata': dict(current_metadata)
                    })
                    # Reset the content  
                    current_content = []  
                    
                # Check for the seperator
                for sep, name in self.splits:
                    if stripped_line.startswith(sep):
                        if name is not None:
                            # Update the rest of the line (after the separator) as the value for that header
                            current_metadata[name] = stripped_line[len(sep):].strip()
                            
                            # If the separator is "#", it also clears out the metadata for "Header 2"
                            # A new "Header 1" implies a new section of the document
                            # TODO: This is brittle since there can be many header names
                            #if sep == "#":
                            #    current_metadata["Header 2"] = ''   
                            if sep == "#":
                                # clear out metadata for all headers lower in hierarchy
                                current_header_index = [index for index, (sep_, _) in enumerate(self.splits) if sep_ == sep][0]
                                current_metadata_keys = list(current_metadata.keys())
                                for header in current_metadata_keys:
                                    header_index = [index for index, (_, name) in enumerate(self.splits) if name == header]
                                    if header_index and header_index[0] > current_header_index:
                                        del current_metadata[header]
                        break
            
            # If the line is empty (i.e., only contains whitespace, or is completely empty) 
            # and newline ("\n") is one of the separators, it appends the current content to 
            # the chunks list as a new chunk, and resets the current_content to an empty list  
            elif not stripped_line and ("\n", None) in self.splits:  
                
                # If we have accumulated content, append it as a chunk
                if current_content:  
                    chunks.append({
                        'content': ''.join(current_content).strip(), 
                        'metadata': dict(current_metadata)
                    })
                    current_content = []  # reset the content
            
            # Apend non-empty lines
            elif stripped_line:  
                current_content.append(stripped_line)
        
        # Append the last chunk
        if current_content:
            chunks.append({
                'content': ''.join(current_content).strip(), 
                'metadata': dict(current_metadata)
            })
        return [chunk for chunk in chunks if chunk['content']]

# Doc
markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ## Baz\n\n Hi this is Molly' 
    
# Test case 1
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("\n", None)
]
 
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Joe', 'metadata': {'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Molly', 'metadata': {'Header 2': 'Baz', 'Header 1': 'Foo'}}


In [27]:
# Three levels
markdown_document = '# Foo\n\n    ## Bar\n\nHi this is Jim\n\nHi this is Joe\n\n ### Boo \n\n Hi this is Lance \n\n ## Baz\n\n Hi this is Molly'

# Test case 3
splits = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("\n", None)
]

# TODO: Reset header 3 Boo as empty
markdown_splitter = MarkdownHeaderTextSplitter(splits=splits)
chunked_docs = markdown_splitter.split_text(markdown_document)
for chunk in chunked_docs:
    print(chunk)

{'content': 'Hi this is Jim', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Joe', 'metadata': {'Header 3': '', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Lance', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Bar', 'Header 1': 'Foo'}}
{'content': 'Hi this is Molly', 'metadata': {'Header 3': 'Boo', 'Header 2': 'Baz', 'Header 1': 'Foo'}}
