# RAG chunking strategy


In [27]:
import time
import requests
from pathlib import Path
import pymupdf
from tqdm.auto import tqdm

In [28]:
try:  # inside a script
	BASE_DIR = Path(__file__).resolve().parent.parent
except NameError:  # inside a notebook
	BASE_DIR = Path.cwd().parent

In [29]:
print(f"Project root set to: {BASE_DIR}")

Project root set to: /Users/tejaspancholi/Developer/python/vizuara


In [30]:
pdf_path = BASE_DIR / "data" / "human_nutrition_text.pdf"

In [31]:
def download_pdf_requests(
	url: str, dest: Path, timeout: int = 30, max_retries: int = 3
) -> None:
	"""Download a PDF file from URL with progress tracking and error handling."""
	dest.parent.mkdir(parents=True, exist_ok=True)
	for attempt in range(max_retries):
		try:
			response = requests.get(url, stream=True, timeout=timeout)
			response.raise_for_status()

			content_type = response.headers.get("content-type", "").lower()
			if "pdf" not in content_type:
				raise ValueError(f"Invalid content type: {content_type}")
			total = int(response.headers.get("content-length", 0))
			with tqdm(
				total=total, unit="iB", unit_scale=True, desc="Downloading PDF"
			) as t:
				with dest.open("wb") as f:
					for chunk in response.iter_content(chunk_size=8192):
						if chunk:
							f.write(chunk)
							t.update(len(chunk))
			print(f"\nSuccessfully downloaded PDF to {dest}")
			return
		except requests.exceptions.RequestException as e:
			print(f"Download failed: {e}")
			if attempt == max_retries - 1:
				raise
			time.sleep(2**attempt)

In [32]:
if not pdf_path.is_file():
	download_pdf_requests(
		"https://pressbooks.oer.hawaii.edu/humannutrition2/open/download?type=pdf",
		pdf_path,
	)

In [33]:
def text_formatter(text: str) -> str:
	"""Performs minor text formatting."""
	import re

	cleaned_text = re.sub(
		r"\s+", " ", text
	)  # Replace multiple whitespace with single space
	cleaned_text = cleaned_text.strip()
	return cleaned_text

In [34]:
import re
from typing import Dict, List, Union
from pathlib import Path


def open_and_read_pdf(file_path: Union[str, Path]) -> Union[List[Dict], None]:
	"""
	Opens a pdf file and reads its content page by page, and collects statistics.
	Parameters:
	    file_path (str | Path): The path to the pdf file to be opened and read.
	Returns:
	    list[dict]: A list of dictionaries containing the page number, character count, word count, sentence count, token count, and extracted text for each page.
	"""
	if not Path(file_path).exists():
		raise FileNotFoundError(f"PDF file not found: {file_path}")
	try:
		doc = pymupdf.open(file_path)
		pages_and_texts = []
		for page_number, page in tqdm(enumerate(doc)):
			text = page.get_text()
			if not text or not text.strip():  # Skip empty pages
				continue
			if text and text.strip():
				text = text_formatter(text)
				sentences = re.split(r"[.!?]+", text)  # Simple sentence splitter
				sentence_count = len(
					[s for s in sentences if s.strip()]
				)  # Count non-empty sentences
				pages_and_texts.append(
					{
						"page_number": page_number - 41,
						"page_char_count": len(text),
						"page_word_count": len(text.split()),
						"page_sentence_count_raw": sentence_count,
						"page_token_count": int(len(text) / 4),
						"text": text,
					}
				)
		return pages_and_texts
	except Exception as e:
		print(f"Error reading PDF file: {e}")
		return None

In [35]:
pages_and_texts = open_and_read_pdf(file_path=pdf_path)
if pages_and_texts:
	print(pages_and_texts[:2])

1208it [00:00, 1253.23it/s]

[{'page_number': -41, 'page_char_count': 29, 'page_word_count': 4, 'page_sentence_count_raw': 1, 'page_token_count': 7, 'text': 'Human Nutrition: 2020 Edition'}, {'page_number': -39, 'page_char_count': 308, 'page_word_count': 42, 'page_sentence_count_raw': 1, 'page_token_count': 77, 'text': 'Human Nutrition: 2020 Edition UNIVERSITY OF HAWAI‘I AT MĀNOA FOOD SCIENCE AND HUMAN NUTRITION PROGRAM ALAN TITCHENAL, SKYLAR HARA, NOEMI ARCEO CAACBAY, WILLIAM MEINKE-LAU, YA-YUN YANG, MARIE KAINOA FIALKOWSKI REVILLA, JENNIFER DRAPER, GEMADY LANGFELDER, CHERYL GIBBY, CHYNA NICOLE CHUN, AND ALLISON CALABRESE'}]





In [None]:
import random

random.sample(pages_and_texts, k=3)

In [None]:
import polars as pl

df = pl.DataFrame(pages_and_texts)
summary = df.describe()
numeric_cols = [c for c, t in summary.schema.items() if t.is_numeric()]
summary = summary.with_columns(
	[pl.col(c).round(2) if c in numeric_cols else pl.col(c) for c in summary.columns]
)
summary

## Method 1: Fixed size chunking

In [36]:
def chunk_text(text: str, chunk_size: int = 500) -> List[str]:
	"""Splits text into chunks of specified size with overlap.
	Args:
	    text (str): The text to be chunked.
	    chunk_size (int): The size of each chunk in words.
	Returns:
	    List[str]: A list of text chunks.
	"""
	chunks = []
	current_chunk = ""
	words = text.split()

	for word in words:
		if len(current_chunk) + len(word) + 1 <= chunk_size:
			current_chunk += word + " "
		else:
			chunks.append(current_chunk.strip())
			current_chunk = word + " "

	if current_chunk:
		chunks.append(current_chunk.strip())

	return chunks

In [38]:
def chunk_pdf_pages(pages_and_texts: list, chunk_size: int = 500) -> List[Dict]:
	"""Chunks the text of each page into smaller segments.
	Args:
	    pages_and_texts (list): List of dictionaries containing page information and text.
	    chunk_size (int): The size of each chunk in words.
	Returns:
	    List[Dict]: A list of dictionaries with chunked text and associated metadata.
	"""
	chunked_data = []

	for page in pages_and_texts:
		page_number = page["page_number"]
		text = page["text"]
		chunks = chunk_text(text, chunk_size)

		for i, chunk in enumerate(chunks):
			chunked_data.append(
				{
					"page_number": page_number,
					"chunk_index": i,
					"chunk_text": chunk,
					"chunk_word_count": len(chunk.split()),
					"chunk_char_count": len(chunk),
					"chunk_token_count": int(len(chunk) / 4),
				}
			)

	return chunked_data

In [39]:
chunked_pages = chunk_pdf_pages(pages_and_texts, chunk_size=500)

In [None]:
print(f"Total chunks created: {len(chunked_pages)}")
print(
	f"25th chunk (page {chunked_pages[24]['page_number']}): {chunked_pages[24]['chunk_text'][:200]}..."
)

In [None]:
import random
import textwrap


def _scattered_indices(n: int, k: int, jitter_frac: float = 0.08) -> list[int]:
	"""Generate k scattered indices over range n with some jitter.
	Args:
	    n (int): The total number of items.
	    k (int): The number of indices to generate.
	    jitter_frac (float): Fractional jitter to apply to each index.
	Returns:
	    list[int]: A list of k scattered indices.
	"""
	if k <= 0:
		return []
	if k == 1:
		return [random.randrange(n)]
	anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
	out, seen = [], set()
	radius = max(1, int(jitter_frac * n))
	for a in anchors:
		lo, hi = max(0, a - radius), min(n - 1, a + radius)
		j = random.randint(lo, hi)
		if j not in seen:
			out.append(j)
			seen.add(j)
	while len(out) < k:
		r = random.randrange(n)
		if r not in seen:
			out.append(r)
			seen.add(r)
	return out

In [None]:
def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
	"""Draws a boxed representation of a text chunk.
	Args:
	    c (dict): A dictionary containing chunk metadata and text.
	    wrap_at (int): The maximum width for text wrapping.
	Returns:
	    str: A string representation of the boxed chunk.
	"""
	header = (
		f" Chunk p{c['page_number']} - idx {c['chunk_index']}  | "
		f"chars {c['chunk_char_count']} - words {c['chunk_word_count']} - tokens {c['chunk_token_count']}"
	)
	# wrap body text, avoid breaking long words awkwardly
	wrapped_lines = textwrap.wrap(
		c["chunk_text"], width=wrap_at, break_long_words=False, replace_whitespace=False
	)
	content_width = max([0, *map(len, wrapped_lines)])
	box_width = max(len(header), content_width + 2)  # +2 - side padding

	top = "┌" + "─" * (box_width) + "┐"
	hline = "|" + header.ljust(box_width) + "|"
	sep = "├" + "─" * (box_width) + "┤"
	body = "\n".join(
		"│ " + line.ljust(box_width - 2) + " │" for line in wrapped_lines
	) or ("|" + "".ljust(box_width - 2) + " |")
	bottom = "└" + "─" * (box_width) + "┘"
	return "\n".join([top, hline, sep, body, bottom])

In [None]:
def show_random_chunks(
	pages_and_texts: list, chunk_size: int = 500, k: int = 5, seed: int | None = 42
) -> None:
	"""Displays n random chunks from the chunked pages.
	Args:
	    pages_and_texts (list): List of tuples (page_number, text) for each page.
	    chunk_size (int): Size of each text chunk.
	    k (int): Number of random chunks to display.
	    seed (int | None): Random seed for reproducibility.
	"""
	if seed is not None:
		random.seed(seed)

	# Chunk the text from each page
	all_chunks = []
	all_chunks = chunk_pdf_pages(pages_and_texts, chunk_size)
	if not all_chunks:
		print("No chunks available to display.")
		return
	indices = _scattered_indices(len(all_chunks), k)
	print(
		f"Showing {len(indices)} scattered random chunks out of {len(all_chunks)} total chunks:\n"
	)
	for i, idx in enumerate(indices, 1):
		print(f"#{i}")
		print(_draw_boxed_chunk(all_chunks[idx]))
		print()  # extra newline between chunks

In [None]:
assert pages_and_texts is not None
show_random_chunks(pages_and_texts, chunk_size=500, k=5, seed=42)

### here you might have seen some chunk are smaller than 500 even though we have mentioned as chunk size as 500, its because the processes is happening at page level and it can happen once a couple of chunk are done at the page, rest of text is smaller than 500.

## Method 2: Semantic chunking

In [40]:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import nltk

nltk.download("punkt", quiet=True)

True

In [41]:
semantic_model = SentenceTransformer("all-MiniLM-L6-v2")

In [42]:
def semantic_chunk_text(
	text: str, similarity_threshold: float = 0.8, max_tokens: int = 500
) -> list:
	"""Splits text into semantically coherent chunks based on sentence embeddings.
	Args:
	    text (str): The text to be chunked.
	    similarity_threshold (float): Cosine similarity threshold to determine chunk boundaries.
	    max_tokens (int): Maximum number of tokens per chunk.
	Returns:
	    list: A list of semantically coherent text chunks.
	"""
	sentences = nltk.sent_tokenize(text)
	if not sentences:
		return []

	# ensure embeddings is a numpy array of shape (n_sentences, dim)
	embeddings = semantic_model.encode(sentences, convert_to_numpy=True)
	if not isinstance(embeddings, np.ndarray):
		embeddings = np.array(embeddings)

	chunks = []
	current_chunk = [sentences[0]]
	current_indices = [0]

	for i in range(1, len(sentences)):
		current_embedding = np.mean(embeddings[current_indices], axis=0)
		next_embedding = embeddings[i]
		sim = float(
			cosine_similarity(
				current_embedding.reshape(1, -1), next_embedding.reshape(1, -1)
			)[0, 0]
		)

		chunk_token_count = len(" ".join(current_chunk)) // 4

		if sim >= similarity_threshold and chunk_token_count < max_tokens:
			current_chunk.append(sentences[i])
			current_indices.append(i)
		else:
			chunks.append(" ".join(current_chunk))
			current_chunk = [sentences[i]]
			current_indices = [i]
	if current_chunk:
		chunks.append(" ".join(current_chunk))
	return chunks

In [43]:
def semantic_chunk_pdf_pages(
	pages_and_texts: list, similarity_threshold: float = 0.8, max_tokens: int = 500
) -> list[dict]:
	"""Chunks the text of each page into semantically coherent segments.
	Args:
	    pages_and_texts (list): List of dictionaries containing page information and text.
	    similarity_threshold (float): Cosine similarity threshold to determine chunk boundaries.
	    max_tokens (int): Maximum number of tokens per chunk.
	Returns:
	    list[dict]: A list of dictionaries with semantically chunked text and associated metadata.
	"""
	all_chunks = []

	for page in tqdm(pages_and_texts, desc="Semantic chunking pages"):
		page_number = page["page_number"]
		text = page["text"]
		chunks = semantic_chunk_text(text, similarity_threshold, max_tokens)

		for i, chunk in enumerate(chunks):
			all_chunks.append(
				{
					"page_number": page_number,
					"chunk_index": i,
					"chunk_text": chunk,
					"chunk_word_count": len(chunk.split()),
					"chunk_char_count": len(chunk),
					"chunk_token_count": int(len(chunk) / 4),
				}
			)

	return all_chunks

In [44]:
import nltk

nltk.download("punkt_tab")
semantic_chunk_pages = semantic_chunk_pdf_pages(
	pages_and_texts, similarity_threshold=0.75, max_tokens=500
)

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /Users/tejaspancholi/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
Semantic chunking pages: 100%|██████████| 1179/1179 [00:28<00:00, 41.29it/s]


In [None]:
print(f"Total semantic chunks created: {len(semantic_chunk_pages)}")
print(
	f"25th semantic chunk (page {semantic_chunk_pages[24]['page_number']}): {semantic_chunk_pages[24]['chunk_text'][:200]}..."
)

### Number of chunks have drastically increased is because there is very limited similiarity meaning between pages or paras, leading to smaller chunk sizes

In [None]:
import random


def _scattered_indices(n: int, k: int, jitter_frac: float = 0.08) -> list[int]:
	"""Evenely spaced anchors + random jitters + indices scattered across [0,n-1]
	Args:
	    n (int): The total number of items.
	    k (int): The number of indices to generate.
	    jitter_frac (float): Fractional jitter to apply to each index.
	Returns:
	    list[int]: A list of k scattered indices.
	"""
	if k <= 0:
		return []
	if k == 1:
		return [random.randrange(n)]
	anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
	out, seen = [], set()
	radius = max(1, int(jitter_frac * n))
	for a in anchors:
		lo, hi = max(0, a - radius), min(n - 1, a + radius)
		j = random.randint(lo, hi)
		if j not in seen:
			out.append(j)
			seen.add(j)
	while len(out) < k:
		r = random.randrange(n)
		if r not in seen:
			out.append(r)
			seen.add(r)
	return out

In [None]:
def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
	"""Draws a boxed representation of a text chunk.
	Args:
	    c (dict): A dictionary containing chunk metadata and text.
	    wrap_at (int): The maximum width for text wrapping.
	Returns:
	    str: A string representation of the boxed chunk.
	"""
	approx_tokens = c.get("chunk_token_count", len(c["chunk_text"]) / 4)
	header = (
		f" Chunk p{c['page_number']} - idx {c['chunk_index']}  | "
		f"chars {c['chunk_char_count']} - words {c['chunk_word_count']} - tokens {round(approx_tokens, 2)}"
	)
	# wrap body text, avoid breaking long words awkwardly
	wrapped_lines = textwrap.wrap(
		c["chunk_text"], width=wrap_at, break_long_words=False, replace_whitespace=False
	)
	content_width = max([0, *map(len, wrapped_lines)])
	box_width = max(len(header), content_width + 2)  # +2 - side padding

	top = "┌" + "─" * (box_width) + "┐"
	hline = "|" + header.ljust(box_width) + "|"
	sep = "├" + "─" * (box_width) + "┤"
	body = "\n".join(
		"│ " + line.ljust(box_width - 2) + " │" for line in wrapped_lines
	) or ("|" + "".ljust(box_width - 2) + " |")
	bottom = "└" + "─" * (box_width) + "┘"
	return "\n".join([top, hline, sep, body, bottom])

In [None]:
def show_semantic_random_chunks(
	semantic_chunked_pages: list[dict], k: int = 5, seed: int | None = 42
) -> None:
	"""Displays n random chunks from the semantic chunked pages.
	Args:
	    pages_and_texts (list): List of tuples (page_number, text) for each page.
	    k (int): Number of random chunks to display.
	    seed (int | None): Random seed for reproducibility.
	"""
	if seed is not None:
		random.seed(seed)

	n = len(semantic_chunked_pages)
	if n == 0:
		print("No semantic chunks available to display.")
		return
	idxs = _scattered_indices(n, k)
	print(
		f"Showing {len(idxs)} scattered random semantic chunks out of {n} total chunks:\n"
	)
	for i, idx in enumerate(idxs, 1):
		print(f"#{i}")
		print(_draw_boxed_chunk(semantic_chunked_pages[idx]))
		print()  # extra newline between chunks

In [None]:
assert semantic_chunk_pages is not None
show_semantic_random_chunks(semantic_chunk_pages, k=5, seed=42)

## Method 3 - Recursive Chunking

### How it works

- if chunk is smaller than `max_chunk_size`, its chunked as is
- if its larger, it will try to split by `\n\n` which is usually between two sections
- if that also does not work it will try to split by `\n` which is usually a para
-  if para is still too large, its split by sentence
- repeat the process recursively
- *note* the list of separator can be changed, by supplying it during method invocation. 

In [None]:
import nltk

nltk.download("punkt", quiet=True)

In [45]:
def recursive_chunk_text(
	text: str, max_chunk_size: int = 1000, min_chunk_size: int = 100
) -> list:
	"""Recursively chunks text into smaller segments based on size constraints.
	Tries splitting by sections, then newlines, then sentences.
	Args:
	    text (str): The text to be chunked.
	    max_chunk_size (int): Maximum size of each chunk in characters.
	    min_chunk_size (int): Minimum size of each chunk in characters.
	Returns:
	    list: A list of text chunks.
	"""

	def split_chunk(chunk: str) -> list:
		if len(chunk) <= max_chunk_size:
			return [chunk.strip()]
		# first try splitting by sections (double newlines)
		sections = chunk.split("\n\n")
		if len(sections) > 1:
			result = []
			for section in sections:
				if section.strip():
					result.extend(split_chunk(section.strip()))
			return result
		# next try splitting by single newlines
		sections = chunk.split("\n")
		if len(sections) > 1:
			result = []
			for section in sections:
				if section.strip():
					result.extend(split_chunk(section.strip()))
			return result
		# finally split by sentences
		sentences = nltk.sent_tokenize(chunk)
		chunks, current_chunk, current_size = [], [], 0
		for sentence in sentences:
			if current_size + len(sentence) > max_chunk_size:
				if current_chunk:
					chunks.append(" ".join(current_chunk).strip())
				current_chunk, current_size = [sentence], len(sentence)
			else:
				current_chunk.append(sentence)
				current_size += len(sentence)
		if current_chunk:
			chunks.append(" ".join(current_chunk).strip())
		return chunks

	return split_chunk(text)

In [46]:
def recursive_chunk_pdf_pages(
	pages_and_texts: list, max_chunk_size: int = 1000, min_chunk_size: int = 100
) -> list[dict]:
	"""Chunks the text of each page into smaller segments using recursive chunking.
	Args:
	    pages_and_texts (list): List of dictionaries containing page information and text.
	    max_chunk_size (int): Maximum size of each chunk in characters.
	    min_chunk_size (int): Minimum size of each chunk in characters.
	Returns:
	    list[dict]: A list of dictionaries with recursively chunked text and associated metadata.
	"""
	all_chunks = []

	for page in tqdm(pages_and_texts, desc="Recursive chunking pages"):
		page_number = page["page_number"]
		text = page["text"]
		chunks = recursive_chunk_text(text, max_chunk_size, min_chunk_size)

		for i, chunk in enumerate(chunks):
			all_chunks.append(
				{
					"page_number": page_number,
					"chunk_index": i,
					"chunk_text": chunk,
					"chunk_word_count": len(chunk.split()),
					"chunk_char_count": len(chunk),
					"chunk_token_count": int(len(chunk) / 4),
				}
			)

	return all_chunks

In [47]:
recursive_chunked_pages = recursive_chunk_pdf_pages(
	pages_and_texts, max_chunk_size=1000, min_chunk_size=100
)
print(f"Total recursive chunks created: {len(recursive_chunked_pages)}")
print(
	f"25th recursive chunk (page {recursive_chunked_pages[24]['page_number']}): {recursive_chunked_pages[24]['chunk_text'][:200]}..."
)

Recursive chunking pages: 100%|██████████| 1179/1179 [00:00<00:00, 15581.01it/s]

Total recursive chunks created: 1949
25th recursive chunk (page -17): Preface UNIVERSITY OF HAWAI‘I AT MĀNOA FOOD SCIENCE AND HUMAN NUTRITION PROGRAM AND HUMAN NUTRITION PROGRAM ‘A‘ohe pau ka ‘ike i ka hālau ho‘okahi Knowledge isn’t taught in all one place This open acc...





In [None]:
import random


def _scattered_indices(n: int, k: int, jitter_frac: float = 0.08) -> list[int]:
	"""Generate k scattered indices over range n with some jitter.
	Args:
	    n (int): The total number of items.
	    k (int): The number of indices to generate.
	    jitter_frac (float): Fractional jitter to apply to each index.
	Returns:
	    list[int]: A list of k scattered indices.
	"""
	if k <= 0:
		return []
	if k == 1:
		return [random.randrange(n)]
	anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
	out, seen = [], set()
	radius = max(1, int(jitter_frac * n))
	for a in anchors:
		lo, hi = max(0, a - radius), min(n - 1, a + radius)
		j = random.randint(lo, hi)
		if j not in seen:
			out.append(j)
			seen.add(j)
	while len(out) < k:
		r = random.randrange(n)
		if r not in seen:
			out.append(r)
			seen.add(r)
	return out


def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
	"""Draws a boxed representation of a text chunk.
	Args:
	    c (dict): A dictionary containing chunk metadata and text.
	    wrap_at (int): The maximum width for text wrapping.
	Returns:
	    str: A string representation of the boxed chunk.
	"""
	approx_tokens = c.get("chunk_token_count", len(c["chunk_text"]) / 4)
	header = (
		f" Chunk p{c['page_number']} - idx {c['chunk_index']}  | "
		f"chars {c['chunk_char_count']} - words {c['chunk_word_count']} - tokens {round(approx_tokens, 2)}"
	)
	# wrap body text, avoid breaking long words awkwardly
	wrapped_lines = textwrap.wrap(
		c["chunk_text"], width=wrap_at, break_long_words=False, replace_whitespace=False
	)
	content_width = max([0, *map(len, wrapped_lines)])
	box_width = max(len(header), content_width + 2)  # +2 - side padding

	top = "┌" + "─" * (box_width) + "┐"
	hline = "|" + header.ljust(box_width) + "|"
	sep = "├" + "─" * (box_width) + "┤"
	body = "\n".join(
		"│ " + line.ljust(box_width - 2) + " │" for line in wrapped_lines
	) or ("|" + "".ljust(box_width - 2) + " |")
	bottom = "└" + "─" * (box_width) + "┘"
	return "\n".join([top, hline, sep, body, bottom])


def show_recursive_random_chunks(
	recursive_chunked_pages: list[dict], k: int = 5, seed: int | None = 42
) -> None:
	"""Displays n random chunks from the recursive chunked pages.
	Args:
	    pages_and_texts (list): List of tuples (page_number, text) for each page.
	    k (int): Number of random chunks to display.
	    seed (int | None): Random seed for reproducibility.
	"""
	if seed is not None:
		random.seed(seed)

	n = len(recursive_chunked_pages)
	if n == 0:
		print("No recursive chunks available to display.")
		return
	idxs = _scattered_indices(n, k)
	print(
		f"Showing {len(idxs)} scattered random recursive chunks out of {n} total chunks:\n"
	)
	for i, idx in enumerate(idxs, 1):
		print(f"#{i}")
		print(_draw_boxed_chunk(recursive_chunked_pages[idx]))
		print()  # extra newline between chunks

In [None]:
assert recursive_chunked_pages is not None and len(recursive_chunked_pages) > 0
show_recursive_random_chunks(recursive_chunked_pages, k=5, seed=42)

## Method 4: Structured based chunking

### How it works
- The function looks for `headers` such as Chapter numbers(e.g. CHAPTER 1) or section heading(i.e. 1.1 Introduction)
- Every time it finds a header it starts a new chunk till either another heading is reached or max token size limit is exceeded
- This preserves logical flow

### Engineer's decision
- Works well with documents which have clear hierarchy(chapter, section, subsection)

In [48]:
# helper function to detect chapter start
def _is_chapter_start(text: str) -> bool:
	"""Detects if a line indicates the start of a new chapter or section."""
	# chapter_patterns = [
	#     r'^\s*CHAPTER\s+\d+',  # Matches "CHAPTER 1", "CHAPTER 2", etc.
	#     r'^\s*\d+(\.\d+)*\s+[A-Z][a-zA-Z\s]*',  # Matches "1. Introduction", "2.1 Background", etc.
	# ]
	# for pattern in chapter_patterns:
	#     if re.match(pattern, line):
	#         return True
	# return False
	return re.search(r"university\s+of\s+hawai", text, flags=re.IGNORECASE) is not None


def _guess_title_from_page(text: str) -> str:
	"""the previous line of 'University of Hawaii' is likely the title
	falls back to the first ~120 characters
	Args:
	    text (str): The text of the page.
	Returns:
	    str: The guessed title of the page.
	"""
	match = re.search(r"university\s+of\s+hawai", text, flags=re.IGNORECASE)
	if match:
		title = text[: match.start()].strip()
		title = re.sub(r"\s+", " ", title.strip())  # Clean up whitespace
		if 10 <= len(title) <= 180:
			return title
	# fallback to first ~120 characters
	title = re.sub(r"\s+", " ", text).strip()
	return title[:120] if title else "Untitled"


def chapter_chunk_pdf_pages(pages_and_texts: list) -> list[dict]:
	"""
	Chunks PDF pages into sections based on chapter titles.
	Args:
	    pages_and_texts (list): List of tuples (page_number, text) for each page.
	Returns:
	    list[dict]: A list of dictionaries with chapter titles and associated pages.
	"""
	if not pages_and_texts:
		return []
	chapter_starts = []
	for i, page in enumerate(pages_and_texts):
		text = page["text"]
		if _is_chapter_start(text):
			chapter_starts.append(i)
	# if nothing detected, return all pages as a single chunk
	if not chapter_starts:
		# No chapters found, return all pages as a single chunk
		all_text = " ".join(page["text"] for page in pages_and_texts).strip()
		return [
			{
				"chapter_index": 0,
				"chapter_title": _guess_title_from_page(pages_and_texts[0]["text"]),
				"start_page": pages_and_texts[0]["page_number"],
				"end_page": pages_and_texts[-1]["page_number"],
				"chunk_char_count": len(all_text),
				"chunk_word_count": len(all_text.split()),
				"chunk_token_count": int(len(all_text) / 4),
				"chunk_text": all_text,
			}
		]
	# build chapter ranges (start -> next start - 1)
	chapter_chunks = []
	for idx, start in enumerate(chapter_starts):
		end = (
			chapter_starts[idx + 1] - 1
			if idx + 1 < len(chapter_starts)
			else len(pages_and_texts) - 1
		)
		if end < start:
			continue  # skip invalid ranges
		chapter_pages = pages_and_texts[start : end + 1]
		chapter_text = " ".join(page["text"] for page in chapter_pages).strip()
		chapter_title = _guess_title_from_page(chapter_pages[0]["text"])
		chapter_chunks.append(
			{
				"chapter_index": idx,
				"chapter_title": chapter_title,
				"start_page": chapter_pages[0]["page_number"],
				"end_page": chapter_pages[-1]["page_number"],
				"chunk_char_count": len(chapter_text),
				"chunk_word_count": len(chapter_text.split()),
				"chunk_token_count": int(len(chapter_text) / 4),
				"chunk_text": chapter_text,
			}
		)
	return chapter_chunks

In [49]:
structured_chunk_pages = chapter_chunk_pdf_pages(pages_and_texts)
print(f"Total chapter chunks created: {len(structured_chunk_pages)}")
print(
	f"1st chapter chunk (pages {structured_chunk_pages[0]['start_page']} to {structured_chunk_pages[0]['end_page']}): {structured_chunk_pages[0]['chunk_text'][:200]}..."
)

Total chapter chunks created: 171
1st chapter chunk (pages -39 to -39): Human Nutrition: 2020 Edition UNIVERSITY OF HAWAI‘I AT MĀNOA FOOD SCIENCE AND HUMAN NUTRITION PROGRAM ALAN TITCHENAL, SKYLAR HARA, NOEMI ARCEO CAACBAY, WILLIAM MEINKE-LAU, YA-YUN YANG, MARIE KAINOA FI...


In [None]:
import random


def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
	"""Draws a boxed representation of a text chunk.
	Args:
	    c (dict): A dictionary containing chunk metadata and text.
	    wrap_at (int): The maximum width for text wrapping.
	Returns:
	    str: A string representation of the boxed chunk.
	"""
	approx_tokens = c.get("chunk_token_count", len(c["chunk_text"]) / 4)
	header = (
		f" Chapter idx {c['chapter_index']} - '{c['chapter_title']}'  | "
		f"pages {c['start_page']} to {c['end_page']} | "
		f"chars {c['chunk_char_count']} - words {c['chunk_word_count']} - tokens {round(approx_tokens, 2)}"
	)
	# wrap body text, avoid breaking long words awkwardly
	wrapped_lines = textwrap.wrap(
		c["chunk_text"], width=wrap_at, break_long_words=False, replace_whitespace=False
	)
	content_width = max([0, *map(len, wrapped_lines)])
	box_width = max(len(header), content_width + 2)  # +2 - side padding

	top = "┌" + "─" * (box_width) + "┐"
	hline = "|" + header.ljust(box_width) + "|"
	sep = "├" + "─" * (box_width) + "┤"
	body = "\n".join(
		"│ " + line.ljust(box_width - 2) + " │" for line in wrapped_lines
	) or ("|" + "".ljust(box_width - 2) + " |")
	bottom = "└" + "─" * (box_width) + "┘"
	return "\n".join([top, hline, sep, body, bottom])


def show_chapter_random_chunks(
	structured_chunked_pages: list[dict], k: int = 5, seed: int | None = 42
) -> None:
	"""Displays n random chunks from the chapter chunked pages.
	Args:
	    pages_and_texts (list): List of tuples (page_number, text) for each page.
	    k (int): Number of random chunks to display.
	    seed (int | None): Random seed for reproducibility.
	"""
	if seed is not None:
		random.seed(seed)

	n = len(structured_chunked_pages)
	if n == 0:
		print("No chapter chunks available to display.")
		return
	k = min(k, len(structured_chunked_pages))
	idxs = random.sample(range(n), k)
	print(f"Showing {len(idxs)} random chapter chunks out of {n} total chunks:\n")
	for i, idx in enumerate(idxs, 1):
		print(f"#{i}")
		print(_draw_boxed_chunk(structured_chunked_pages[idx]))
		print()  # extra newline between chunks


assert structured_chunk_pages is not None and len(structured_chunk_pages) > 0
show_chapter_random_chunks(structured_chunk_pages, k=5, seed=42)

## Strategy 5: LLM Based chunking

In [None]:
import os
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

In [None]:
from openai import OpenAI

client = OpenAI(api_key=OPENAI_API_KEY)

In [None]:
def llm_based_chunk(
	text: str, chunk_size: int = 500, model: str = "gpt-4.1-mini"
) -> list[str]:
	"""
	Uses OpenAI's GPT model to chunk text into smaller segments.
	Args:
	    text (str): The text to be chunked.
	    chunk_size (int): The desired size of each chunk in words.
	    model (str): The OpenAI model to use for chunking.
	Returns:
	    list[str]: A list of text chunks.
	"""

	def get_chunk_boundary(text_sequence: str) -> int:
		"""Helper function to get chunk boundary from LLM.
		Args:
		    text_sequence (str): The text sequence to analyze.
		Returns:
		    int: The index to split the text at.
		"""
		prompt = f"""
        Analyze the following text and identify the best point to split it into two semantically coherent parts.
        The split should occur near the {chunk_size} characters.
        Text:
        \"\"\"{text_sequence}\"\"\"
        Return only the integer index (character position) where the split should occur.  
        Do not return explanations or any other text.
        """

		response = client.chat.completions.create(
			model=model,
			messages=[
				{
					"role": "system",
					"content": "You are a text analysis expert that identifies text chunk boundaries.",
				},
				{"role": "user", "content": prompt},
			],
			temperature=0.0,
		)
		if response is None or not response.choices[0].message.content:
			return chunk_size  # fallback to default chunk size
		else:
			if response.choices[0].message.content.strip().isdigit() is False:
				return chunk_size  # fallback to default chunk size
			else:
				split_index_str = (response.choices[0].message.content).strip()
				split_index = int(split_index_str)
				return split_index

	chunks = []
	remaining_text = text

	while len(remaining_text) > chunk_size:
		text_window = remaining_text[: chunk_size * 2]
		split_index_str = get_chunk_boundary(text_window)
		if split_index_str < 100 or split_index_str > len(text_window) - 100:
			split_index_str = chunk_size
		chunks.append(remaining_text[:split_index_str].strip())
		remaining_text = remaining_text[split_index_str:].strip()
	if remaining_text:
		chunks.append(remaining_text)
	return chunks


def llm_based_chunk_pdf_pages(
	pages_and_texts: list[dict], chunk_size: int = 1000, model: str = "gpt-4.1-mini"
) -> list[dict]:
	"""
	Chunks the text of each page into smaller segments using LLM-based chunking.
	Args:
	    pages_and_texts (list[dict]): List of dictionaries containing page information and text.
	    chunk_size (int): The desired size of each chunk in words.
	    model (str): The OpenAI model to use for chunking.
	Returns:
	    list[dict]: A list of dictionaries with LLM chunked text and associated metadata.
	"""
	all_chunks = []

	for page in tqdm(pages_and_texts, desc="LLM chunking pages"):
		page_number = page["page_number"]
		text = page["text"]
		chunks = llm_based_chunk(text, chunk_size, model)

		for i, chunk in enumerate(chunks):
			all_chunks.append(
				{
					"page_number": page_number,
					"chunk_index": i,
					"chunk_text": chunk,
					"chunk_word_count": len(chunk.split()),
					"chunk_char_count": len(chunk),
					"chunk_token_count": int(len(chunk) / 4),
				}
			)

	return all_chunks

In [None]:
llm_chunked_pages = llm_based_chunk_pdf_pages(
	pages_and_texts, chunk_size=500, model="gpt-4.1-mini"
)
print(f"Total LLM chunks created: {len(llm_chunked_pages)}")
print(
	f"25th LLM chunk (page {llm_chunked_pages[24]['page_number']}): {llm_chunked_pages[24]['chunk_text'][:200]}..."
)

In [22]:
import random


def _scattered_indices(n: int, k: int, jitter_frac: float = 0.08) -> list[int]:
	"""Generate k scattered indices over range n with some jitter.
	Args:
	    n (int): The total number of items.
	    k (int): The number of indices to generate.
	    jitter_frac (float): Fractional jitter to apply to each index.
	Returns:
	    list[int]: A list of k scattered indices.
	"""
	if k <= 0:
		return []
	if k == 1:
		return [random.randrange(n)]
	anchors = [int(round(i * (n - 1) / (k - 1))) for i in range(k)]
	out, seen = [], set()
	radius = max(1, int(jitter_frac * n))
	for a in anchors:
		lo, hi = max(0, a - radius), min(n - 1, a + radius)
		j = random.randint(lo, hi)
		if j not in seen:
			out.append(j)
			seen.add(j)
	while len(out) < k:
		r = random.randrange(n)
		if r not in seen:
			out.append(r)
			seen.add(r)
	return out


def _draw_boxed_chunk(c: dict, wrap_at: int = 96) -> str:
	"""Draws a boxed representation of a text chunk.
	Args:
	    c (dict): A dictionary containing chunk metadata and text.
	    wrap_at (int): The maximum width for text wrapping.
	Returns:
	    str: A string representation of the boxed chunk.
	"""
	approx_tokens = c.get("chunk_token_count", len(c["chunk_text"]) / 4)
	header = (
		f" Chunk p{c['page_number']} - idx {c['chunk_index']}  | "
		f"chars {c['chunk_char_count']} - words {c['chunk_word_count']} - tokens {round(approx_tokens, 2)}"
	)
	# wrap body text, avoid breaking long words awkwardly
	wrapped_lines = textwrap.wrap(
		c["chunk_text"], width=wrap_at, break_long_words=False, replace_whitespace=False
	)
	content_width = max([0, *map(len, wrapped_lines)])
	box_width = max(len(header), content_width + 2)  # +2 - side padding

	top = "┌" + "─" * (box_width) + "┐"
	hline = "|" + header.ljust(box_width) + "|"
	sep = "├" + "─" * (box_width) + "┤"
	body = "\n".join(
		"│ " + line.ljust(box_width - 2) + " │" for line in wrapped_lines
	) or ("|" + "".ljust(box_width - 2) + " |")
	bottom = "└" + "─" * (box_width) + "┘"
	return "\n".join([top, hline, sep, body, bottom])


def show_llm_random_chunks(
	llm_chunked_pages: list[dict], k: int = 5, seed: int | None = 42
) -> None:
	"""Displays n random chunks from the LLM chunked pages.
	Args:
	    pages_and_texts (list): List of tuples (page_number, text) for each page.
	    k (int): Number of random chunks to display.
	    seed (int | None): Random seed for reproducibility.
	"""
	if seed is not None:
		random.seed(seed)

	n = len(llm_chunked_pages)
	if n == 0:
		print("No LLM chunks available to display.")
		return
	idxs = _scattered_indices(n, k)
	print(f"Showing {len(idxs)} scattered random LLM chunks out of {n} total chunks:\n")
	for i, idx in enumerate(idxs, 1):
		print(f"#{i}")
		print(_draw_boxed_chunk(llm_chunked_pages[idx]))
		print()  # extra newline between chunks

In [23]:
assert llm_chunked_pages is not None and len(llm_chunked_pages) > 0
show_llm_random_chunks(llm_chunked_pages, k=5, seed=42)

Showing 5 scattered random LLM chunks out of 3313 total chunks:

#1
┌──────────────────────────────────────────────────────────────────────────────────────────────────┐
| Chunk p-8 - idx 0  | chars 315 - words 44 - tokens 78                                            |
├──────────────────────────────────────────────────────────────────────────────────────────────────┤
│ Gemady Langfelder Gemady Langfelder is an undergraduate dietetics student at the University of   │
│ Hawai‘i at Mānoa. She is an ACSM certified personal trainer and a novice horticulturist. Her     │
│ interests are nutritional epidemiology, infant and pre-/post-natal nutrition, and health policy. │
│ xxxiv | About the Contributors                                                                   │
└──────────────────────────────────────────────────────────────────────────────────────────────────┘

#2
┌─────────────────────────────────────────────────────────────────────────────────────────────────┐
| Chunk p198 - idx 0

## Analysis of chunking methods


In [51]:
import pandas as pd

# metric can be "chars", "words", or "tokens"

METRIC = "words"


def _size_val(c, metric: str):
	if metric == "chars":
		return c.get("chunk_char_count", len(c.get("chunk_text", "")))
	elif metric == "words":
		return c.get("chunk_word_count", len(c.get("chunk_text", "").split()))
	elif metric == "tokens":
		return c.get("chunk_token_count", len(c.get("chunk_text", "")) / 4)
	else:
		raise ValueError(f"Unknown metric: {metric}")


def analyze_chunks(chunks: list[dict], method_name: str, metric: str = "words") -> dict:
	"""Analyzes chunk sizes and provides statistics and histogram data.
	Args:
	    chunks (list[dict]): List of chunk dictionaries.
	    method_name (str): The name of the chunking method used.
	    metric (str): The metric to analyze ("chars", "words", or "tokens").
	Returns:
	    dict: A dictionary containing analysis results.
	"""
	if not chunks:
		return {}

	sizes = [_size_val(c, metric) for c in chunks]
	sizes = [s for s in sizes if s > 0]  # filter out non-positive sizes
	if not sizes:
		return {}

	sizes_array = np.array(sizes)
	analysis = {
		"method": method_name,
		"# of chunks": len(sizes),
		"min chunk size": int(np.min(sizes_array)),
		"max chunk size": int(np.max(sizes_array)),
		"avg. chunk size": float(np.mean(sizes_array)),
		"median chunk size": float(np.median(sizes_array)),
	}

	return analysis

In [52]:
datasets = [
	("Fixed Size Chunks", chunked_pages),
	("Semantic Chunks", semantic_chunk_pages),
	("Recursive Chunks", recursive_chunked_pages),
	("Chapter Chunks", structured_chunk_pages),
	("LLM-based Chunks", llm_chunked_pages),
]

results = [analyze_chunks(chunks, name, METRIC) for name, chunks in datasets]
df = pd.DataFrame(results).dropna().sort_values(by="avg. chunk size", ascending=False)
print(df.round(3).to_string(index=False))

print("\n Performance Analysis")
print("1. Fixed Size Chunks: Simple and fast, but may split sentences awkwardly.")
print("2. Semantic Chunks: More coherent chunks, but requires embedding computation.")
print("3. Recursive Chunks: Balances size and coherence, good for varied text.")
print(
	"4. Structure based Chunks: Best for structured documents, but relies on clear headings."
)
print("5. LLM-based Chunks: Highly coherent, but slower and dependent on API calls.")

           method  # of chunks  min chunk size  max chunk size  avg. chunk size  median chunk size
   Chapter Chunks          171              25            8900         1214.801              819.0
 Recursive Chunks         1949               3             227          106.586              122.0
 LLM-based Chunks         3313               1             117           63.141               74.0
Fixed Size Chunks         3321               1             117           62.552               73.0
  Semantic Chunks        12027               1             227           17.273               14.0

 Performance Analysis
1. Fixed Size Chunks: Simple and fast, but may split sentences awkwardly.
2. Semantic Chunks: More coherent chunks, but requires embedding computation.
3. Recursive Chunks: Balances size and coherence, good for varied text.
4. Structure based Chunks: Best for structured documents, but relies on clear headings.
5. LLM-based Chunks: Highly coherent, but slower and dependent on API c