From 16f2cdbacb6c10ee4a8a41bc560b68fe67bd00e4 Mon Sep 17 00:00:00 2001
From: Drew Minnear
Date: Mon, 21 Apr 2025 22:01:07 -0400
Subject: [PATCH] add chunk index to better find neighbors

---
 loaders/text.py | 78 ++++++++++++++++++++++++++-----------------------
 1 file changed, 41 insertions(+), 37 deletions(-)

diff --git a/loaders/text.py b/loaders/text.py
index 01ab434..5e5922d 100644
--- a/loaders/text.py
+++ b/loaders/text.py
@@ -39,62 +39,66 @@ def __init__(self, config: Config):
         )
 
     def load(self, paths: List[Path]) -> List[Document]:
-        """Load files → Unstructured elements → grouped chunks."""
-        all_chunks: List[Document] = []
+        """Partition → group → (optional) secondary split → add chunk indices."""
+        grouped: list[Document] = []
 
         for path in paths:
             try:
                 logger.info("Partitioning %s", path)
                 elements = partition(filename=str(path), strategy="fast")
 
-                buf: List[str] = []
-                seen: set[str] = set()
-                buf_len = 0
+                buf, buf_len, chunk_idx = [], 0, 0
                 fname = Path(path).name
 
-                for el in elements:
-                    text = getattr(el, "text", "").strip()
-                    if not text or text in seen:
-                        continue  # skip blanks & exact duplicates
-                    seen.add(text)
-
-                    # If starting a new group, add a tiny heading once
-                    if buf_len == 0:
-                        buf.append(f"## {fname}\n")
-
-                    # Flush if adding this element would exceed chunk_size
-                    if buf_len + len(text) > self.config.chunk_size and buf:
-                        all_chunks.append(
-                            Document(
-                                page_content="\n".join(buf).strip(),
-                                metadata={"source": str(path)},
-                            )
-                        )
-                        buf, seen, buf_len = [f"## {fname}\n"], set(), 0
-
-                    buf.append(text)
-                    buf_len += len(text)
-
-                # — flush remainder —
-                if buf_len:
-                    all_chunks.append(
+                def _flush():
+                    nonlocal buf, buf_len, chunk_idx
+                    if not buf_len:
+                        return
+                    grouped.append(
                         Document(
                             page_content="\n".join(buf).strip(),
-                            metadata={"source": str(path)},
+                            metadata={
+                                "source": str(path),
+                                "chunk_id": chunk_idx,
+                            },
                         )
                     )
+                    buf, buf_len = [], 0
+                    chunk_idx += 1
+
+                for el in elements:
+                    txt = getattr(el, "text", "").strip()
+                    if not txt:
+                        continue
+                    if buf_len + len(txt) > self.config.chunk_size:
+                        _flush()
+                    if buf_len == 0:
+                        buf.append(f"## {fname}\n")  # one heading per chunk
+                    buf.append(txt)
+                    buf_len += len(txt)
+                _flush()
             except Exception as e:
                 logger.warning("Failed to load %s: %s", path, e)
 
-        # 2) secondary split for *very* large groups
-        final_docs: List[Document] = []
-        for doc in all_chunks:
+        # optional secondary split for ultra-long groups
+        final_docs: list[Document] = []
+        for doc in grouped:
             if len(doc.page_content) > self.config.chunk_size * 2:
                 final_docs.extend(self.splitter.split_documents([doc]))
             else:
                 final_docs.append(doc)
 
-        avg = sum(len(d.page_content) for d in final_docs) / max(1, len(final_docs))
-        logger.info("Produced %d chunks (avg %.0f chars)", len(final_docs), avg)
+        # annotate each chunk with its file's total chunk count
+        counts: dict[str, int] = {}
+        for d in final_docs:
+            counts[d.metadata["source"]] = counts.get(d.metadata["source"], 0) + 1
+        for d in final_docs:
+            d.metadata["chunk_total"] = counts[d.metadata["source"]]
+
+        logger.info(
+            "Produced %d chunks (avg %.0f chars)",
+            len(final_docs),
+            sum(len(d.page_content) for d in final_docs) / max(1, len(final_docs)),
+        )
 
         return final_docs
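
Not part of the patch, just a usage note: the new "chunk_id" / "chunk_total"
metadata is what makes neighbor lookup possible downstream. Below is a minimal
sketch of that lookup, assuming the loaded documents are available in memory;
a real vector store would do the same thing with a metadata filter on "source"
and "chunk_id". The with_neighbors helper and its window parameter are
hypothetical, not something this repo defines.

from collections import defaultdict

from langchain_core.documents import Document  # assumed import path

def with_neighbors(hit: Document, docs: list[Document], window: int = 1) -> list[Document]:
    """Return the hit plus up to `window` chunk indices on each side, same file."""
    # Pieces produced by the secondary split inherit their parent's chunk_id,
    # so a single index can map to several Documents.
    by_id: dict[int, list[Document]] = defaultdict(list)
    for d in docs:
        if d.metadata["source"] == hit.metadata["source"]:
            by_id[d.metadata["chunk_id"]].append(d)
    center = hit.metadata["chunk_id"]
    out: list[Document] = []
    for i in range(center - window, center + window + 1):
        out.extend(by_id.get(i, []))
    return out

One caveat: chunk_total counts final documents per file, which after the
secondary split can exceed the highest chunk_id + 1, so treat it as a
per-file size hint rather than an exact index bound.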