In [1]:
# ---------------------------------------------------------------------------
# Model settings
# ---------------------------------------------------------------------------
EMBEDDING_PROVIDER: str = "aws"  # provider key checked by get_embeddings()
EMBEDDING_MODEL_ID: str = "amazon.titan-embed-text-v2:0"
LLM_PROVIDER: str = "aws"
LLM_ID: str = "apac.amazon.nova-lite-v1:0"
MAX_TOKEN: int = 1500
TEMPERATURE: float = 0.0
STREAMING: bool = True

# ---------------------------------------------------------------------------
# Storage locations
# ---------------------------------------------------------------------------
REDIS_URL: str = "redis://localhost:6379"  # docstore holding parent chunks
PERSIST_DIR: str = "./data/chroma_dbs"  # on-disk directory handed to Chroma
COLLECTION_NAME: str = "parent-child-chunk-v1"  # also used to namespace the docstore
BM25_INDEX_PATH: str = "./data/bm25-index/bm25-index-v1.pkl"
STOPWORDS_PATH: str = "./data/stopwords.txt"

# ---------------------------------------------------------------------------
# Retriever settings
# (not referenced by the ingestion cells visible in this section)
# ---------------------------------------------------------------------------
RETRIEVAL_STRATEGY: str = "hybrid"
MAX_RESULTS: int = 2
BM25_SEARCH_K: int = 5
VECTOR_SEARCH_K: int = 80
BM25_WEIGHT: float = 0.3
VECTOR_WEIGHT: float = 0.7
RRF_CONSTANT: int = 5
SCORE_THRESHOLD: float = 0.01

# ---------------------------------------------------------------------------
# Prompt settings
# ---------------------------------------------------------------------------
PROMPT_VERSION: str = "v1"
PROMPT_DIR: str = "./prompts"

# ---------------------------------------------------------------------------
# Splitter settings (sizes are passed to RecursiveCharacterTextSplitter)
# ---------------------------------------------------------------------------
PARENT_CHUNK_SIZE: int = 2400
PARENT_CHUNK_OVERLAP: int = 260
CHILD_CHUNK_SIZE: int = 300
CHILD_CHUNK_OVERLAP: int = 60

: 

## Loader

In [24]:
import os
import glob
from typing import List
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_core.documents import Document

class PDFFileLoader:
    """Load one PDF file into LangChain documents through ``PyMuPDFLoader``."""

    def load(self, source: str) -> List[Document]:
        """Read ``source`` and tag every resulting document with its origin.

        Args:
            source: Path of the PDF file to read.

        Returns:
            The documents produced by ``PyMuPDFLoader``; each one has
            ``source_type`` set to ``"pdf"`` and ``file_path`` set to
            ``source`` in its metadata.

        Raises:
            RuntimeError: If loading or tagging fails for any reason. The
                original exception is chained as the cause.
        """
        origin_tags = {"source_type": "pdf", "file_path": source}
        try:
            loaded_docs = PyMuPDFLoader(file_path=source).load()
            # Tagging stays inside the try so a failure here is wrapped as well.
            for loaded_doc in loaded_docs:
                loaded_doc.metadata.update(origin_tags)
            return loaded_docs
        except Exception as exc:
            raise RuntimeError(f"Gagal memuat PDF {source}: {exc!s}") from exc

class PDFDirectoryLoader:
    """Load every PDF found under a directory, one file at a time."""

    def __init__(self):
        # Each discovered file is delegated to the single-file loader.
        self.single_loader = PDFFileLoader()

    def load(self, source: str, recursive: bool = True) -> List[Document]:
        """Load all ``*.pdf`` files located under ``source``.

        Args:
            source: Directory to search.
            recursive: When true, sub-directories are searched as well;
                when false only files directly inside ``source`` are used.

        Returns:
            The concatenated documents of every loaded file, in sorted path
            order. An empty list when no PDF is found.

        Raises:
            RuntimeError: Propagated from ``PDFFileLoader.load`` when a file
                cannot be read.
        """
        # "**" matches zero or more directories, so files directly inside
        # ``source`` are still included in recursive mode.
        if recursive:
            search_pattern = os.path.join(source, "**", "*.pdf")
        else:
            search_pattern = os.path.join(source, "*.pdf")

        # glob returns paths in arbitrary order; sorting keeps runs reproducible.
        pdf_files = sorted(glob.glob(search_pattern, recursive=recursive))

        documents: List[Document] = []
        for file_path in pdf_files:
            # Skip hidden files/directories, judged only on the part of the
            # path below ``source`` so that a dot-directory (or "..") in
            # ``source`` itself does not discard every file.
            relative_parts = os.path.relpath(file_path, source).split(os.sep)
            if any(part.startswith(".") for part in relative_parts):
                continue
            documents.extend(self.single_loader.load(file_path))
        return documents

## Splitter

In [25]:
import re
import uuid
from collections import defaultdict
from typing import List, Dict, Tuple
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

class ParentChildSplitter:
    """Split documents into large "parent" chunks and small "child" chunks.

    All documents that share a source are merged into one text, split into
    parent chunks, cleaned, and each cleaned parent is split again into child
    chunks that carry the parent's id in their metadata.
    """

    # http(s) URL running up to the next whitespace character.
    _URL_PATTERN = re.compile(r'https?://\S+')
    # Sentence punctuation that is never kept at the end of an extracted URL.
    _URL_TRAILING_PUNCTUATION = '.,;:!?\'"'
    # Closing brackets are stripped only when the URL has no matching opener.
    _URL_CLOSERS = {")": "(", "]": "[", "}": "{", ">": "<"}

    def __init__(self, parent_chunk_size: int, parent_chunk_overlap: int, child_chunk_size: int, child_chunk_overlap: int):
        """Create the two character splitters.

        Args:
            parent_chunk_size: Maximum size of a parent chunk.
            parent_chunk_overlap: Overlap between consecutive parent chunks.
            child_chunk_size: Maximum size of a child chunk.
            child_chunk_overlap: Overlap between consecutive child chunks.
        """
        self.parent_splitter = RecursiveCharacterTextSplitter(chunk_size=parent_chunk_size,
                                                            chunk_overlap=parent_chunk_overlap,
                                                            separators=["\n", ". ", ", ", " ", ""])
        self.child_splitter = RecursiveCharacterTextSplitter(chunk_size=child_chunk_size,
                                                            chunk_overlap=child_chunk_overlap,
                                                            separators=["\n\n", "\n", ". ", ", ", " ", ""])

    @classmethod
    def _split_url_tail(cls, raw_url: str) -> Tuple[str, str]:
        """Separate a matched URL from punctuation that merely follows it.

        Returns:
            ``(url, tail)`` where ``tail`` is the stripped trailing
            punctuation (possibly empty) and ``url + tail == raw_url``.
        """
        # Always keep at least one character after "://".
        minimum_length = raw_url.index("://") + 4
        end = len(raw_url)
        while end > minimum_length:
            last_char = raw_url[end - 1]
            if last_char in cls._URL_TRAILING_PUNCTUATION:
                end -= 1
                continue
            opener = cls._URL_CLOSERS.get(last_char)
            # A closer with its opener inside the URL belongs to the URL,
            # e.g. ".../wiki/Foo_(bar)"; an unmatched one wraps the URL.
            if opener is not None and raw_url.count(last_char, 0, end) > raw_url.count(opener, 0, end):
                end -= 1
                continue
            break
        return raw_url[:end], raw_url[end:]

    def _preprocess_text(self, chunk_text: str) -> Tuple[str, List[str]]:
        """Clean a chunk and pull out the URLs it contains.

        Args:
            chunk_text: Raw text of one parent chunk.

        Returns:
            ``(clean_text, urls)``: the cleaned text with every URL replaced
            by ``[LINK]``, and the extracted URLs in order of appearance.
        """
        urls: List[str] = []

        def _replace_url(match) -> str:
            # Record the URL without trailing punctuation and leave that
            # punctuation in the text after the placeholder.
            url, tail = self._split_url_tail(match.group(0))
            urls.append(url)
            return "[LINK]" + tail

        text_clean = self._URL_PATTERN.sub(_replace_url, chunk_text)
        # Drop every character outside the allowed set (word characters,
        # whitespace and a small set of punctuation).
        text_clean = re.sub(r'[^\w\s\[\],.:()%=+\-/]', '', text_clean)
        # Collapse runs of spaces into a single space.
        text_clean = re.sub(r'[ ]{2,}', ' ', text_clean)
        # Collapse consecutive or whitespace-only lines into a single newline.
        text_clean = re.sub(r'\n\s*\n+', '\n', text_clean)
        return text_clean.strip(), urls

    def split_documents(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """Produce parent and child chunks for the given documents.

        Args:
            documents: Loaded documents; those sharing the same ``file_path``
                metadata (falling back to ``source``) are merged first.

        Returns:
            A dict with ``"parents"`` (cleaned parent chunks, each with a
            fresh UUID in ``doc_id``) and ``"children"`` (child chunks whose
            ``parent_id`` points at their parent).
        """
        # Group by originating file so chunks never mix content of two files.
        docs_by_source = defaultdict(list)
        for doc in documents:
            source_key = doc.metadata.get("file_path") or doc.metadata.get("source")
            docs_by_source[source_key].append(doc)

        all_parent_docs = []
        all_child_docs = []
        for source_name, doc_group in docs_by_source.items():
            # Merge all documents of one file into a single text.
            combined_text = "\n".join(d.page_content for d in doc_group)
            # Base metadata comes from the first document of the group, so
            # any per-document fields it holds describe only that first one.
            base_metadata = doc_group[0].metadata.copy()
            combined_doc = Document(page_content=combined_text, metadata=base_metadata)

            for p_doc in self.parent_splitter.split_documents([combined_doc]):
                clean_text, specific_urls = self._preprocess_text(p_doc.page_content)
                if not clean_text:
                    # Nothing left after cleaning: an empty parent would have
                    # no children and nothing to retrieve.
                    continue
                parent_id = str(uuid.uuid4())
                parent_meta = p_doc.metadata.copy()
                parent_meta.update({
                    "doc_id": parent_id,
                    "type": "parent",
                    "source": source_name,
                    "urls": specific_urls,
                })
                all_parent_docs.append(Document(page_content=clean_text, metadata=parent_meta))

                # Children are cut from the cleaned parent text.
                for c_text in self.child_splitter.split_text(clean_text):
                    child_meta = {
                        "parent_id": parent_id,
                        "type": "child",
                        "source": source_name,
                    }
                    all_child_docs.append(Document(page_content=c_text, metadata=child_meta))

        return {"parents": all_parent_docs,
                "children": all_child_docs}

## Embedding

In [26]:
import os
from functools import lru_cache
from langchain_core.embeddings import Embeddings
from langchain_aws import BedrockEmbeddings

@lru_cache(maxsize=1)
def get_embeddings(provider: str = EMBEDDING_PROVIDER, model_id: str = EMBEDDING_MODEL_ID) -> Embeddings:
    """Return the embedding client for the given provider and model.

    The result is memoised with ``lru_cache(maxsize=1)``, so repeated calls
    with the same arguments reuse one client instance.

    Args:
        provider: Embedding provider key; only ``"aws"`` is supported.
        model_id: Model identifier passed to ``BedrockEmbeddings``.

    Returns:
        A ``BedrockEmbeddings`` instance whose region is read from the
        ``AWS_REGION`` environment variable (``None`` when unset).

    Raises:
        ValueError: If ``provider`` is not supported.
    """
    if provider == "aws":
        return BedrockEmbeddings(model_id=model_id,
                                region_name=os.getenv("AWS_REGION"))
    # Name the rejected provider so a configuration typo is easy to spot.
    raise ValueError(f"Provider embedding tidak didukung: {provider!r}")


## Document Serialization

In [27]:
from typing import Any
from langchain_core.documents import Document
from langchain_core.load import dumps, loads

def encode(doc: Document) -> bytes:
    """Serialize ``doc`` with LangChain's ``dumps`` and return it as UTF-8 bytes."""
    serialized = dumps(doc)
    return serialized.encode("utf-8")

def decode(data: bytes) -> Document:
    """Rebuild a document from the payload produced by ``encode``.

    Args:
        data: UTF-8 encoded serialized document; a ``str`` is accepted too.

    Returns:
        The object reconstructed by LangChain's ``loads``.
    """
    # A str payload is normalised to bytes first so both kinds of input go
    # through the same UTF-8 decode step.
    payload = data.encode("utf-8") if isinstance(data, str) else data
    return loads(payload.decode("utf-8"))

def encode_key(key: Any) -> str:
    """Turn a store key into a string.

    ``bytes`` keys are decoded as UTF-8; anything else goes through ``str()``.
    """
    return key.decode("utf-8") if isinstance(key, bytes) else str(key)

## VectorStore

In [28]:
from langchain_community.vectorstores import Chroma
from langchain_core.embeddings import Embeddings

class VectorStore:
    """Factory for the Chroma collection used by this notebook."""

    @staticmethod
    def get_vector_store(embedding_model: Embeddings, collection_name: str = COLLECTION_NAME, persist_directory: str = PERSIST_DIR):
        """Open (or create) a persisted Chroma collection.

        Args:
            embedding_model: Embedding implementation given to Chroma.
            collection_name: Name of the collection.
            persist_directory: Directory where Chroma keeps its data.

        Returns:
            A ``Chroma`` vector store configured with the arguments above.
        """
        chroma_options = {
            "embedding_function": embedding_model,
            "persist_directory": persist_directory,
            "collection_name": collection_name,
            # Collection-level setting forwarded verbatim to Chroma;
            # presumably selects cosine distance for the index - confirm
            # against the Chroma documentation.
            "collection_metadata": {"hnsw:space": "cosine"},
        }
        return Chroma(**chroma_options)

## Docstore

In [29]:
import logging
from langchain_community.storage import RedisStore
from langchain.storage import EncoderBackedStore

logger = logging.getLogger(__name__)

class DocStore:
    """Factory for the Redis-backed key/value store that holds documents."""

    @staticmethod
    def get_doc_store(collection_name: str = COLLECTION_NAME, redis_url: str = REDIS_URL):
        """Build a document store on top of Redis.

        Args:
            collection_name: Used to derive the Redis namespace
                ``docstore_<collection_name>``.
            redis_url: Connection URL handed to ``RedisStore``.

        Returns:
            An ``EncoderBackedStore`` that wraps a ``RedisStore`` and uses
            ``encode_key`` / ``encode`` / ``decode`` for keys and values.

        Raises:
            Exception: Any error raised while building the store is logged
                and re-raised unchanged.
        """
        from urllib.parse import urlsplit

        namespace = f"docstore_{collection_name}"
        try:
            # Strip any "user:password@" part so credentials never reach logs.
            url_parts = urlsplit(redis_url)
            safe_netloc = url_parts.netloc.rpartition("@")[2]
            safe_url = url_parts._replace(netloc=safe_netloc).geturl()
            logger.info(f"Connecting to Redis DocStore {safe_url}, Namespace: {namespace}")
            raw_store = RedisStore(redis_url=redis_url, namespace=namespace)
            return EncoderBackedStore(store=raw_store,
                                    key_encoder=encode_key,
                                    value_serializer=encode,
                                    value_deserializer=decode)
        except Exception as e:
            logger.error(f"Gagal koneksi ke Redis {e}")
            # Bare raise keeps the original traceback intact.
            raise

## Text Preprocessing

In [30]:
import os
import re
import logging
from typing import Set, List

logger = logging.getLogger(__name__)

def load_stopwords(path: str) -> Set[str]:
    """Read a stopword list from a text file, one word per line.

    Args:
        path: Location of the UTF-8 encoded stopword file.

    Returns:
        The lower-cased, whitespace-stripped non-empty lines as a set. An
        empty set when the file does not exist or cannot be read; both
        cases are logged rather than raised.
    """
    try:
        if not os.path.exists(path):
            logger.warning(f"Stopword file tidak ditemukan {path}")
            return set()

        words: Set[str] = set()
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if entry:
                    words.add(entry.lower())
        logger.info("Stopwords berhasil di load")
        return words
    except Exception as exc:
        # Best effort: a broken stopword file must not stop the caller.
        logger.error(f"Gagal load stopwords: {exc}")
        return set()

def preprocess_text(text: str, stopwords: Set[str]) -> List[str]:
    """Tokenize text for keyword search.

    The text is lower-cased and split into runs of ``a-z``/``0-9``; runs
    joined by single hyphens (``world-wide``) stay together as one token.

    Args:
        text: Text to tokenize.
        stopwords: Tokens to discard; when empty, nothing is filtered.

    Returns:
        The tokens in order of appearance, without stopwords.
    """
    token_pattern = r"[a-z0-9]+(?:-[a-z0-9]+)*"
    tokens = [match.group(0) for match in re.finditer(token_pattern, text.lower())]
    if not stopwords:
        return tokens
    return [token for token in tokens if token not in stopwords]

## Parent-Child Indexer

In [31]:
import uuid
import logging
import pickle
from langchain_community.retrievers import BM25Retriever
from typing import List, Dict
from functools import partial
from langchain_core.documents import Document

logger = logging.getLogger(__name__)

class ParentChildIndexer:
    """Persist parent chunks in the document store and child chunks in Chroma."""

    def __init__(self):
        # Both stores are derived from the same collection name.
        self.embedding_model = get_embeddings()
        self.collection_name = COLLECTION_NAME
        self.vector_store = VectorStore.get_vector_store(
            embedding_model=self.embedding_model,
            collection_name=self.collection_name
        )
        self.doc_store = DocStore.get_doc_store(collection_name=self.collection_name)

    def index_documents(self, split_result: Dict[str, List[Document]]):
        """Store the output of the parent/child splitter.

        Args:
            split_result: Dict with ``"parents"`` and ``"children"`` lists.

        Returns:
            ``{"Parents indexed": <n>, "Children indexed": <m>}`` where
            ``m`` counts only the children that were actually stored, i.e.
            those carrying a ``parent_id``.

        Raises:
            Exception: Errors from either store are logged and re-raised.
        """
        parent_docs = split_result.get("parents") or []
        child_docs = split_result.get("children") or []
        logger.info(f"Indexing {len(parent_docs)} Parents & {len(child_docs)} Child")

        # Parents go to the key/value document store, keyed by doc_id.
        if parent_docs:
            try:
                parent_key_value_pairs = []
                for doc in parent_docs:
                    # Parents arriving without an id get one assigned here.
                    doc_id = doc.metadata.get("doc_id") or str(uuid.uuid4())
                    doc.metadata["doc_id"] = doc_id
                    parent_key_value_pairs.append((doc_id, doc))
                self.doc_store.mset(parent_key_value_pairs)
                logger.info(f"Berhasil menyimpan {len(parent_docs)} Parent Chunk ke Redis.")
            except Exception as e:
                logger.error(f"Error simpan Parent Chunk ke Redis: {e}")
                raise

        # Children go to the vector store; a child without parent_id could
        # never be resolved back to a parent, so it is dropped.
        valid_children = [d for d in child_docs if "parent_id" in d.metadata]
        skipped_children = len(child_docs) - len(valid_children)
        if skipped_children:
            logger.warning(f"Melewati {skipped_children} Child Chunk tanpa parent_id")
        if valid_children:
            try:
                self.vector_store.add_documents(valid_children)
                logger.info(f"Berhasil menyimpan {len(valid_children)} Child Chunk ke ChromaDB.")
            except Exception as e:
                logger.error(f"Error simpan Children Chunk ke Chroma: {e}")
                raise

        # Report what was really written, not what was submitted.
        return {"Parents indexed": len(parent_docs), "Children indexed": len(valid_children)}

class BM25Indexer:
    """Build a BM25 keyword index from the child chunks stored in Chroma."""

    def __init__(self):
        self.vector_store = VectorStore.get_vector_store(
            collection_name=COLLECTION_NAME,
            embedding_model=get_embeddings()
        )

    def build_and_save_index(self):
        """Fetch all child chunks, build a ``BM25Retriever`` and pickle it.

        The retriever is written to ``BM25_INDEX_PATH``; the parent
        directory is created when missing. Nothing is written when the
        vector store holds no child chunks.

        Raises:
            Exception: Errors while reading from Chroma or while writing the
                pickle file are logged and re-raised.
        """
        import os

        # 1. Load stopwords (an empty set when the file is unavailable).
        stopwords = load_stopwords(STOPWORDS_PATH)

        # 2. Fetch every chunk tagged type == "child" from the vector store.
        try:
            result = self.vector_store.get(
                where={"type": "child"},
                include=["documents", "metadatas"]
            )
        except Exception as e:
            logger.error(f"Gagal fetch data dari Chroma: {e}")
            raise

        # Tolerate keys that are present but set to None.
        raw_docs = result.get("documents") or []
        raw_metadatas = result.get("metadatas") or [{} for _ in raw_docs]
        if not raw_docs:
            logger.warning("Tidak ditemukan dokumen 'child' di Chroma")
            return
        logger.info(f"Indexing {len(raw_docs)} dokumen")

        # 3. Convert to LangChain documents.
        documents = [
            Document(page_content=text, metadata=meta or {})
            for text, meta in zip(raw_docs, raw_metadatas)
        ]

        # 4. Build the BM25 retriever with the notebook's tokenizer.
        # NOTE: pickle stores functions by reference, so whoever loads the
        # index must be able to resolve ``preprocess_text`` under the same
        # module path.
        bm25_retriever = BM25Retriever.from_documents(documents, preprocess_func=partial(preprocess_text, stopwords=stopwords))

        # 5. Save the index, creating the target directory on first run.
        try:
            index_dir = os.path.dirname(BM25_INDEX_PATH)
            if index_dir:
                os.makedirs(index_dir, exist_ok=True)
            with open(BM25_INDEX_PATH, "wb") as f:
                pickle.dump(bm25_retriever, f)
            logger.info(f"BM25 Index berhasil disimpan di: {BM25_INDEX_PATH}")
        except Exception as e:
            logger.error(f"Gagal menyimpan file pickle: {e}")
            raise

## Ingestion Pipeline

In [32]:
import logging

logger = logging.getLogger(__name__)

def run_ingestion_pipeline(folder_path: str):
    """Run the whole ingestion flow for one folder of PDFs.

    Steps: load the PDFs, split them into parent/child chunks, store the
    chunks (parents in the document store, children in the vector store),
    then rebuild and save the BM25 index. Progress is printed to stdout.

    Args:
        folder_path: Directory containing the PDF files to ingest.

    Returns:
        None. Returns early, before any store or model client is created,
        when the folder yields no documents.
    """
    print("Loading Data")
    raw_docs = PDFDirectoryLoader().load(folder_path)
    if not raw_docs:
        # Most likely a wrong path; do not report a successful ingestion.
        print(f"No PDF documents found in '{folder_path}'; nothing to ingest")
        return

    print("Splitting Dokumen")
    splitter = ParentChildSplitter(
        parent_chunk_size=PARENT_CHUNK_SIZE,
        parent_chunk_overlap=PARENT_CHUNK_OVERLAP,
        child_chunk_size=CHILD_CHUNK_SIZE,
        child_chunk_overlap=CHILD_CHUNK_OVERLAP
    )
    split_result = splitter.split_documents(raw_docs)

    print("Indexing Document Chunks")
    vector_indexer = ParentChildIndexer()
    result = vector_indexer.index_documents(split_result)
    print(f"{result}")

    # The keyword index is rebuilt from whatever the vector store now holds.
    print("Building BM25 Index")
    keyword_indexer = BM25Indexer()
    keyword_indexer.build_and_save_index()
    print("Ingestion DONE")

In [33]:
# Ingest every PDF found under the "MajaAI_Data" directory
# (a path relative to the notebook's working directory).
run_ingestion_pipeline("MajaAI_Data")

Loading Data
Splitting Dokumen
Indexing Document Chunks
{'Parents indexed': 90, 'Children indexed': 631}
Building BM25 Index
Ingestion DONE
