Initialize Agentic Chunking

In [2]:
# Import required libraries
import os
import json
import re
from IPython.display import Markdown
from phi.agent import Agent
from phi.model.ollama import Ollama
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain_core.documents import Document
from utils.document_processor import DocumentProcessor  

# ---- Filesystem layout ----
DATA_PATH = "./data"                  # Input documents are listed from this directory
INDEX_PATH = "faiss_index"            # Target passed to the FAISS save call
CHUNKED_DATA_PATH = "./chunked_data"  # One chunk dump per processed document
METADATA_PATH = "./metadata"          # One metadata JSON per processed document

# ---- Chunking limits ----
CHUNK_SIZE = 1200      # Characters per slice of text sent to the agent
MIN_CHUNK_SIZE = 500   # Segments shorter than this are merged with neighbours
MAX_CHUNKS = 30        # Cap on agent calls and on stored chunks per document

# ---- Model selection ----
OLLAMA_MODEL = "llama3.2"  # Model id handed to the Ollama wrapper

# Output directories must exist before the processing loop writes into them
os.makedirs(CHUNKED_DATA_PATH, exist_ok=True)
os.makedirs(METADATA_PATH, exist_ok=True)

# ---- Shared components used by the later cells ----
llm = Ollama(id=OLLAMA_MODEL)
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
docs = DocumentProcessor()
agent = Agent(model=llm, show_tool_calls=True, markdown=True)

Chunking

In [3]:
# Collects one Document per chunk across all files handled by this cell.
# Re-assigning it here means re-running the cell starts from an empty list
# instead of appending duplicates to a previous run's results.
extracted_docs = []

def clean_text(text):
    """
    Clean input text by replacing non-ASCII characters and collapsing whitespace
    
    Non-ASCII runs are replaced with a space first and whitespace is collapsed
    afterwards, so the spaces introduced by the replacement can never leave
    doubled, leading or trailing blanks in the result.
    
    Args:
        text (str): Input text to clean
    Returns:
        str: ASCII-only text with single spaces and no surrounding whitespace
    """
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)  # Replace each non-ASCII run with one space
    text = re.sub(r'\s+', ' ', text).strip()  # Collapse whitespace runs, trim both ends
    return text

def clean_agent_output(text):
    """
    Clean the output from the agent by removing unnecessary markdown and formatting
    
    Header lines containing '###' and lines starting a '***' rule are dropped
    entirely, dash rules and double dashes are removed, and bold/italic
    asterisks are replaced by a space. Single dashes are only removed when they
    stand alone between whitespace (list bullets, spaced separators); hyphens
    inside words, numeric ranges and minus signs are content and are kept.
    
    Args:
        text (str): Agent output text to clean
    Returns:
        str: Cleaned text without markdown formatting
    """
    text = re.sub(r'\n?###.*?\n', '\n', text)  # Remove markdown headers (whole line)
    text = re.sub(r'\n?\*\*\*.*?\n', '\n', text)  # Remove asterisk separators
    text = re.sub(r'\n?-{3,}\n?', '\n', text)  # Remove dash separators
    text = re.sub(r'(\s*-{2,}\s*)', ' ', text)  # Remove double dashes
    text = re.sub(r'(\s*\*{2,}\s*)', ' ', text)  # Remove bold markers
    text = re.sub(r'(\s*\*\s*)', ' ', text)  # Remove single asterisks
    # Only strip a dash that is isolated by whitespace or the string edges;
    # the lookarounds leave "well-known", "2020-2021" and "-5" untouched.
    text = re.sub(r'\s*(?<!\S)-(?!\S)\s*', ' ', text)
    text = re.sub(r'\n{2,}', '\n\n', text).strip()  # Normalize line breaks
    return text

# Main document processing: helper definitions followed by the per-file loop
VALID_EXTENSIONS = ('.pdf', '.docx', '.txt')  # Only files with these suffixes are processed
AGENT_MAX_TOKENS = 8000                       # max_tokens value forwarded to every agent.run call


def _response_text(response):
    """
    Extract the text payload from whatever agent.run returned
    
    Args:
        response: A str, a dict carrying a "text" key, or any other object
            (its `content` attribute is used when present)
    Returns:
        str: The response text, or an empty string when the payload is missing
    """
    if isinstance(response, str):
        return response
    if isinstance(response, dict):
        return response.get("text", "") or ""
    if hasattr(response, "content"):
        content = response.content
        # A None payload would make the regex-based cleanup raise TypeError
        return "" if content is None else str(content)
    return str(response)


def _merge_small_chunks(segments, min_size):
    """
    Merge consecutive short segments until each merged chunk reaches min_size
    
    Args:
        segments (list[str]): Candidate chunks in document order
        min_size (int): Length below which a segment is buffered for merging
    Returns:
        list[str]: Non-empty chunks in document order
    """
    merged = []
    buffer = ""
    for segment in segments:
        segment = segment.strip()
        if not segment:
            continue  # Blank segments would otherwise become empty chunks
        if len(segment) >= min_size:
            # Keep document order: emit whatever was buffered before this segment
            if buffer:
                merged.append(buffer)
                buffer = ""
            merged.append(segment)
            continue
        buffer = f"{buffer} {segment}" if buffer else segment
        # Flush as soon as the minimum is met so short segments cannot pile up
        # into one unbounded chunk
        if len(buffer) >= min_size:
            merged.append(buffer)
            buffer = ""
    if buffer:
        merged.append(buffer)  # Trailing remainder may be shorter than min_size
    return merged


def _structure_with_agent(plain_text, filename):
    """
    Send plain_text to the agent in CHUNK_SIZE-character slices and collect the segments
    
    At most MAX_CHUNKS slices are sent; a warning is printed when that limit
    leaves part of the text unprocessed. Slices are fixed-width, so a slice
    boundary can fall in the middle of a word.
    
    Args:
        plain_text (str): Cleaned document text
        filename (str): Name used in the warning message
    Returns:
        list[str]: Cleaned agent output split on blank lines
    """
    max_chars = CHUNK_SIZE * MAX_CHUNKS
    if len(plain_text) > max_chars:
        print(f"[WARNING] {filename}: only the first {max_chars} of {len(plain_text)} "
              f"characters are processed (MAX_CHUNKS={MAX_CHUNKS}).")

    cleaned_parts = []
    for start_idx in range(0, min(len(plain_text), max_chars), CHUNK_SIZE):
        chunk_text = plain_text[start_idx:start_idx + CHUNK_SIZE]
        response = agent.run(
            f"Split the following text into meaningful segments ensuring logical separation:\n{chunk_text}",
            max_tokens=AGENT_MAX_TOKENS
        )
        cleaned_parts.append(clean_agent_output(_response_text(response)))

    return "\n\n".join(cleaned_parts).strip().split("\n\n")


# sorted() makes the processing order (and therefore the index contents) reproducible
for filename in sorted(os.listdir(DATA_PATH)):
    if not filename.lower().endswith(VALID_EXTENSIONS):
        continue

    filepath = os.path.join(DATA_PATH, filename)

    try:
        # Read the raw bytes, then hand them to the document processor
        with open(filepath, "rb") as f:
            document = f.read()
        result = docs.process_document(document, filename)

        # Skip if document processing failed
        if not result or len(result) < 4:
            print(f"[WARNING] Gagal memproses {filename}, melewati file ini.")
            continue

        # result[3] is used as the document text -- presumably the plain-text
        # extraction; verify against DocumentProcessor
        plain_text = clean_text(result[3])
        print(f"[INFO] {filename} - Panjang teks sebelum pemrosesan: {len(plain_text)}")

        # Let the agent segment the text, then merge undersized segments
        segments = _structure_with_agent(plain_text, filename)
        optimized_chunks = _merge_small_chunks(segments, MIN_CHUNK_SIZE)

        if len(optimized_chunks) > MAX_CHUNKS:
            print(f"[WARNING] {filename}: {len(optimized_chunks)} chunks produced, "
                  f"keeping the first {MAX_CHUNKS}.")

        # Chunks are already stripped and non-empty, so ids are contiguous from 1
        chunk_data = [{"chunk_id": chunk_id, "text": chunk}
                      for chunk_id, chunk in enumerate(optimized_chunks[:MAX_CHUNKS], start=1)]

        # Create metadata for the document
        metadata = {
            "filename": filename,
            "total_chunks": len(chunk_data),
            "total_length": len(plain_text)
        }

        # Create Document objects for vector store
        extracted_docs.extend([
            Document(page_content=chunk["text"], metadata={"chunk_id": chunk["chunk_id"], **metadata})
            for chunk in chunk_data
        ])

        # Save chunked text to file
        chunked_filepath = os.path.join(CHUNKED_DATA_PATH, f"chunked_{filename}.txt")
        with open(chunked_filepath, "w", encoding="utf-8") as chunked_file:
            for chunk in chunk_data:
                chunked_file.write(f"Chunk {chunk['chunk_id']}:\n")
                chunked_file.write(f"{chunk['text']}\n")
                chunked_file.write("\n---\n\n")

        # Save metadata to separate JSON file
        metadata_filepath = os.path.join(METADATA_PATH, f"metadata_{filename}.json")
        with open(metadata_filepath, "w", encoding="utf-8") as metadata_file:
            json.dump(metadata, metadata_file, indent=4)

        print(f"[INFO] Total chunks generated for {filename}: {len(chunk_data)}")

    except Exception as e:
        # Best-effort batch: report the failure and continue with the next file
        print(f"[ERROR] Error processing {filename}: {e}")

[INFO] Dokumen (1).pdf - Panjang teks sebelum pemrosesan: 3843
[INFO] Total chunks generated for Dokumen (1).pdf: 3


Save Chunks to the FAISS Index

In [4]:
# Build and persist the FAISS index, unless the chunking cell produced nothing
if not extracted_docs:
    print("[INFO] Tidak ada dokumen yang berhasil diproses.")
else:
    # Embed every collected chunk and write the resulting index to INDEX_PATH
    vector_store = FAISS.from_documents(extracted_docs, embedding_model)
    vector_store.save_local(INDEX_PATH)
    print("[SUCCESS] Proses chunking selesai. Hasilnya disimpan dalam 'chunked_data' dan metadata di 'metadata'.")

[SUCCESS] Proses chunking selesai. Hasilnya disimpan dalam 'chunked_data' dan metadata di 'metadata'.
