loaders/text.py (78 changes: 41 additions, 37 deletions)
@@ -39,62 +39,66 @@ def __init__(self, config: Config):
         )
 
     def load(self, paths: List[Path]) -> List[Document]:
-        """Load files → Unstructured elements → grouped chunks."""
-        all_chunks: List[Document] = []
+        """Partition → group → (optional) secondary split → add chunk indices."""
+        grouped: list[Document] = []
 
         for path in paths:
             try:
                 logger.info("Partitioning %s", path)
                 elements = partition(filename=str(path), strategy="fast")
 
-                buf: List[str] = []
-                seen: set[str] = set()
-                buf_len = 0
+                buf, buf_len, chunk_idx = [], 0, 0
                 fname = Path(path).name
 
-                for el in elements:
-                    text = getattr(el, "text", "").strip()
-                    if not text or text in seen:
-                        continue  # skip blanks & exact duplicates
-                    seen.add(text)
-
-                    # If starting a new group, add a tiny heading once
-                    if buf_len == 0:
-                        buf.append(f"## {fname}\n")
-
-                    # Flush if adding this element would exceed chunk_size
-                    if buf_len + len(text) > self.config.chunk_size and buf:
-                        all_chunks.append(
-                            Document(
-                                page_content="\n".join(buf).strip(),
-                                metadata={"source": str(path)},
-                            )
-                        )
-                        buf, seen, buf_len = [f"## {fname}\n"], set(), 0
-
-                    buf.append(text)
-                    buf_len += len(text)
-
-                # — flush remainder —
-                if buf_len:
-                    all_chunks.append(
+                def _flush():
+                    nonlocal buf, buf_len, chunk_idx
+                    if not buf_len:
+                        return
+                    grouped.append(
                         Document(
                             page_content="\n".join(buf).strip(),
-                            metadata={"source": str(path)},
+                            metadata={
+                                "source": str(path),
+                                "chunk_id": chunk_idx,
+                            },
                         )
                     )
+                    buf, buf_len = [], 0
+                    chunk_idx += 1
+
+                for el in elements:
+                    txt = getattr(el, "text", "").strip()
+                    if not txt:
+                        continue
+                    if buf_len == 0:
+                        buf.append(f"## {fname}\n")  # one heading per chunk
+                    if buf_len + len(txt) > self.config.chunk_size:
+                        _flush()
+                    buf.append(txt)
+                    buf_len += len(txt)
+                _flush()
 
             except Exception as e:
                 logger.warning("Failed to load %s: %s", path, e)
 
-        # 2) secondary split for *very* large groups
-        final_docs: List[Document] = []
-        for doc in all_chunks:
+        # — optional secondary split for ultra‑long groups
+        final_docs = []
+        for doc in grouped:
             if len(doc.page_content) > self.config.chunk_size * 2:
                 final_docs.extend(self.splitter.split_documents([doc]))
             else:
                 final_docs.append(doc)
 
-        avg = sum(len(d.page_content) for d in final_docs) / max(1, len(final_docs))
-        logger.info("Produced %d chunks (avg %.0f chars)", len(final_docs), avg)
+        # annotate chunk_total (needed only once per file)
+        counts: dict[str, int] = {}
+        for d in final_docs:
+            counts[d.metadata["source"]] = counts.get(d.metadata["source"], 0) + 1
+        for d in final_docs:
+            d.metadata["chunk_total"] = counts[d.metadata["source"]]
+
+        logger.info(
+            "Produced %d chunks (avg %.0f chars)",
+            len(final_docs),
+            sum(len(d.page_content) for d in final_docs) / max(1, len(final_docs)),
+        )
         return final_docs
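
A minimal usage sketch of the reworked load() and the new chunk metadata, for context. The class name TextLoader, the Config(chunk_size=...) construction, and the sample path are assumptions for illustration; only load(), chunk_id, and chunk_total come from the diff above.

    from pathlib import Path

    from loaders.text import Config, TextLoader  # assumed names/exports

    loader = TextLoader(Config(chunk_size=1200))  # chunk_size value is illustrative
    docs = loader.load([Path("docs/report.txt")])

    for doc in docs:
        meta = doc.metadata
        # Each chunk now records which file it came from, its order within
        # that file (chunk_id), and how many final chunks the file produced.
        print(meta["source"], meta["chunk_id"], meta["chunk_total"], len(doc.page_content))

One caveat worth checking: if self.splitter is a LangChain text splitter, split_documents() copies the parent document's metadata, so pieces produced by the secondary split share a chunk_id, while chunk_total counts the final post-split chunks.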