[Chunking](https://community.databricks.com/t5/technical-blog/the-ultimate-guide-to-chunking-strategies-for-rag-applications/ba-p/113089)
- Fixed-Size Chunking (by word, character, or token count, with optional overlap)
- Semantic Chunking (break at paragraphs or sentences)
- Recursive Chunking
- Adaptive Chunking
- Context-Enriched Chunking
- AI-Driven Dynamic Chunking

In [21]:
%pip install -qU langchain-text-splitters

Note: you may need to restart the kernel to use updated packages.


In [22]:
# Load the Markdown source document into a single string
with open("datasets/dsm.md", encoding="utf-8") as f:
    document = f.read()

## Fixed-Size Chunking

This is the simplest method. The splitter breaks the text on a single separator string, which defaults to "\n\n", and measures chunk length in characters.

1. How the text is split: by a single separator string.
2. How the chunk size is measured: by number of characters.

To obtain the string content directly, use .split_text.
To create LangChain Document objects (e.g., for use in downstream tasks), use .create_documents.

https://python.langchain.com/docs/how_to/character_text_splitter/

In [23]:
from langchain_text_splitters import CharacterTextSplitter
text_splitter = CharacterTextSplitter(
    # separator="\n\n",
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    is_separator_regex=False,
)
texts = text_splitter.create_documents([document])
print(len(texts))

Created a chunk of size 1424, which is longer than the specified 1000
Created a chunk of size 1016, which is longer than the specified 1000
Created a chunk of size 1095, which is longer than the specified 1000
Created a chunk of size 1065, which is longer than the specified 1000
Created a chunk of size 1099, which is longer than the specified 1000
Created a chunk of size 1195, which is longer than the specified 1000
Created a chunk of size 1119, which is longer than the specified 1000
Created a chunk of size 1608, which is longer than the specified 1000
Created a chunk of size 1166, which is longer than the specified 1000
Created a chunk of size 1327, which is longer than the specified 1000
Created a chunk of size 1012, which is longer than the specified 1000
Created a chunk of size 1163, which is longer than the specified 1000
Created a chunk of size 1106, which is longer than the specified 1000
Created a chunk of size 1001, which is longer than the specified 1000
Created a chunk of s

4679
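
A separator-based splitter can only cut where the separator occurs, so a single piece longer than chunk_size is emitted whole; this is consistent with the "longer than the specified 1000" warnings above. The sketch below contrasts that behaviour with a strict sliding window over characters. Both helpers (merge_on_separator and fixed_window) are hypothetical names written for illustration, not LangChain's implementation, and the merge step ignores overlap for brevity.

```python
def merge_on_separator(text, separator="\n\n", chunk_size=1000):
    """Split on `separator`, then greedily merge pieces up to `chunk_size` characters.

    Simplified illustration: no overlap, and an oversized piece is kept whole.
    """
    chunks, current = [], ""
    for piece in text.split(separator):
        if not piece:
            continue
        candidate = piece if not current else current + separator + piece
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            if current:
                chunks.append(current)
            # The piece starts a new chunk even if it alone exceeds chunk_size.
            current = piece
    if current:
        chunks.append(current)
    return chunks


def fixed_window(text, chunk_size=1000, chunk_overlap=200):
    """Cut `text` into windows of at most `chunk_size` characters with overlap."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    if not text:
        return []
    step = chunk_size - chunk_overlap
    # Stop once the previous window has already reached the end of the text.
    last_start = max(len(text) - chunk_overlap, 1)
    return [text[i:i + chunk_size] for i in range(0, last_start, step)]


# Toy input: the middle paragraph (30 chars) is longer than chunk_size (20).
sample = "short para\n\n" + "x" * 30 + "\n\nanother short one"

merged = merge_on_separator(sample, chunk_size=20)
print([len(c) for c in merged])   # [10, 30, 17]

windows = fixed_window(sample, chunk_size=20, chunk_overlap=5)
print([len(c) for c in windows])  # [20, 20, 20, 16]
```

The strict window never exceeds the size limit, but it cuts through words and sentences; the separator-based version respects paragraph boundaries at the cost of occasional oversized chunks.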


## Semantic Chunking

This section uses LangChain's RecursiveCharacterTextSplitter, the splitter recommended for generic text. It is parameterized by a list of separators and tries to split on them in order until the chunks are small enough. The default list is ["\n\n", "\n", " ", ""]. This tends to keep paragraphs (and then sentences, and then words) together as long as possible, since those are generally the most strongly semantically related pieces of text.

1. How the text is split: by a list of separators, tried in order.
2. How the chunk size is measured: by number of characters.

Below we show example usage.

To obtain the string content directly, use .split_text.

To create LangChain Document objects (e.g., for use in downstream tasks), use .create_documents.

https://python.langchain.com/docs/how_to/recursive_text_splitter/
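
The "try each separator in order" idea described above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not LangChain's implementation: the function name recursive_split is hypothetical, and chunk_overlap and regex separators are omitted.

```python
def recursive_split(text, separators=("\n\n", "\n", " ", ""), chunk_size=1000):
    """Split on the first separator; recurse with the remaining ones on oversized pieces."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        return [text]  # nothing left to split on; keep the oversized text whole

    sep, rest = separators[0], separators[1:]
    if sep == "":
        # Last resort: hard cut every chunk_size characters.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    chunks, current = [], ""
    for piece in text.split(sep):
        if not piece:
            continue
        if len(piece) > chunk_size:
            # Flush what we have, then break the big piece with finer separators.
            if current:
                chunks.append(current)
                current = ""
            chunks.extend(recursive_split(piece, rest, chunk_size))
            continue
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks


demo = (
    "Para one is short.\n\n"
    "Para two is a bit longer and has two lines.\nSecond line here.\n\n"
    "Tiny."
)
for chunk in recursive_split(demo, chunk_size=30):
    print(repr(chunk))
```

With chunk_size=30, the first and last paragraphs survive intact, while the long middle paragraph is first split into lines and its longest line is then split at spaces.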

In [26]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    # separators=["\n\n", "\n", ". ", " ", ""],  # optional: adds a sentence-level separator to the defaults
    chunk_size=1000,
    chunk_overlap=20,
    length_function=len,
    is_separator_regex=False,
)

texts = text_splitter.create_documents([document])
print(len(texts))

print(texts[0])

# print(texts[1])

4829
page_content='i

# DIAGNOSTIC AND STATISTICAL MANUAL OF MENTAL DISORDERS

### FIFTH EDITION TEXT REVISION

# DSM-5-TR™


ii

### **American Psychiatric Association** **DSM-5-TR**


_Officers 2021–2022_


P RESIDENT

V IVIAN B. P ENDER, M.D.


P RESIDENT -E LECT

R EBECCA W. B RENDEL, M.D., J.D.

T REASURER R ICHARD F. S UMMERS, M.D.

S ECRETARY S ANDRA D E J ONG, M.D., M.S C .


_Assembly_


S PEAKER

M ARY J O F ITZ -G ERALD, M.D., M.B.A.


S PEAKER -E LECT

A DAM P. N ELSON, M.D.


_Board of Trustees_

E LIE G. A OUN, M.D., M.R.O.

J ENNY L. B OYER, M.D., P H .D., J.D.


K ENNETH C ERTA, M.D.

C. F REEMAN, M.D., M.B.A.

M ARY H ASBAH R OESSEL, M.D.


G LENN A. M ARTIN, M.D.


E RIC M. P LAKUN, M.D.


M ICHELE R EID, M.D.

F ELIX T ORRES, M.D., M.B.A.


S ANYA V IRANI, M.D., M.P.H.


C HERYL D. W ILLS, M.D.


M ELINDA Y OUNG, M.D.


U ROOJ Y AZDANI, M.D.,

R ESIDENT -F ELLOW M EMBER T RUSTEE -E LECT

### **DSM-5**


_Officers 2012–2013_


P RESIDENT

D ILIP V. J ESTE, M.D.


P RE

## Evaluation

In [25]:
# import time
# import pandas as pd
# import matplotlib.pyplot as plt
# import re
# from collections import Counter


# def calculate_keyword_coverage(chunks, keywords):
#     """
#     Calculate what percentage of keywords appear in at least one chunk.

#     Args:
#         chunks (list): List of text chunks
#         keywords (list): List of keywords to search for

#     Returns:
#         float: Percentage of keywords covered (0-1)
#     """
#     # Convert chunks to lowercase for case-insensitive matching
#     lowercase_chunks = [chunk.lower() for chunk in chunks]
#     lowercase_keywords = [keyword.lower() for keyword in keywords]

#     # Count how many keywords appear in at least one chunk
#     keywords_found = 0
#     for keyword in lowercase_keywords:
#         if any(keyword in chunk for chunk in lowercase_chunks):
#             keywords_found += 1

#     # Calculate coverage
#     coverage = keywords_found / max(1, len(keywords))
#     return coverage

# def calculate_chunk_coherence(chunks):
#     """
#     Calculate the average coherence of chunks based on sentence completeness.

#     Args:
#         chunks (list): List of text chunks

#     Returns:
#         float: Coherence score (0-1)
#     """
#     # Count incomplete sentences at chunk boundaries
#     incomplete_boundaries = 0

#     for chunk in chunks:
#         # Check if chunk starts with lowercase letter or continuation punctuation
#         if chunk and (chunk[0].islower() or chunk[0] in ',;:)]}'):
#             incomplete_boundaries += 1

#         # Check if chunk ends without proper sentence-ending punctuation
#         if chunk and not re.search(r'[.!?]\s*$', chunk):
#             incomplete_boundaries += 1

#     # Calculate coherence (lower incomplete_boundaries = higher coherence)
#     max_boundaries = len(chunks) * 2  # Start and end of each chunk
#     coherence = 1 - (incomplete_boundaries / max(1, max_boundaries))
#     return coherence

# def calculate_concept_splitting(chunks, key_phrases):
#     """
#     Calculate how often key phrases are split across chunks.

#     Args:
#         chunks (list): List of text chunks
#         key_phrases (list): List of important phrases that should stay together

#     Returns:
#         float: Non-splitting score (0-1), higher is better
#     """
#     # Count how many key phrases are split
#     split_phrases = 0

#     for phrase in key_phrases:
#         phrase_lower = phrase.lower()

#         # Check if phrase appears completely in any chunk
#         complete_in_chunk = any(phrase_lower in chunk.lower() for chunk in chunks)

#         # Check if parts of the phrase appear in different chunks
#         words = phrase_lower.split()
#         if len(words) > 1:
#             parts_in_different_chunks = False

#             for i in range(len(words) - 1):
#                 part1 = " ".join(words[:i+1])
#                 part2 = " ".join(words[i+1:])

#                 for j, chunk1 in enumerate(chunks):
#                     if part1 in chunk1.lower():
#                         for chunk2 in chunks[j+1:]:
#                             if part2 in chunk2.lower() and part1 not in chunk2.lower():
#                                 parts_in_different_chunks = True
#                                 break

#             if parts_in_different_chunks and not complete_in_chunk:
#                 split_phrases += 1

#     # Calculate non-splitting score
#     non_splitting = 1 - (split_phrases / max(1, len(key_phrases)))
#     return non_splitting

# def evaluate_chunking_strategies(document, keywords, key_phrases, chunking_strategies):
#     """
#     Evaluates chunking strategies with custom metrics.

#     Args:
#         document (str): Document to chunk
#         keywords (list): Important keywords for coverage metric
#         key_phrases (list): Important phrases for concept splitting metric
#         chunking_strategies (dict): Dictionary of chunking strategies with parameters

#     Returns:
#         pd.DataFrame: Results of the evaluation
#     """
#     results = []

#     for name, strategy in chunking_strategies.items():
#         print(f"Evaluating strategy: {name}")
#         start_time = time.time()

#         # Perform chunking based on strategy type
#         if strategy["type"] == "fixed":
#             chunks = perform_fixed_size_chunking(
#                 document,
#                 chunk_size=strategy.get("size", 1000),
#                 chunk_overlap=strategy.get("overlap", 0)
#             )
#         elif strategy["type"] == "semantic":
#             chunks = perform_semantic_chunking(
#                 document,
#                 chunk_size=strategy.get("size", 500),
#                 chunk_overlap=strategy.get("overlap", 100)
#             )
#         elif strategy["type"] == "recursive":
#             chunks = perform_code_chunking(
#                 document,
#                 language=strategy.get("language", "python"),
#                 chunk_size=strategy.get("size", 100),
#                 chunk_overlap=strategy.get("overlap", 15)
#             )
#         elif strategy["type"] == "adaptive":
#             chunks = perform_adaptive_chunking(
#                 document,
#                 min_size=strategy.get("min_size", 300),
#                 max_size=strategy.get("max_size", 1000),
#                 complexity_measure=strategy.get("complexity_measure", "combined")
#             )
#         elif strategy["type"] == "context_enriched":
#             chunks = perform_context_enriched_chunking(
#                 document,
#                 chunk_size=strategy.get("size", 500),
#                 chunk_overlap=strategy.get("overlap", 50),
#                 window_size=strategy.get("window_size", 1)
#             )
#         elif strategy["type"] == "ai_driven":
#             chunks = perform_ai_driven_chunking(
#                 document,
#                 max_chunks=strategy.get("max_chunks", 10)
#             )
#         else:
#             raise ValueError(f"Unknown chunking strategy type: {strategy['type']}")

#         # Record processing time
#         processing_time = time.time() - start_time

#         # Convert to text for evaluation if they're Document objects
#         chunk_texts = []
#         for chunk in chunks:
#             if hasattr(chunk, 'page_content'):
#                 chunk_texts.append(chunk.page_content)
#             else:
#                 chunk_texts.append(chunk)

#         # Calculate custom metrics
#         keyword_coverage = calculate_keyword_coverage(chunk_texts, keywords)
#         chunk_coherence = calculate_chunk_coherence(chunk_texts)
#         concept_integrity = calculate_concept_splitting(chunk_texts, key_phrases)

#         # Calculate chunk statistics
#         total_chunks = len(chunks)

#         # Get chunk sizes (chunk_texts already holds plain strings)
#         chunk_sizes = [len(text) for text in chunk_texts]

#         # Guard against an empty chunk list to avoid division by zero
#         avg_chunk_size = sum(chunk_sizes) / max(1, len(chunk_sizes))
#         chunk_size_std = (sum((size - avg_chunk_size) ** 2 for size in chunk_sizes) / max(1, len(chunk_sizes))) ** 0.5
#         size_consistency = 1 - (chunk_size_std / max(1, avg_chunk_size))

#         # Store results
#         results.append({
#             "strategy": name,
#             "processing_time": round(processing_time, 2),
#             "keyword_coverage": round(keyword_coverage, 2),
#             "chunk_coherence": round(chunk_coherence, 2),
#             "concept_integrity": round(concept_integrity, 2),
#             "size_consistency": round(size_consistency, 2),
#             "total_chunks": total_chunks,
#             "avg_chunk_size": round(avg_chunk_size, 2)
#         })

#     # Convert to DataFrame
#     results_df = pd.DataFrame(results)
#     return results_df

# def visualize_results(results_df):
#     """
#     Creates visualizations of the evaluation results.

#     Args:
#         results_df (pd.DataFrame): Evaluation results
#     """
#     # Set up the figure
#     fig, axs = plt.subplots(2, 3, figsize=(18, 12))

#     # Plot processing time
#     axs[0, 0].bar(results_df['strategy'], results_df['processing_time'])
#     axs[0, 0].set_title('Processing Time (seconds)')
#     axs[0, 0].set_ylabel('Time (s)')
#     axs[0, 0].set_xticklabels(results_df['strategy'], rotation=45, ha='right')

#     # Plot quality metrics
#     axs[0, 1].bar(results_df['strategy'], results_df['keyword_coverage'])
#     axs[0, 1].set_title('Keyword Coverage')
#     axs[0, 1].set_ylabel('Score (0-1)')
#     axs[0, 1].set_xticklabels(results_df['strategy'], rotation=45, ha='right')

#     # Plot concept integrity
#     axs[0, 2].bar(results_df['strategy'], results_df['concept_integrity'])
#     axs[0, 2].set_title('Concept Integrity')
#     axs[0, 2].set_ylabel('Score (0-1)')
#     axs[0, 2].set_xticklabels(results_df['strategy'], rotation=45, ha='right')

#     # Plot chunk coherence
#     axs[1, 0].bar(results_df['strategy'], results_df['chunk_coherence'])
#     axs[1, 0].set_title('Chunk Coherence')
#     axs[1, 0].set_ylabel('Score (0-1)')
#     axs[1, 0].set_xticklabels(results_df['strategy'], rotation=45, ha='right')

#     # Plot total chunks
#     axs[1, 1].bar(results_df['strategy'], results_df['total_chunks'])
#     axs[1, 1].set_title('Total Number of Chunks')
#     axs[1, 1].set_ylabel('Count')
#     axs[1, 1].set_xticklabels(results_df['strategy'], rotation=45, ha='right')

#     # Plot size consistency
#     axs[1, 2].bar(results_df['strategy'], results_df['size_consistency'])
#     axs[1, 2].set_title('Chunk Size Consistency')
#     axs[1, 2].set_ylabel('Score (0-1)')
#     axs[1, 2].set_xticklabels(results_df['strategy'], rotation=45, ha='right')

#     plt.tight_layout()
#     plt.show()

# # Example usage
# if __name__ == "__main__":
#     # Create test document
#     document = create_dummy_document()

#     # Define important keywords for evaluation
#     keywords = [
#         "machine learning", "supervised learning", "unsupervised learning",
#         "neural networks", "LLMs", "fine-tuning", "pre-training",
#         "reinforcement learning", "multimodal learning", "federated learning",
#         "clustering", "classification", "regression", "PCA"
#     ]

#     # Define key phrases that should remain together
#     key_phrases = [
#         "Large Language Models",
#         "Reinforcement Learning from Human Feedback",
#         "Principal Component Analysis",
#         "Support Vector Machines",
#         "decision becomes more difficult",
#         "train-test split",
#         "natural language processing"
#     ]

#     # Define chunking strategies to evaluate
#     chunking_strategies = {
#         "fixed_500": {
#             "type": "fixed",
#             "size": 500,
#             "overlap": 0
#         },
#         "fixed_500_overlap_100": {
#             "type": "fixed",
#             "size": 500,
#             "overlap": 100
#         },
#         "semantic_500": {
#             "type": "semantic",
#             "size": 500,
#             "overlap": 100
#         },
#         "adaptive_300_1000": {
#             "type": "adaptive",
#             "min_size": 300,
#             "max_size": 1000,
#             "complexity_measure": "combined"
#         },
#         "context_enriched_500": {
#             "type": "context_enriched",
#             "size": 500,
#             "overlap": 50,
#             "window_size": 1
#         },
#         "ai_driven_10": {
#             "type": "ai_driven",
#             "max_chunks": 10
#         }
#     }

#     # Run evaluation
#     results_df = evaluate_chunking_strategies(document, keywords, key_phrases, chunking_strategies)

#     # Print results
#     print("\n----- EVALUATION RESULTS -----")
#     print(results_df)

#     # Create visualizations
#     try:
#         visualize_results(results_df)
#     except Exception as e:
#         print(f"Visualization error: {e}")

#     # Export results to CSV
#     results_df.to_csv("chunking_evaluation_results.csv", index=False)
#     print("\nResults exported to 'chunking_evaluation_results.csv'")
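
The evaluation cell above is commented out and calls perform_* helpers and create_dummy_document, none of which are defined in the cells shown here. As a reduced, runnable sketch, the snippet below applies two of the same ideas (keyword coverage and size consistency) to plain string chunks. The function names, toy chunks, and keywords are made up for illustration.

```python
from statistics import mean, pstdev


def keyword_coverage(chunks, keywords):
    """Fraction of keywords that appear (case-insensitively) in at least one chunk."""
    lowered = [chunk.lower() for chunk in chunks]
    found = sum(any(kw.lower() in chunk for chunk in lowered) for kw in keywords)
    return found / max(1, len(keywords))


def size_consistency(chunks):
    """1 minus the coefficient of variation of chunk lengths (population std)."""
    sizes = [len(chunk) for chunk in chunks]
    if not sizes:
        return 0.0
    return 1 - pstdev(sizes) / max(1, mean(sizes))


# Hypothetical toy data, only to exercise the two metrics.
toy_chunks = ["Clustering groups similar points.", "Regression predicts a number."]
toy_keywords = ["clustering", "regression", "PCA"]

print(round(keyword_coverage(toy_chunks, toy_keywords), 2))  # 0.67
print(round(size_consistency(toy_chunks), 2))
```

The same two functions accept the output of .split_text directly, or the page_content of each Document returned by .create_documents.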