In [None]:
import os
import json
import shutil
from tqdm import tqdm
from pydantic import ValidationError

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.utils.wlak_dir import list_all_file_paths
from rfiprocessor.services.markdown_converter import MarkdownConverter, ProcessorType
from rfiprocessor.core.agents.document_classifier import DocumentClassifierAgent
from rfiprocessor.core.agents.rfi_parser import RfiParserAgent
from rfiprocessor.services.chunker import ChunkerService
from rfiprocessor.services.vector_store_service import VectorStoreService


# --- Database Imports ---
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.services.db_handler import DatabaseHandler
from rfiprocessor.db.db_models import Chunk, IngestionStatus
from rfiprocessor.models.data_models import RFIJson
from rfiprocessor.services.db_handler import DatabaseHandler
from langchain_core.documents import Document
from rfiprocessor.db import db_models

# --- Initial Setup ---
# Module-level configuration and logger used by the pipeline code in this cell
# (it reads settings such as config.INCOMING_DATA_PATH and config.CHUNK_SIZE).
config = Config()
logger = get_logger(__name__)

def run_markdown_conversion_pipeline(batch_size: int = 100):
    """
    Run the whole ingestion pipeline, one stage after another.

    Despite its name, this drives every stage, each in its own DB session:

    1. Register files under ``config.INCOMING_DATA_PATH`` and convert PENDING
       files with a supported extension to markdown -> MARKDOWN_CONVERTED.
    2. Classify MARKDOWN_CONVERTED documents -> CLASSIFIED.
    3. Parse CLASSIFIED RFI/RFP documents into validated JSON -> PARSED
       (other document types are marked PARSED without parsing).
    4. Chunk PARSED documents -> CHUNKED.
    5. Send chunks of CHUNKED documents to the vector store -> VECTORIZED.

    Every stage runs even when an earlier stage had nothing to process, so a
    re-run also resumes documents left at an intermediate status. Per-document
    errors are logged and recorded as FAILED on that document; errors raised
    outside the per-document handling propagate and stop the remaining stages.

    Args:
        batch_size: Forwarded to ``VectorStoreService.add_documents``.
    """
    logger.info("--- Starting Markdown Conversion Pipeline ---")

    # Initialize services once; they are shared by the stages below.
    converter = MarkdownConverter()
    classifier = DocumentClassifierAgent()
    rfi_parser = RfiParserAgent()
    chunker = ChunkerService(config.CHUNK_SIZE, config.CHUNK_OVERLAP)
    vector_store_service = VectorStoreService()

    _run_pipeline_step(
        _convert_pending_documents, "--- Markdown Conversion Pipeline Finished ---", converter
    )
    _run_pipeline_step(
        _classify_documents, "--- Document Classification Step Finished ---", classifier
    )
    _run_pipeline_step(
        _parse_rfi_documents, "--- RFI Parsing Pipeline Finished ---", rfi_parser
    )
    _run_pipeline_step(
        _chunk_parsed_documents, "--- Chunking Pipeline Finished ---", chunker
    )
    _run_pipeline_step(
        _embed_chunked_documents,
        "--- Vector Store Pipeline Finished ---",
        vector_store_service,
        batch_size,
    )


def _run_pipeline_step(step, finished_message, *args):
    """Run ``step(db_session, db_handler, *args)`` inside a fresh DB session.

    The session is closed and ``finished_message`` is logged even if the step
    raises; the step's exception, if any, propagates to the caller.
    """
    # Keep a reference to the generator while the session is in use, in case
    # get_db_session performs cleanup when its generator is finalized.
    db_session_generator = get_db_session()
    db_session = next(db_session_generator)
    try:
        return step(db_session, DatabaseHandler(db_session), *args)
    finally:
        db_session.close()
        logger.info(finished_message)


def _convert_pending_documents(db_session, db_handler, converter):
    """Stage 1: register incoming files and convert PENDING ones to markdown."""
    # Scan the incoming directory and register every file found.
    all_files = list_all_file_paths(config.INCOMING_DATA_PATH)
    logger.info(f"Found {len(all_files)} files. Registering new files in the database...")
    for file_path in all_files:
        # Adds the document if new, or gets the existing one.
        db_handler.add_or_get_document(source_filepath=file_path)

    # Only PENDING documents with a supported extension are converted.
    pending_docs = db_handler.get_documents_by_status(IngestionStatus.PENDING)
    valid_docs = [
        doc for doc in pending_docs
        if any(doc.source_filename.lower().endswith(ext) for ext in config.VALID_FILE_EXTNS)
    ]
    if not valid_docs:
        # Only this stage ends here; the later stages still run.
        logger.info("No new documents to convert.")
        return

    logger.info(f"Found {len(valid_docs)} new documents to process.")

    for doc in tqdm(valid_docs, desc="Converting files to Markdown"):
        try:
            logger.info(f"Processing document: {doc.source_filename} (ID: {doc.id})")

            # Extensions listed in UNSTRD_FILE_EXTNS use the Unstructured processor;
            # everything else uses MarkItDown.
            processor_to_use = ProcessorType.MARKITDOWN
            if any(doc.source_filename.lower().endswith(ext) for ext in config.UNSTRD_FILE_EXTNS):
                processor_to_use = ProcessorType.UNSTRUCTURED

            # The returned markdown text is not needed here; later stages re-read the file.
            _, markdown_path = converter.convert_to_markdown(
                file_path=doc.source_filepath,
                processor=processor_to_use,
            )

            # Move the original raw file to the processed directory.
            destination_path = os.path.join(config.PROCESSED_DATA_PATH, doc.source_filename)
            os.makedirs(os.path.dirname(destination_path), exist_ok=True)
            shutil.move(doc.source_filepath, destination_path)

            db_handler.update_document(
                doc.id,
                {
                    "markdown_filepath": markdown_path,
                    "processed_filepath": destination_path,
                    "ingestion_status": IngestionStatus.MARKDOWN_CONVERTED,
                    "error_message": None,  # clear any previous error
                },
            )
            logger.info(f"Successfully converted and moved document ID {doc.id}.")

        except Exception as e:
            logger.error(
                f"Error processing document ID {doc.id} ('{doc.source_filename}'): {e}",
                exc_info=True,
            )
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": str(e)},
            )


def _classify_documents(db_session, db_handler, classifier):
    """Stage 2: classify MARKDOWN_CONVERTED documents and store type/grade."""
    logger.info("--- Starting Document Classification Step ---")
    docs_to_classify = db_handler.get_documents_by_status(IngestionStatus.MARKDOWN_CONVERTED)

    if not docs_to_classify:
        logger.info("No new documents to classify.")
    else:
        logger.info(f"Found {len(docs_to_classify)} documents to classify.")

    for doc in tqdm(docs_to_classify, desc="Classifying documents"):
        try:
            logger.info(f"Classifying document: {doc.source_filename} (ID: {doc.id})")

            with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                markdown_content = f.read()

            classification = classifier.classify(markdown_content)

            db_handler.update_document(
                doc.id,
                {
                    "document_type": classification.get("document_type"),
                    "document_grade": classification.get("document_grade"),
                    "ingestion_status": IngestionStatus.CLASSIFIED,
                    "error_message": None,
                },
            )
            logger.info(f"Successfully classified document ID {doc.id}.")

        except Exception as e:
            logger.error(f"Error classifying document ID {doc.id}: {e}", exc_info=True)
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": f"Classification failed: {e}"},
            )


def _parse_rfi_documents(db_session, db_handler, rfi_parser):
    """Stage 3: parse CLASSIFIED RFI/RFP documents into validated JSON payloads."""
    logger.info("--- Starting RFI Parsing Pipeline ---")
    logger.info("--- Starting RFI/RFP Parsing Step ---")
    docs_to_parse = db_handler.get_documents_by_status(IngestionStatus.CLASSIFIED)

    if not docs_to_parse:
        logger.info("No new classified documents to parse.")
    else:
        logger.info(f"Found {len(docs_to_parse)} documents to potentially parse.")

    for doc in tqdm(docs_to_parse, desc="Parsing RFI/RFP documents"):
        try:
            # Non-RFI/RFP documents have nothing to parse; advance them directly.
            if doc.document_type != "RFI/RFP":
                logger.info(f"Skipping parsing for doc ID {doc.id} (type: {doc.document_type}). Marking as parsed.")
                db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.PARSED})
                continue

            logger.info(f"Parsing RFI/RFP document: {doc.source_filename} (ID: {doc.id})")

            with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                markdown_content = f.read()

            parsed_data = rfi_parser.parse(markdown_content)

            # Save the raw (not yet validated) parser output for inspection.
            output_dir = config.PARSED_JSON_PATH
            os.makedirs(output_dir, exist_ok=True)
            json_filename = os.path.splitext(doc.source_filename)[0] + ".json"
            output_path = os.path.join(output_dir, json_filename)
            with open(output_path, "w", encoding="utf-8") as f:
                json.dump(parsed_data, f, indent=2, ensure_ascii=False)
            logger.info(f"Saved parsed output to {output_path}")

            try:
                validated_data = RFIJson.model_validate(parsed_data)
                logger.info(f"Successfully validated JSON structure for doc ID {doc.id}.")
            except ValidationError as ve:
                error_details = f"Pydantic validation failed for doc ID {doc.id}: {ve}"
                logger.error(error_details)
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": error_details},
                )
                continue

            db_handler.update_document(
                doc.id,
                {
                    "rfi_json_payload": validated_data.model_dump(),
                    "ingestion_status": IngestionStatus.PARSED,
                    "error_message": None,
                },
            )
            logger.info(f"Successfully parsed and stored JSON for document ID {doc.id}.")

        except Exception as e:
            error_msg = f"Error parsing document ID {doc.id}: {e}"
            logger.error(error_msg, exc_info=True)
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": error_msg},
            )


def _chunk_parsed_documents(db_session, db_handler, chunker):
    """Stage 4: chunk PARSED documents, assigning sequential chunk IDs."""
    logger.info("--- Starting Chunking Pipeline ---")
    logger.info("--- Starting Document Chunking Step ---")
    docs_to_chunk = db_handler.get_documents_by_status(IngestionStatus.PARSED)

    if not docs_to_chunk:
        logger.info("No new parsed documents to chunk.")
        return
    logger.info(f"Found {len(docs_to_chunk)} documents to chunk.")

    # New chunk IDs continue after the highest existing Chunk.id (0 if none).
    # These IDs are later reused as vector-store document IDs (str(chunk.id)).
    latest = db_session.query(Chunk.id).order_by(Chunk.id.desc()).first()
    next_chunk_id = (latest[0] if latest else 0) + 1
    logger.info(f"Starting new chunk IDs from {next_chunk_id}")

    for doc in tqdm(docs_to_chunk, desc="Chunking documents"):
        try:
            logger.info(f"Chunking document: {doc.source_filename} (ID: {doc.id})")

            if doc.document_type == "RFI/RFP" and doc.rfi_json_payload:
                # The chunker receives the document itself; no markdown text is passed.
                markdown_content = ""
                logger.info("Chunking as RFI/RFP (using JSON payload)...")
            elif doc.markdown_filepath:
                with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                    markdown_content = f.read()
                logger.info("Chunking as supporting document (using markdown content)...")
            else:
                logger.info("Skipping: Document is missing required data for chunking.")
                continue

            chunks_data = chunker.create_chunks_for_document(doc, markdown_content)
            logger.info(f"Number of chunks: {len(chunks_data)}")

            if not chunks_data:
                # NOTE(review): the status stays PARSED here despite the message,
                # so the document is retried on every run — confirm intent.
                logger.warning(f"No chunks were created for document ID {doc.id}. Moving to next stage.")
                continue

            first_id = next_chunk_id
            for chunk_data in chunks_data:
                chunk_data['id'] = next_chunk_id
                next_chunk_id += 1

            # Store the chunks exactly once, then advance the document's status.
            db_handler.add_chunks_to_document(doc.id, chunks_data)
            db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.CHUNKED})
            logger.info(
                f"Chunked document ID {doc.id}: {len(chunks_data)} chunks "
                f"(IDs {first_id} to {next_chunk_id - 1})"
            )
            logger.info(f"Successfully chunked and stored chunks for document ID {doc.id}.")

        except Exception as e:
            error_msg = f"Error chunking document ID {doc.id}: {e}"
            logger.error(error_msg, exc_info=True)
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": error_msg},
            )


def _embed_chunked_documents(db_session, db_handler, vector_store_service, batch_size):
    """Stage 5: push chunks of CHUNKED documents to the vector store.

    Documents that were CHUNKED at the start of this stage are marked
    VECTORIZED on success, or FAILED with the error text if the
    vector-store call raises.
    """
    logger.info("--- Starting Vector Store Pipeline ---")
    chunked_docs = db_handler.get_documents_by_status(IngestionStatus.CHUNKED)

    logger.info("--- Starting Vector Store Embedding Step ---")
    chunks_to_embed = (
        db_session.query(Chunk)
        .join(db_models.Document)
        .filter(db_models.Document.ingestion_status == IngestionStatus.CHUNKED)
        .all()
    )

    if not chunks_to_embed:
        logger.info("No chunked documents to embed.")
        return

    logger.info(f"Found {len(chunks_to_embed)} chunks to embed.")

    # Convert chunks to LangChain Documents, using the chunk ID as document ID.
    documents = []
    for chunk in tqdm(chunks_to_embed, desc="Preparing chunks for vector store"):
        try:
            documents.append(
                Document(
                    page_content=chunk.chunk_text,
                    metadata=chunk.chunk_metadata,
                    id=str(chunk.id),
                )
            )
        except Exception as e:
            logger.error(f"Error preparing chunk ID {chunk.id}: {e}", exc_info=True)

    try:
        logger.info(f"Adding {len(documents)} chunks to vector store...")
        # NOTE(review): `documents` is not passed here; confirm that
        # VectorStoreService.add_documents loads its own input.
        vector_ids = vector_store_service.add_documents(batch_size=batch_size)
        logger.info(f"Successfully added {len(vector_ids)} chunks to ChromaDB.")
    except Exception as e:
        logger.error(f"Error adding chunks to vector store: {e}", exc_info=True)
        # Mark the parent documents (not the chunk IDs) as failed.
        for doc in chunked_docs:
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": str(e)},
            )
        return

    for doc in chunked_docs:
        db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.VECTORIZED})


if __name__ == "__main__":
    # Note: in a Jupyter/IPython kernel __name__ is "__main__", so executing
    # this cell runs the pipeline immediately.
    logger.info("Application started.")
    
    # Ensure the database and its tables exist before any pipeline step queries them
    init_db()
    
    # Run the pipeline entry point defined in this cell
    run_markdown_conversion_pipeline()
    
    logger.info("Application finished.")

In [None]:
from tqdm import tqdm

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.core.agents.document_classifier import DocumentClassifierAgent

# --- Database Imports ---
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.services.db_handler import DatabaseHandler
from rfiprocessor.db.db_models import IngestionStatus

# --- Initial Setup ---
# Module-level configuration and logger for the classification pipeline in this cell.
config = Config()
logger = get_logger(__name__)

def run_document_classification_pipeline():
    """
    Classify every MARKDOWN_CONVERTED document and record the result.

    Each document's markdown file is read and passed to
    ``DocumentClassifierAgent.classify``. The returned ``document_type`` and
    ``document_grade`` (e.g. RFI/RFP vs. supporting document) are stored, and
    the document moves to CLASSIFIED.

    A failure on one document marks only that document FAILED, with the error
    text, and the loop continues. The DB session is always closed.
    """
    agent = DocumentClassifierAgent()

    # Keep the generator referenced for as long as the session is used.
    session_gen = get_db_session()
    session = next(session_gen)
    try:
        handler = DatabaseHandler(session)

        logger.info("--- Starting Document Classification Step ---")
        pending = handler.get_documents_by_status(IngestionStatus.MARKDOWN_CONVERTED)
        if pending:
            logger.info(f"Found {len(pending)} documents to classify.")
        else:
            logger.info("No new documents to classify.")

        for record in tqdm(pending, desc="Classifying documents"):
            try:
                logger.info(f"Classifying document: {record.source_filename} (ID: {record.id})")

                with open(record.markdown_filepath, 'r', encoding='utf-8') as md_file:
                    text = md_file.read()

                result = agent.classify(text)

                handler.update_document(
                    record.id,
                    {
                        "document_type": result.get("document_type"),
                        "document_grade": result.get("document_grade"),
                        "ingestion_status": IngestionStatus.CLASSIFIED,
                        "error_message": None,
                    },
                )
                logger.info(f"Successfully classified document ID {record.id}.")
            except Exception as exc:
                logger.error(f"Error classifying document ID {record.id}: {exc}", exc_info=True)
                handler.update_document(
                    record.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": f"Classification failed: {exc}"},
                )
    finally:
        session.close()
        logger.info("--- Ingestion Pipeline Finished ---")

if __name__ == "__main__":
    # Note: in a Jupyter/IPython kernel __name__ is "__main__", so executing
    # this cell runs the classification step immediately.
    logger.info("Application started.")
    
    # Ensure the database and its tables exist before the step queries them
    init_db()
    
    # Run the classification step defined in this cell
    run_document_classification_pipeline()
    
    logger.info("Application finished.")


In [None]:
from tqdm import tqdm
from pydantic import ValidationError

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.core.agents.rfi_parser import RfiParserAgent

# --- Database Imports ---
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.services.db_handler import DatabaseHandler
from rfiprocessor.db.db_models import IngestionStatus
from rfiprocessor.models.data_models import RFIJson

import os
import json

# --- Initial Setup ---
# Module-level configuration and logger for the RFI parser pipeline in this cell
# (it reads config.PARSED_JSON_PATH to decide where raw parser output is saved).
config = Config()
logger = get_logger(__name__)

def run_rfi_parser_pipeline():
    """
    Parse CLASSIFIED RFI/RFP documents into validated JSON payloads.

    For every CLASSIFIED document:

    * documents whose type is not "RFI/RFP" are marked PARSED without parsing;
    * RFI/RFP documents have their markdown read and passed to
      ``RfiParserAgent.parse``. The raw result is written to
      ``<config.PARSED_JSON_PATH>/<source stem>.json`` before validation, then
      validated with ``RFIJson``. A valid result is stored as
      ``rfi_json_payload`` and the document becomes PARSED.

    Any validation or other error marks that document FAILED with the error
    text. The DB session is always closed.
    """
    logger.info("--- Starting RFI Parsing Pipeline ---")
    parser_agent = RfiParserAgent()

    def save_raw_output(source_filename, payload):
        # Persist the unvalidated parser output for inspection; returns the path written.
        json_dir = config.PARSED_JSON_PATH
        os.makedirs(json_dir, exist_ok=True)
        stem = os.path.splitext(source_filename)[0]
        json_path = os.path.join(json_dir, stem + ".json")
        with open(json_path, "w", encoding="utf-8") as out_file:
            json.dump(payload, out_file, indent=2, ensure_ascii=False)
        return json_path

    # Keep the generator referenced for as long as the session is used.
    session_gen = get_db_session()
    session = next(session_gen)
    try:
        handler = DatabaseHandler(session)

        logger.info("--- Starting RFI/RFP Parsing Step ---")
        candidates = handler.get_documents_by_status(IngestionStatus.CLASSIFIED)
        if candidates:
            logger.info(f"Found {len(candidates)} documents to potentially parse.")
        else:
            logger.info("No new classified documents to parse.")

        for record in tqdm(candidates, desc="Parsing RFI/RFP documents"):
            try:
                if record.document_type != "RFI/RFP":
                    # Supporting documents have nothing to parse; advance them directly.
                    logger.info(f"Skipping parsing for doc ID {record.id} (type: {record.document_type}). Marking as parsed.")
                    handler.update_document(record.id, {"ingestion_status": IngestionStatus.PARSED})
                    continue

                logger.info(f"Parsing RFI/RFP document: {record.source_filename} (ID: {record.id})")

                with open(record.markdown_filepath, 'r', encoding='utf-8') as md_file:
                    text = md_file.read()
                raw = parser_agent.parse(text)

                saved_to = save_raw_output(record.source_filename, raw)
                logger.info(f"Saved parsed output to {saved_to}")

                try:
                    model = RFIJson.model_validate(raw)
                except ValidationError as verr:
                    detail = f"Pydantic validation failed for doc ID {record.id}: {verr}"
                    logger.error(detail)
                    handler.update_document(
                        record.id,
                        {"ingestion_status": IngestionStatus.FAILED, "error_message": detail},
                    )
                    continue
                logger.info(f"Successfully validated JSON structure for doc ID {record.id}.")

                handler.update_document(
                    record.id,
                    {
                        "rfi_json_payload": model.model_dump(),
                        "ingestion_status": IngestionStatus.PARSED,
                        "error_message": None,
                    },
                )
                logger.info(f"Successfully parsed and stored JSON for document ID {record.id}.")
            except Exception as exc:
                message = f"Error parsing document ID {record.id}: {exc}"
                logger.error(message, exc_info=True)
                handler.update_document(
                    record.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": message},
                )
    finally:
        session.close()
        logger.info("--- Ingestion Pipeline Finished ---")

# Script entry point. In a Jupyter/IPython kernel __name__ is "__main__",
# so executing this cell runs the parsing step immediately.
if __name__ == "__main__":
    logger.info("Application started.")
    init_db()  # ensure the database and its tables exist before the step queries them
    run_rfi_parser_pipeline()
    logger.info("Application finished.")

In [None]:
import os
import shutil
from tqdm import tqdm

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.services.chunker import ChunkerService

# --- Database Imports ---
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.services.db_handler import DatabaseHandler
from rfiprocessor.db.db_models import Document, IngestionStatus
from rfiprocessor.models.data_models import RFIJson

# --- Initial Setup ---
# Module-level configuration and logger for the chunking pipeline in this cell
# (it reads config.CHUNK_SIZE and config.CHUNK_OVERLAP).
config = Config()
logger = get_logger(__name__)

def run_chunking_pipeline():
    """
    Split every PARSED document into chunks and store them.

    RFI/RFP documents that have a JSON payload are passed to the chunker with
    an empty markdown string; the chunker receives the document itself.
    Other documents are chunked from the text of their markdown file.
    Documents with neither are skipped.

    Documents that produce chunks have them stored once and move to CHUNKED.
    Any error on a document marks only that document FAILED, with the error
    text. The DB session is always closed.
    """
    logger.info("--- Starting Chunking Pipeline ---")

    # Initialize services and agents
    chunker = ChunkerService(config.CHUNK_SIZE, config.CHUNK_OVERLAP)

    db_session_generator = get_db_session()
    db_session = next(db_session_generator)
    try:
        db_handler = DatabaseHandler(db_session)
        # --- Step 4: Document Chunking ---
        logger.info("--- Starting Document Chunking Step ---")
        docs_to_chunk = db_handler.get_documents_by_status(IngestionStatus.PARSED)

        if not docs_to_chunk:
            logger.info("No new parsed documents to chunk.")
        else:
            logger.info(f"Found {len(docs_to_chunk)} documents to chunk.")

        for doc in tqdm(docs_to_chunk, desc="Chunking documents"):
            try:
                logger.info(f"Chunking document: {doc.source_filename} (ID: {doc.id})")

                if doc.document_type == "RFI/RFP" and doc.rfi_json_payload:
                    markdown_content = ""  # not used for RFI/RFP; the chunker gets `doc`
                    logger.info("Chunking as RFI/RFP (using JSON payload)...")
                elif doc.markdown_filepath:
                    with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                        markdown_content = f.read()
                    logger.info("Chunking as supporting document (using markdown content)...")
                else:
                    logger.info("Skipping: Document is missing required data for chunking.")
                    continue

                chunks_data = chunker.create_chunks_for_document(doc, markdown_content)
                logger.info(f"Number of chunks: {len(chunks_data)}")

                if not chunks_data:
                    # NOTE(review): the status stays PARSED here despite the message,
                    # so the document is retried on every run — confirm intent.
                    logger.warning(f"No chunks were created for document ID {doc.id}. Moving to next stage.")
                    continue

                # Store the chunks exactly once, then advance the document's status.
                db_handler.add_chunks_to_document(doc.id, chunks_data)
                db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.CHUNKED})
                logger.info(f"Successfully chunked and stored chunks for document ID {doc.id}.")

            except Exception as e:
                error_msg = f"Error chunking document ID {doc.id}: {e}"
                logger.error(error_msg, exc_info=True)
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": error_msg}
                )

    finally:
        db_session.close()
        logger.info("--- Chunking Pipeline Finished ---")

# Script entry point. In a Jupyter/IPython kernel __name__ is "__main__",
# so executing this cell runs the chunking step immediately.
if __name__ == "__main__":
    logger.info("Application started.")
    init_db()  # ensure the database and its tables exist before the step queries them
    run_chunking_pipeline()
    logger.info("Application finished.")

In [None]:
import logging
from tqdm import tqdm

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.services.vector_store_service import VectorStoreService
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.db.db_models import Chunk, IngestionStatus
from rfiprocessor.services.db_handler import DatabaseHandler
from langchain_core.documents import Document
from rfiprocessor.db import db_models

# --- Initial Setup ---
# Module-level logger for the vector store pipeline in this cell.
# NOTE(review): nothing visible in this cell reads `config`; it is presumably
# instantiated for parity with the other cells (or for Config() side effects) — confirm.
config = Config()
logger = get_logger(__name__)

def run_vector_store_pipeline(batch_size: int = 100):
    """
    Load chunks of CHUNKED documents and add them to the ChromaDB vector store.

    Args:
        batch_size: Forwarded to ``VectorStoreService.add_documents``.

    On success, every document that was CHUNKED when this step started is
    marked VECTORIZED. If the vector-store call raises, those same documents
    are marked FAILED with the error text. Statuses are always updated by
    document ID, never by chunk ID. The DB session is always closed.
    """
    logger.info("--- Starting Vector Store Pipeline ---")

    # Initialize services
    vector_store_service = VectorStoreService()

    db_session_generator = get_db_session()
    db_session = next(db_session_generator)
    try:
        db_handler = DatabaseHandler(db_session)

        # Documents whose status must change once their chunks are embedded.
        chunked_docs = db_handler.get_documents_by_status(IngestionStatus.CHUNKED)

        # --- Step: Load and Embed Chunks ---
        logger.info("--- Starting Vector Store Embedding Step ---")
        chunks_to_embed = db_session.query(Chunk).join(
            db_models.Document
        ).filter(
            db_models.Document.ingestion_status == IngestionStatus.CHUNKED
        ).all()

        if not chunks_to_embed:
            logger.info("No chunked documents to embed.")
            return

        logger.info(f"Found {len(chunks_to_embed)} chunks to embed.")

        # Convert chunks to LangChain Documents, using the chunk ID as document ID.
        documents = []
        for chunk in tqdm(chunks_to_embed, desc="Preparing chunks for vector store"):
            try:
                documents.append(
                    Document(
                        page_content=chunk.chunk_text,
                        metadata=chunk.chunk_metadata,
                        id=str(chunk.id),
                    )
                )
            except Exception as e:
                logger.error(f"Error preparing chunk ID {chunk.id}: {e}", exc_info=True)

        try:
            logger.info(f"Adding {len(documents)} chunks to vector store...")
            # NOTE(review): `documents` is not passed here; confirm that
            # VectorStoreService.add_documents loads its own input.
            vector_ids = vector_store_service.add_documents(batch_size=batch_size)
            logger.info(f"Successfully added {len(vector_ids)} chunks to ChromaDB.")
        except Exception as e:
            logger.error(f"Error adding chunks to vector store: {e}", exc_info=True)
            # Mark the parent documents (not the chunk IDs) as failed.
            for doc in chunked_docs:
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": str(e)}
                )
            return

        # Update document statuses to VECTORIZED
        for doc in chunked_docs:
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.VECTORIZED}
            )

    finally:
        db_session.close()
        logger.info("--- Vector Store Pipeline Finished ---")

if __name__ == "__main__":
    # Note: in a Jupyter/IPython kernel __name__ is "__main__", so executing
    # this cell runs the vector store step immediately.
    logger.info("Application started.")
    init_db()  # ensure the database and its tables exist before the step queries them
    run_vector_store_pipeline()
    logger.info("Application finished.")

In [None]:
import os
from chromadb import PersistentClient
from chromadb.utils import embedding_functions
from rfiprocessor.db.database import get_db_session
from rfiprocessor.db.db_models import Chunk

# --- Setup OpenAI Embedding Function ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Explicit check rather than `assert`: asserts are stripped under `python -O`,
# which would let the script continue without a key and fail later inside the
# embedding calls with a far less helpful error.
if not OPENAI_API_KEY:
    raise RuntimeError("Set your OPENAI_API_KEY in the environment!")

openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=OPENAI_API_KEY,
    model_name="text-embedding-3-large"  # Use "text-embedding-ada-002" for cheaper/faster
)

# --- Setup ChromaDB Persistent Client ---
chroma_path = "data/vector_store/chroma_db"
client = PersistentClient(path=chroma_path)
collection = client.get_or_create_collection("chunks", embedding_function=openai_ef)

# --- Load all chunks from DB and add them to the vector DB in batches ---
# NOTE(review): this loads *every* chunk regardless of ingestion status. Adding IDs
# that already exist in the collection may be rejected or ignored depending on the
# chromadb version; switch to `collection.upsert` if re-runs must overwrite.
BATCH_SIZE = 100
db_session_generator = get_db_session()
db_session = next(db_session_generator)
try:
    chunks = db_session.query(Chunk).all()
    print(f"Loaded {len(chunks)} chunks from DB.")

    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i:i+BATCH_SIZE]
        documents = [c.chunk_text for c in batch]
        ids = [str(c.id) for c in batch]
        metadatas = [c.chunk_metadata for c in batch]
        collection.add(
            documents=documents,
            ids=ids,
            metadatas=metadatas
        )
        print(f"Added batch {i//BATCH_SIZE + 1} ({len(documents)} chunks)")
finally:
    # Release the DB connection even if an embedding/add call fails mid-way.
    db_session.close()

print("All chunks embedded and stored in ChromaDB!")

In [None]:
import os
import shutil
from tqdm import tqdm

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.core.agents.rfi_parser import RfiParserAgent

# --- Database Imports ---
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.services.db_handler import DatabaseHandler
from rfiprocessor.db.db_models import IngestionStatus
from rfiprocessor.models.data_models import RFIJson

# --- Initial Setup ---
# Module-level configuration object and logger used by the functions in this cell.
config = Config()
logger = get_logger(__name__)


def run_chunking_pipeline():
    """
    Chunk every document whose ingestion status is PARSED.

    For each PARSED document, the markdown file at ``doc.markdown_filepath`` is
    read and passed, together with the document, to
    ``ChunkerService.create_chunks_for_document``. Produced chunks are stored via
    ``DatabaseHandler.add_chunks_to_document``, and the document is moved to
    CHUNKED (also when no chunks were produced). A document whose chunking raises
    is marked FAILED with the error message, so it is not re-selected on every run.
    The database session is always closed on exit.
    """
    # Local import: this cell does not import ChunkerService at the top, so
    # running the cell on its own would otherwise raise NameError.
    from rfiprocessor.services.chunker import ChunkerService

    logger.info("--- Starting Ingestion Pipeline ---")

    # Initialize services and agents
    chunker = ChunkerService(config.CHUNK_SIZE, config.CHUNK_OVERLAP)

    db_session_generator = get_db_session()
    db_session = next(db_session_generator)
    try:
        db_handler = DatabaseHandler(db_session)
        # --- Step 4: Document Chunking ---
        logger.info("--- Starting Document Chunking Step ---")
        docs_to_chunk = db_handler.get_documents_by_status(IngestionStatus.PARSED)

        if not docs_to_chunk:
            logger.info("No new parsed documents to chunk.")
            return

        logger.info(f"Found {len(docs_to_chunk)} documents to chunk.")

        for doc in tqdm(docs_to_chunk, desc="Chunking documents"):
            try:
                logger.info(f"Chunking document: {doc.source_filename} (ID: {doc.id})")

                # We need the markdown content for supporting docs.
                # RFI docs don't need it for chunking, but it's cleaner to always have it.
                with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                    markdown_content = f.read()

                chunks_data = chunker.create_chunks_for_document(doc, markdown_content)

                if not chunks_data:
                    logger.warning(f"No chunks were created for document ID {doc.id}. Moving to next stage.")
                else:
                    db_handler.add_chunks_to_document(doc.id, chunks_data)

                # Advance even with zero chunks so the document leaves PARSED.
                db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.CHUNKED})
                logger.info(f"Successfully chunked and stored chunks for document ID {doc.id}.")

            except Exception as e:
                error_msg = f"Error chunking document ID {doc.id}: {e}"
                logger.error(error_msg, exc_info=True)
                # Record the failure; leaving the document PARSED would make every
                # subsequent run retry (and fail on) it again.
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": error_msg}
                )

    finally:
        db_session.close()
        logger.info("--- Ingestion Pipeline Finished ---")

# Script entry point: initialise the database, then run this cell's chunking step.
if __name__ == "__main__":
    logger.info("Application started.")
    init_db()  # create tables (if missing) before the pipeline queries them
    # Call the function defined in this cell; run_ingestion_pipeline is not
    # defined here and would raise NameError when the cell runs on its own.
    run_chunking_pipeline()
    logger.info("Application finished.")

In [1]:
import os
import json
import shutil
from tqdm import tqdm
from pydantic import ValidationError
from contextlib import contextmanager
from typing import List, Dict, Any, Generator
from sqlalchemy import func

# --- Core Application Imports ---
from config.config import Config
from rfiprocessor.utils.logger import get_logger
from rfiprocessor.utils.wlak_dir import list_all_file_paths
from rfiprocessor.services.markdown_converter import MarkdownConverter, ProcessorType
from rfiprocessor.core.agents.document_classifier import DocumentClassifierAgent
from rfiprocessor.core.agents.rfi_parser import RfiParserAgent
from rfiprocessor.services.chunker import ChunkerService
from rfiprocessor.services.vector_store_service import VectorStoreService

# --- Database Imports ---
from rfiprocessor.db.database import init_db, get_db_session
from rfiprocessor.services.db_handler import DatabaseHandler
from rfiprocessor.db.db_models import Chunk, IngestionStatus
from rfiprocessor.models.data_models import RFIJson
from langchain_core.documents import Document
from rfiprocessor.db import db_models

# --- Initial Setup ---
# Module-level configuration object and logger shared by IngestionPipeline and
# the entry-point function below.
config = Config()
logger = get_logger(__name__)


class IngestionPipeline:
    """
    Orchestrate document ingestion as five status-driven steps.

    Each step selects documents by their ``IngestionStatus`` and advances them:

    1. PENDING -> MARKDOWN_CONVERTED: convert the source file to markdown and move
       the original into ``config.PROCESSED_DATA_PATH``. PENDING documents whose
       extension is not in ``config.VALID_FILE_EXTNS`` are left untouched.
    2. MARKDOWN_CONVERTED -> CLASSIFIED: classify the markdown with
       ``DocumentClassifierAgent``.
    3. CLASSIFIED -> PARSED: extract and validate RFI/RFP JSON; documents of any
       other type are passed through unchanged.
    4. PARSED -> CHUNKED: split documents into chunks stored in the database.
    5. CHUNKED -> VECTORIZED: embed the chunks into the vector store and move each
       document's markdown file into ``config.PROCESSED_MARKDOWN_PATH``.

    Processing errors are logged and recorded on the affected document(s) as
    FAILED together with an ``error_message``.
    """

    def __init__(self, batch_size: int = 100):
        """
        Create the converter, agents and services shared by all steps.

        Args:
            batch_size: Batch size passed to ``VectorStoreService.add_documents``
                when embedding chunks in step 5.
        """
        self.batch_size = batch_size
        self.converter = MarkdownConverter()
        self.classifier = DocumentClassifierAgent()
        self.rfi_parser = RfiParserAgent()
        self.chunker = ChunkerService(config.CHUNK_SIZE, config.CHUNK_OVERLAP)
        self.vector_store_service = VectorStoreService()

    @contextmanager
    def _db_session(self) -> Generator[Any, None, None]:
        """Yield a session obtained from ``get_db_session`` and always close it on exit."""
        db_session_generator = get_db_session()
        db_session = next(db_session_generator)
        try:
            yield db_session
        finally:
            db_session.close()

    @contextmanager
    def get_db_handler(self) -> Generator[DatabaseHandler, None, None]:
        """Context manager yielding a ``DatabaseHandler`` whose session is closed on exit."""
        with self._db_session() as db_session:
            yield DatabaseHandler(db_session)

    def run_full_pipeline(self) -> None:
        """Run all five ingestion steps once, in order."""
        logger.info("Starting full ingestion pipeline")

        self._run_markdown_conversion()
        self._run_document_classification()
        self._run_rfi_parsing()
        self._run_document_chunking()
        self._run_vector_store_embedding()

        logger.info("Full ingestion pipeline completed")

    def _run_markdown_conversion(self) -> None:
        """Step 1: Register incoming files and convert PENDING documents to markdown."""
        logger.info("--- Starting Markdown Conversion Pipeline ---")

        with self.get_db_handler() as db_handler:
            # Register all files in database
            self._register_files_in_database(db_handler)

            # Get pending documents with a supported extension
            pending_docs = db_handler.get_documents_by_status(IngestionStatus.PENDING)
            valid_docs = self._filter_valid_documents(pending_docs)

            if not valid_docs:
                logger.info("No new documents to convert. Pipeline finished.")
                return

            logger.info(f"Found {len(valid_docs)} new documents to process.")

            for doc in tqdm(valid_docs, desc="Converting files to Markdown"):
                self._convert_single_document(db_handler, doc)

        logger.info("--- Markdown Conversion Pipeline Finished ---")

    def _run_document_classification(self) -> None:
        """Step 2: Classify MARKDOWN_CONVERTED documents as RFI/RFP or Supporting Document."""
        logger.info("--- Starting Document Classification Step ---")

        with self.get_db_handler() as db_handler:
            docs_to_classify = db_handler.get_documents_by_status(IngestionStatus.MARKDOWN_CONVERTED)

            if not docs_to_classify:
                logger.info("No new documents to classify.")
                return

            logger.info(f"Found {len(docs_to_classify)} documents to classify.")

            for doc in tqdm(docs_to_classify, desc="Classifying documents"):
                self._classify_single_document(db_handler, doc)

        logger.info("--- Document Classification Step Finished ---")

    def _run_rfi_parsing(self) -> None:
        """Step 3: Parse CLASSIFIED RFI/RFP documents; pass other types through."""
        logger.info("--- Starting RFI Parsing Pipeline ---")

        with self.get_db_handler() as db_handler:
            docs_to_parse = db_handler.get_documents_by_status(IngestionStatus.CLASSIFIED)

            if not docs_to_parse:
                logger.info("No new classified documents to parse.")
                return

            logger.info(f"Found {len(docs_to_parse)} documents to potentially parse.")

            for doc in tqdm(docs_to_parse, desc="Parsing RFI/RFP documents"):
                self._parse_single_document(db_handler, doc)

        logger.info("--- RFI Parsing Pipeline Finished ---")

    def _run_document_chunking(self) -> None:
        """Step 4: Chunk every PARSED document, assigning sequential chunk IDs."""
        logger.info("--- Starting Chunking Pipeline ---")

        # A raw session is needed (not just the handler) for the max-ID query.
        with self._db_session() as db_session:
            db_handler = DatabaseHandler(db_session)
            docs_to_chunk = db_handler.get_documents_by_status(IngestionStatus.PARSED)

            if not docs_to_chunk:
                logger.info("No new parsed documents to chunk.")
                return

            logger.info(f"Found {len(docs_to_chunk)} documents to chunk.")

            # Chunk IDs are assigned here, continuing after the current maximum.
            # NOTE(review): assumes a single writer at a time — concurrent runs
            # could pick the same starting ID.
            max_chunk_id = db_session.query(func.max(Chunk.id)).scalar() or 0
            logger.info(f"Starting new chunk IDs from {max_chunk_id + 1}")
            current_chunk_id = max_chunk_id + 1

            for doc in tqdm(docs_to_chunk, desc="Chunking documents"):
                current_chunk_id = self._chunk_single_document(db_handler, doc, current_chunk_id)

        logger.info("--- Chunking Pipeline Finished ---")

    def _run_vector_store_embedding(self) -> None:
        """Step 5: Embed the chunks of every CHUNKED document into the vector store."""
        logger.info("--- Starting Vector Store Pipeline ---")

        # A raw session is needed (not just the handler) for the chunk join query.
        with self._db_session() as db_session:
            db_handler = DatabaseHandler(db_session)
            chunked_docs = db_handler.get_documents_by_status(IngestionStatus.CHUNKED)

            # Every chunk whose parent document is currently CHUNKED
            chunks_to_embed = db_session.query(Chunk).join(
                db_models.Document
            ).filter(
                db_models.Document.ingestion_status == IngestionStatus.CHUNKED
            ).all()

            if not chunks_to_embed:
                logger.info("No chunked documents to embed.")
                return

            logger.info(f"Found {len(chunks_to_embed)} chunks to embed.")

            documents = self._prepare_chunks_for_embedding(chunks_to_embed)
            self._embed_documents_to_vector_store(db_handler, documents, chunked_docs)

        logger.info("--- Vector Store Pipeline Finished ---")

    def _register_files_in_database(self, db_handler: DatabaseHandler) -> None:
        """Register every file under ``config.INCOMING_DATA_PATH`` in the database."""
        all_files = list_all_file_paths(config.INCOMING_DATA_PATH)
        logger.info(f"Found {len(all_files)} files. Registering new files in the database...")

        for file_path in all_files:
            db_handler.add_or_get_document(source_filepath=file_path)

    def _filter_valid_documents(self, pending_docs: List[Any]) -> List[Any]:
        """Return the documents whose lower-cased filename ends with a supported extension."""
        valid_extensions = tuple(config.VALID_FILE_EXTNS)
        return [
            doc for doc in pending_docs
            if doc.source_filename.lower().endswith(valid_extensions)
        ]

    def _convert_single_document(self, db_handler: DatabaseHandler, doc: Any) -> None:
        """
        Convert one document to markdown and move its source file to PROCESSED_DATA_PATH.

        On success the document becomes MARKDOWN_CONVERTED with its markdown and
        processed paths recorded; on any error it is marked FAILED.
        """
        try:
            logger.info(f"Processing document: {doc.source_filename} (ID: {doc.id})")

            # Files listed in UNSTRD_FILE_EXTNS use the Unstructured processor.
            processor_to_use = ProcessorType.MARKITDOWN
            if doc.source_filename.lower().endswith(tuple(config.UNSTRD_FILE_EXTNS)):
                processor_to_use = ProcessorType.UNSTRUCTURED

            markdown_content, markdown_path = self.converter.convert_to_markdown(
                file_path=doc.source_filepath,
                processor=processor_to_use
            )

            # Move original file
            destination_path = os.path.join(config.PROCESSED_DATA_PATH, doc.source_filename)
            os.makedirs(os.path.dirname(destination_path), exist_ok=True)
            shutil.move(doc.source_filepath, destination_path)

            updates = {
                "markdown_filepath": markdown_path,
                "processed_filepath": destination_path,
                "ingestion_status": IngestionStatus.MARKDOWN_CONVERTED,
                "error_message": None
            }
            db_handler.update_document(doc.id, updates)
            logger.info(f"Successfully converted and moved document ID {doc.id}.")

        except Exception as e:
            logger.error(f"Error processing document ID {doc.id} ('{doc.source_filename}'): {e}", exc_info=True)
            db_handler.update_document(
                doc.id,
                {
                    "ingestion_status": IngestionStatus.FAILED,
                    "error_message": str(e)
                }
            )

    def _classify_single_document(self, db_handler: DatabaseHandler, doc: Any) -> None:
        """Classify one document from its markdown; mark it CLASSIFIED or FAILED."""
        try:
            logger.info(f"Classifying document: {doc.source_filename} (ID: {doc.id})")

            with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                markdown_content = f.read()

            classification = self.classifier.classify(markdown_content)

            updates = {
                "document_type": classification.get("document_type"),
                "document_grade": classification.get("document_grade"),
                "ingestion_status": IngestionStatus.CLASSIFIED,
                "error_message": None
            }
            db_handler.update_document(doc.id, updates)
            logger.info(f"Successfully classified document ID {doc.id}.")

        except Exception as e:
            logger.error(f"Error classifying document ID {doc.id}: {e}", exc_info=True)
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": f"Classification failed: {e}"}
            )

    def _parse_single_document(self, db_handler: DatabaseHandler, doc: Any) -> None:
        """
        Parse one RFI/RFP document into validated JSON.

        Non-RFI/RFP documents are marked PARSED without parsing. For RFI/RFP
        documents the raw parser output is saved to ``config.PARSED_JSON_PATH``,
        validated against ``RFIJson``, and stored on the document; validation or
        any other error marks the document FAILED.
        """
        try:
            if doc.document_type != "RFI/RFP":
                logger.info(f"Skipping parsing for doc ID {doc.id} (type: {doc.document_type}). Marking as parsed.")
                db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.PARSED})
                return

            logger.info(f"Parsing RFI/RFP document: {doc.source_filename} (ID: {doc.id})")

            with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                markdown_content = f.read()

            parsed_data = self.rfi_parser.parse(markdown_content)

            # The raw output is saved before validation so failures can be inspected.
            self._save_parsed_json(doc, parsed_data)

            try:
                validated_data = RFIJson.model_validate(parsed_data)
                logger.info(f"Successfully validated JSON structure for doc ID {doc.id}.")
            except ValidationError as ve:
                error_details = f"Pydantic validation failed for doc ID {doc.id}: {ve}"
                logger.error(error_details)
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": error_details}
                )
                return

            updates = {
                "rfi_json_payload": validated_data.model_dump(),
                "ingestion_status": IngestionStatus.PARSED,
                "error_message": None
            }
            db_handler.update_document(doc.id, updates)
            logger.info(f"Successfully parsed and stored JSON for document ID {doc.id}.")

        except Exception as e:
            error_msg = f"Error parsing document ID {doc.id}: {e}"
            logger.error(error_msg, exc_info=True)
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": error_msg}
            )

    def _save_parsed_json(self, doc: Any, parsed_data: Dict[str, Any]) -> None:
        """Write ``parsed_data`` to ``<PARSED_JSON_PATH>/<source stem>.json``."""
        output_dir = config.PARSED_JSON_PATH
        os.makedirs(output_dir, exist_ok=True)
        json_filename = os.path.splitext(doc.source_filename)[0] + ".json"
        output_path = os.path.join(output_dir, json_filename)

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(parsed_data, f, indent=2, ensure_ascii=False)

        logger.info(f"Saved parsed output to {output_path}")

    def _chunk_single_document(self, db_handler: DatabaseHandler, doc: Any, current_chunk_id: int) -> int:
        """
        Chunk one document, store its chunks, and return the next free chunk ID.

        RFI/RFP documents with a JSON payload are passed to the chunker with an
        empty markdown string (the chunker presumably reads ``doc.rfi_json_payload``
        — confirm in ChunkerService); other documents are chunked from their
        markdown file. The document ends CHUNKED (also when no chunks were produced,
        so it does not stay PARSED forever) or FAILED.

        Args:
            db_handler: Handler bound to the active session.
            doc: Document row to chunk.
            current_chunk_id: First ID to assign to this document's chunks.

        Returns:
            ``current_chunk_id`` advanced past every ID assigned here.
        """
        try:
            logger.info(f"Chunking document: {doc.source_filename} (ID: {doc.id})")

            if doc.document_type == "RFI/RFP" and doc.rfi_json_payload:
                markdown_content = ""  # Not used for RFI/RFP
                logger.info("Chunking as RFI/RFP (using JSON payload)...")
            elif doc.markdown_filepath:
                with open(doc.markdown_filepath, 'r', encoding='utf-8') as f:
                    markdown_content = f.read()
                logger.info("Chunking as supporting document (using markdown content)...")
            else:
                logger.info("Skipping: Document is missing required data for chunking.")
                # Mark FAILED so the document is not re-selected as PARSED on every run.
                db_handler.update_document(
                    doc.id,
                    {
                        "ingestion_status": IngestionStatus.FAILED,
                        "error_message": "Document is missing required data for chunking.",
                    }
                )
                return current_chunk_id

            chunks_data = self.chunker.create_chunks_for_document(doc, markdown_content) or []
            logger.info(f"Number of chunks: {len(chunks_data)}")

            if not chunks_data:
                logger.warning(f"No chunks were created for document ID {doc.id}. Moving to next stage.")
                db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.CHUNKED})
                return current_chunk_id

            for chunk_data in chunks_data:
                chunk_data['id'] = current_chunk_id
                current_chunk_id += 1

            db_handler.add_chunks_to_document(doc.id, chunks_data)
            db_handler.update_document(doc.id, {"ingestion_status": IngestionStatus.CHUNKED})

            logger.info(f"Chunked document ID {doc.id}: {len(chunks_data)} chunks")
            logger.info(f"Successfully chunked and stored chunks for document ID {doc.id}.")
            return current_chunk_id

        except Exception as e:
            error_msg = f"Error chunking document ID {doc.id}: {e}"
            logger.error(error_msg, exc_info=True)
            db_handler.update_document(
                doc.id,
                {"ingestion_status": IngestionStatus.FAILED, "error_message": error_msg}
            )
            return current_chunk_id

    def _prepare_chunks_for_embedding(self, chunks_to_embed: List[Chunk]) -> List[Document]:
        """Convert database chunks to LangChain Documents, skipping (and logging) any that fail."""
        documents: List[Document] = []

        for chunk in tqdm(chunks_to_embed, desc="Preparing chunks for vector store"):
            try:
                documents.append(
                    Document(
                        page_content=chunk.chunk_text,
                        # LangChain's Document.metadata is a dict; substitute {} for a
                        # NULL column so the chunk is not silently dropped.
                        metadata=chunk.chunk_metadata or {},
                        id=str(chunk.id),
                    )
                )
            except Exception as e:
                logger.error(f"Error preparing chunk ID {chunk.id}: {e}", exc_info=True)

        return documents

    def _embed_documents_to_vector_store(self, db_handler: DatabaseHandler, documents: List[Document], chunked_docs: List[Any]) -> None:
        """
        Add the prepared chunks to the vector store and update their documents.

        On success every document in ``chunked_docs`` becomes VECTORIZED and its
        markdown file is archived; a per-document error there is logged without
        affecting the others. If the vector-store call itself fails, every
        document in ``chunked_docs`` is marked FAILED.
        """
        try:
            logger.info(f"Adding {len(documents)} chunks to vector store...")
            # NOTE(review): ``documents`` is not passed here; confirm that
            # VectorStoreService.add_documents sources the chunks itself, otherwise
            # it should receive ``documents``.
            vector_ids = self.vector_store_service.add_documents(batch_size=self.batch_size)
            logger.info(f"Successfully added {len(vector_ids)} chunks to ChromaDB.")
        except Exception as e:
            logger.error(f"Error adding chunks to vector store: {e}", exc_info=True)
            # update_document takes document IDs, so mark the owning documents —
            # never the chunk IDs, which would hit unrelated documents.
            for doc in chunked_docs:
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.FAILED, "error_message": str(e)}
                )
            return

        for doc in chunked_docs:
            try:
                db_handler.update_document(
                    doc.id,
                    {"ingestion_status": IngestionStatus.VECTORIZED}
                )
                self._archive_markdown(db_handler, doc)
            except Exception as e:
                # The embeddings are already stored; don't fail the document over
                # a post-processing problem such as a missing markdown file.
                logger.error(f"Error finalizing document ID {doc.id} after vectorization: {e}", exc_info=True)

    def _archive_markdown(self, db_handler: DatabaseHandler, doc: Any) -> None:
        """Move a document's markdown file into PROCESSED_MARKDOWN_PATH and record the new path."""
        if not doc.markdown_filepath:
            return
        file_name = os.path.basename(doc.markdown_filepath)
        destination_path = os.path.join(config.PROCESSED_MARKDOWN_PATH, file_name)
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)
        shutil.move(doc.markdown_filepath, destination_path)
        # Keep the stored path pointing at the file's actual location.
        db_handler.update_document(doc.id, {"markdown_filepath": destination_path})


def run_ingestion_pipeline(batch_size: int = 100) -> None:
    """
    Build an ``IngestionPipeline`` and run every ingestion step once.

    Args:
        batch_size: Forwarded to ``IngestionPipeline``, which passes it as the
            ``batch_size`` of ``VectorStoreService.add_documents`` when embedding
            chunks — it is not a per-document batch size.
    """
    IngestionPipeline(batch_size).run_full_pipeline()


# Script entry point: initialise the database, then run all ingestion steps once.
if __name__ == "__main__":
    logger.info("Application started.")
    
    # Ensure the database and tables are created before running the pipeline
    init_db()
    
    # Run the main processing function (all five steps, default batch size)
    run_ingestion_pipeline()
    
    logger.info("Application finished.")

2025-07-28 12:41:48,662 - rfiprocessor.services.vector_store_service - INFO - Loading OpenAI embedding model: text-embedding-3-large
2025-07-28 12:41:48,780 - rfiprocessor.services.vector_store_service - INFO - ChromaDB service initialized. Collection 'chunks' at data/vector_store/chroma_db
2025-07-28 12:41:48,786 - __main__ - INFO - Application started.
2025-07-28 12:41:48,786 - rfiprocessor.db.database - INFO - Initializing database and creating tables if they don't exist...
2025-07-28 12:41:48,792 - rfiprocessor.db.database - INFO - Database initialization complete.
2025-07-28 12:41:48,809 - rfiprocessor.services.llm_provider - INFO - Initialized Fast LLM Provider: gpt-4-turbo
2025-07-28 12:41:48,810 - rfiprocessor.services.prompt_loader - INFO - Loading prompt 'document_classifier' from: rfiprocessor/prompts/document_classifier.txt
2025-07-28 12:41:48,811 - rfiprocessor.core.agents.document_classifier - INFO - DocumentClassifierAgent initialized successfully.
2025-07-28 12:41:48,82