In [3]:
# Auto-reload modules when they change
%load_ext autoreload
%autoreload 2

import sys
import os
project_root = r"C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl"
if project_root not in sys.path:
    sys.path.insert(0, project_root)
os.chdir(project_root)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
import logging
import sys

from ETL.nodes.process_new_files import process_new_files
from ETL.tools.models import ProcessingConfig
from ETL.tools.settings import sql_server_settings, rag_app_settings
from ETL.tools.registry_utils import get_etl_sources

In [None]:
app_id = rag_app_settings.app_id


In [None]:
# Configure logging to write to stdout
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),  # Log to stdout
    ],
)
logger = logging.getLogger(__name__)


# Get app_id from settings
app_id = rag_app_settings.app_id
logger.info(f"Loading configuration for app_id: {app_id}")


In [4]:
# Fetch all ETL configurations from App Registry API
try:
    all_etl_configs = get_etl_sources()
    logger.info(f"Retrieved {len(all_etl_configs)} ETL configurations from App Registry")
except Exception as e:
    logger.error(f"Failed to retrieve ETL configurations from App Registry: {e}")
    logger.error("Cannot proceed without ETL configuration. Exiting...")
    sys.exit(1)  # Exit if we cannot get configuration


# Filter configuration for current app_id
filtered_configs = [item for item in all_etl_configs if item.applicationId == app_id]


# Create ProcessingConfig from registry values
if filtered_configs:
    registry_config = filtered_configs[0]
    logger.info(f"Found ETL configuration for app_id {app_id}")
    logger.info(f"  - applicationName: {registry_config.applicationName}")
    logger.info(f"  - parserType: {registry_config.parserType}")
    logger.info(f"  - chunkerType: {registry_config.chunkerType}")
    logger.info(f"  - chunkAugmentationMethod: {registry_config.chunkAugmentationMethod}")
    
    # Normalize chunk_augmentation_method: convert 'None' or None to 'none'
    chunk_method = registry_config.chunkAugmentationMethod
    if chunk_method in [None, 'None']:
        chunk_method = 'none'
        logger.info(f"  - Normalized chunkAugmentationMethod from '{registry_config.chunkAugmentationMethod}' to 'none'")
    
    config1 = ProcessingConfig(
        parser_type=registry_config.parserType or 'document_intelligence',
        chunking_strategy=registry_config.chunkerType or 'recursive',
        chunk_augment_method=chunk_method,
        document_page_stitching=True,
    )
    
    logger.info(f"ProcessingConfig created successfully:")
    logger.info(f"  - parser_type: {config1.parser_type}")
    logger.info(f"  - chunking_strategy: {config1.chunking_strategy}")
    logger.info(f"  - chunk_augment_method: {config1.chunk_augment_method}")
    logger.info(f"  - append_summary_to_chunks: {config1.append_summary_to_chunks}")
    logger.info(f"  - use_iterative_reconstruction: {config1.use_iterative_reconstruction}")
    logger.info(f"  - document_page_stitching: {config1.document_page_stitching}")

NameError: name 'logger' is not defined

In [None]:
config1

In [None]:
config1.document_page_stitching = False
config1.append_summary_to_chunks = True
config1.parser_type = "document_intelligence"
config1.use_iterative_reconstruction = False

In [None]:
import logging
import sys

from ETL.nodes.process_new_files import process_new_files
from ETL.tools.models import ProcessingConfig
from ETL.tools.settings import sql_server_settings, rag_app_settings
from ETL.tools.db_utils import get_all_etl


# Configure logging to write to stdout
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),  # Log to stdout
    ],
)
logger = logging.getLogger(__name__)


# Get app_id from settings
app_id = rag_app_settings.app_id
logger.info(f"Loading configuration for app_id: {app_id}")

# Fetch all ETL configurations from database
try:
    all_etl_configs = get_all_etl(sql_server_settings.engine)
    logger.info(f"Retrieved {len(all_etl_configs)} ETL configurations from database")
except Exception as e:
    logger.error(f"Failed to retrieve ETL configurations from database: {e}")
    all_etl_configs = []


# Filter configuration for current app_id
filtered_configs = [item for item in all_etl_configs if item.id == app_id]

# Create ProcessingConfig from database values
if filtered_configs:
    db_config = filtered_configs[0]
    logger.info(f"Found ETL configuration for app_id {app_id}")
    logger.info(f"  - parser_type: {db_config.parser_type}")
    logger.info(f"  - chunker_type: {db_config.chunker_type}")
    logger.info(f"  - chunk_augmentation_method: {db_config.chunk_augmentation_method}")
    

    chunk_augment_method_input = db_config.chunk_augmentation_method
    if chunk_augment_method_input is None or str(chunk_augment_method_input).lower() == "none":
        chunk_augment_method_input = "none"


    config1 = ProcessingConfig(
        parser_type=db_config.parser_type or 'document_intelligence',
        chunking_strategy=db_config.chunker_type or 'recursive',
        chunk_augment_method=chunk_augment_method_input,
        document_page_stitching=True,
    )
    
    logger.info(f"ProcessingConfig created successfully:")
    logger.info(f"  - parser_type: {config1.parser_type}")
    logger.info(f"  - chunking_strategy: {config1.chunking_strategy}")
    logger.info(f"  - chunk_augment_method: {config1.chunk_augment_method}")
    logger.info(f"  - append_summary_to_chunks: {config1.append_summary_to_chunks}")
    logger.info(f"  - use_iterative_reconstruction: {config1.use_iterative_reconstruction}")
    logger.info(f"  - document_page_stitching: {config1.document_page_stitching}")
    
else:
    # Fallback to default config if no matching record found
    logger.warning(f"No ETL configuration found for app_id {app_id}, using default configuration")
    config1 = ProcessingConfig(
        chunking_strategy="recursive",
        parser_type='document_intelligence',
        chunk_augment_method='none',
        document_page_stitching=True,
    )
    logger.info("Using default ProcessingConfig")

In [1]:
import os
os.chdir(r"C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl")

In [2]:
from ETL.document_processor.base.models import ProcessingConfig
from ETL.nodes.process_new_files import process_new_files

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
config1 = ProcessingConfig(
    chunking_strategy="recursive",
    parser_type='document_intelligence',
    chunk_augment_method='none',
    document_page_stitching=False,
)

In [4]:
config1.document_page_stitching = False
config1.use_iterative_reconstruction = False
config1.append_summary_to_chunks = False
config1.parser_type = "document_intelligence"


files = [
    {'id': '01HYSZYLJABAHBMSIOXRDZAGN4VJGSGA26', 'name': 'BIS Meeting Services Terminology.docx', 'etag': '"{160E0820-0E49-47BC-9019-BCAA4D23035E},7"', 'web_url': 'https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160E0820-0E49-47BC-9019-BCAA4D23035E%7D&file=BIS%20Meeting%20Services%20Terminology.docx&action=default&mobileredirect=true'},
]

data = process_new_files(files=files, config=config1)

100%|██████████| 1/1 [00:02<00:00,  2.51s/it]

I am Creating DocxParser





In [5]:
data

{'BIS Meeting Services Terminology.docx': '0 present and unprocessed'}

In [None]:
config1

In [None]:
files = [
    {'id': '01HYSZYLJABAHBMSIOXRDZAGN4VJGSGA26', 'name': 'BIS Meeting Services Terminology.docx', 'etag': '"{160E0820-0E49-47BC-9019-BCAA4D23035E},7"', 'web_url': 'https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160E0820-0E49-47BC-9019-BCAA4D23035E%7D&file=BIS%20Meeting%20Services%20Terminology.docx&action=default&mobileredirect=true'},
]

data = process_new_files(files=files, config=config1)

In [None]:
print(data)

In [None]:
files = [
    {'id': '01HYSZYLJABAHBMSIOXRDZAGN4VJGSGA26', 'name': 'BIS Meeting Services Terminology.docx', 'etag': '"{160E0820-0E49-47BC-9019-BCAA4D23035E},7"', 'web_url': 'https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160E0820-0E49-47BC-9019-BCAA4D23035E%7D&file=BIS%20Meeting%20Services%20Terminology.docx&action=default&mobileredirect=true'},
]

data = process_new_files(files=files, config=config1)

files = 
config = 

#processor = FileProcessor(weaviate_client=weaviate_client, config=config)
processor = FileProcessor(config=config)
images_count_per_file_dict = {}

for file_metadata in tqdm(files):
    file_path = download_file(file_metadata)
    #file_path = Path("C:/Users/sa007769/Downloads/rag_etl/bis-gpt-rag-etl/intermediate_files/sample_2.pdf")

    msg = f"processing file {file_metadata['name']}"
    logger.info(msg)
    #print(f"file_type :: {type(file_path)}")
    if config.parser_type == 'vision' and (file_path.suffix == ".docx" or file_path.suffix == ".pdf"):
        print(f" I am in vision :: file_path :: {file_path.as_posix()}")
        file_path = convert_to_pdf(file_path.as_posix())
        print(f" After I am in vision :: file_path :: {type(file_path)}")


    # exploit file_metadata to add more stuff
    file_metadata["file_path"] = file_path

    n_unprocessed_images = processor.process_file(
        file_path=file_path,
        file_metadata=file_metadata
    )

In [None]:
from ETL.nodes.process_new_files import FileProcessor, process_new_files

In [None]:
from ETL.tools.models import ProcessingConfig

In [None]:
config1 = ProcessingConfig(
    chunking_strategy="recursive",  # Use the ChunkingStrategy enum
    parser_type='document_intelligence',
    use_iterative_reconstruction=False,
    append_summary_to_chunks=False,
    document_page_stitching=True,

)

In [None]:
files = [
    {'id': '01HYSZYLJABAHBMSIOXRDZAGN4VJGSGA26', 'name': 'BIS Meeting Services Terminology.docx', 'etag': '"{160E0820-0E49-47BC-9019-BCAA4D23035E},7"', 'web_url': 'https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160E0820-0E49-47BC-9019-BCAA4D23035E%7D&file=BIS%20Meeting%20Services%20Terminology.docx&action=default&mobileredirect=true'},
]

In [None]:
data = process_new_files(files=files, config=config1)

In [None]:
data

In [None]:
print(data[1].content)

In [None]:
data

In [None]:
"""
Simple ETL Configuration Query - Hardcoded Version
All configuration hardcoded at the top for easy modification
"""

from sqlalchemy.orm import Session, declarative_base
from sqlalchemy import select, Engine, Column, Integer, String, create_engine
from typing import List, Dict, Any

# ============================================================================
# HARDCODED CONFIGURATION - MODIFY THESE AS NEEDED
# ============================================================================

MSSQL_SERVER = 'wdchio2964.bisad.bisinfo.org,55001'
MSSQL_DB_NAME = 'bis_gpt_persistence'

# Leave as None for Windows Authentication, or set username/password for SQL Auth
MSSQL_USERNAME = None
MSSQL_PASSWORD = None

# ============================================================================
# DATABASE MODEL
# ============================================================================

Base = declarative_base()



"""
Simple ETL Configuration Query - Hardcoded Version
All configuration hardcoded at the top for easy modification
"""

from sqlalchemy.orm import Session
from sqlalchemy import select, Engine, Column, Integer, String, create_engine
from typing import List, Dict, Any

# ============================================================================
# HARDCODED CONFIGURATION - MODIFY THESE AS NEEDED
# ============================================================================

MSSQL_SERVER = 'wdchio2964.bisad.bisinfo.org,55001'
MSSQL_DB_NAME = 'bis_gpt_persistence'

# Leave as None for Windows Authentication, or set username/password for SQL Auth
MSSQL_USERNAME = None
MSSQL_PASSWORD = None

# ============================================================================
# DATABASE MODEL
# ============================================================================

Base = declarative_base()

class ETLConfiguration(Base):
    """Database model for dbo.etl_configuration table"""
    __tablename__ = "etl_configuration"
    __table_args__ = {'schema': 'dbo'}
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    parser_type = Column(String(255), nullable=True)
    chunker_type = Column(String(255), nullable=True)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            'id': self.id,
            'parser_type': self.parser_type,
            'chunker_type': self.chunker_type,
        }


# ============================================================================
# DATABASE CONNECTION
# ============================================================================

def get_database_engine() -> Engine:
    """Create and return database engine"""
    if MSSQL_USERNAME is None or MSSQL_PASSWORD is None:
        # Windows Authentication
        connection_string = (
            f"mssql+pyodbc://{MSSQL_SERVER}/{MSSQL_DB_NAME}"
            f"?driver=ODBC+Driver+17+for+SQL+Server"
            f"&trusted_connection=yes"
        )
    else:
        # SQL Server Authentication
        connection_string = (
            f"mssql+pyodbc://{MSSQL_USERNAME}:{MSSQL_PASSWORD}@{MSSQL_SERVER}/{MSSQL_DB_NAME}"
            f"?driver=ODBC+Driver+17+for+SQL+Server"
        )
    
    return create_engine(connection_string, echo=False, pool_pre_ping=True)


# ============================================================================
# QUERY FUNCTIONS
# ============================================================================

def get_all_etl(engine: Engine) -> List[Dict[str, Any]]:
    """
    Retrieve all ETL configurations from the database.
    Returns list of dictionaries with all columns.
    """
    with Session(engine) as session:
        query = select(ETLConfiguration)
        results = session.execute(query).scalars().all()
        return [config.to_dict() for config in results]




# ============================================================================
# MAIN FUNCTION
# ============================================================================

print("Connecting to database...")
print(f"Server: {MSSQL_SERVER}")
print(f"Database: {MSSQL_DB_NAME}")
print()

engine = get_database_engine()

# Test connection
with engine.connect() as conn:
    print("✓ Database connection successful!\n")

# Get all configurations
configs = get_all_etl(engine)

print("="*80)
print(f"Found {len(configs)} ETL configuration(s):")
print("="*80)

for i, config in enumerate(configs, 1):
    print(f"\nConfiguration {i}:")
    print(f"  ID: {config['id']}")
    print(f"  Parser Type: {config['parser_type']}")
    print(f"  Chunker Type: {config['chunker_type']}")

print("\n" + "="*80)





In [None]:
engine

In [None]:
configs

In [None]:
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy import select, Engine, Column, Integer, String, create_engine
from typing import List, Dict, Any
from ETL.tools.settings import sql_server_settings

Base = declarative_base()

class ETLConfiguration(Base):
    """Database model for dbo.etl_configuration table"""
    __tablename__ = "etl_configuration"
    __table_args__ = {'schema': 'dbo'}
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    parser_type = Column(String(255), nullable=True)
    chunker_type = Column(String(255), nullable=True)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
        return {
            'id': self.id,
            'parser_type': self.parser_type,
            'chunker_type': self.chunker_type,
        }
    

def get_all_etl(engine: Engine) -> List[Dict[str, Any]]:
    """
    Retrieve all ETL configurations from the database.
    Returns list of dictionaries with all columns.
    """
    with Session(engine) as session:
        query = select(ETLConfiguration)
        results = session.execute(query).scalars().all()
        return [config.to_dict() for config in results]

In [None]:
engine = sql_server_settings.engine

In [None]:
engine

In [None]:
# Test connection
with engine.connect() as conn:
    print("✓ Database connection successful!\n")

In [None]:
configs = get_all_etl(engine)


In [None]:
configs

In [None]:
from ETL.tools.settings import sql_server_settings
engine = sql_server_settings.engine

In [None]:
from ETL.tools.db_utils import get_all_etl
result = get_all_etl(engine)

In [None]:
result

In [None]:
from sqlalchemy.orm import Session
from sqlalchemy import Engine, text
from typing import List
from ETL.tools.settings import sql_server_settings

from pydantic import BaseModel

class ETLConfigurationModel(BaseModel):
    id: int
    parser_type: str | None = None
    chunker_type: str | None = None
    chunk_augmentation_method: str | None = None

def get_all_etl(engine: Engine) -> List[ETLConfigurationModel]:
    """
    Retrieve all ETL configurations from the database using raw SQL.
    Returns a list of Pydantic models.
    """
    with Session(engine) as session:
        result = session.execute(
            text("SELECT id, parser_type, chunker_type, chunk_augmentation_method FROM dbo.etl_configuration")
        )
        rows = result.fetchall()
        columns = result.keys()
        return [
            ETLConfigurationModel(**dict(zip(columns, row)))
            for row in rows
        ]

In [None]:
result = get_all_etl(engine)
filtered_dicts = [item.model_dump() for item in result if item.id == 2]

filtered_dicts

In [None]:
from ETL.tools.settings import rag_app_settings

In [None]:
rag_app_settings.app_id

In [None]:
from ETL.tools.models import ProcessingConfig
from ETL.tools.settings import sql_server_settings, rag_app_settings
from ETL.tools.db_utils import get_all_etl

import logging
import sys

# Configure logging to write to stdout
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),  # Log to stdout
    ],
)
logger = logging.getLogger(__name__)


# Get app_id from settings
app_id = rag_app_settings.app_id
logger.info(f"Loading configuration for app_id: {app_id}")

# Fetch all ETL configurations from database
try:
    all_etl_configs = get_all_etl(sql_server_settings.engine)
    logger.info(f"Retrieved {len(all_etl_configs)} ETL configurations from database")
except Exception as e:
    logger.error(f"Failed to retrieve ETL configurations from database: {e}")
    all_etl_configs = []


# Filter configuration for current app_id
filtered_configs = [item for item in all_etl_configs if item.id == app_id]
#filtered_dicts = [item.model_dump() for item in result if item.id == 2]


# Create ProcessingConfig from database values
if filtered_configs:
    db_config = filtered_configs[0]
    logger.info(f"Found ETL configuration for app_id {app_id}")
    logger.info(f"  - parser_type: {db_config.parser_type}")
    logger.info(f"  - chunker_type: {db_config.chunker_type}")
    logger.info(f"  - chunk_augmentation_method: {db_config.chunk_augmentation_method}")
    
    config1 = ProcessingConfig(
        parser_type=db_config.parser_type or 'document_intelligence',
        chunking_strategy=db_config.chunker_type or 'recursive',
        chunk_augment_method=db_config.chunk_augmentation_method or 'none',
        document_page_stitching=True,
    )
    
    logger.info(f"ProcessingConfig created successfully:")
    logger.info(f"  - parser_type: {config1.parser_type}")
    logger.info(f"  - chunking_strategy: {config1.chunking_strategy}")
    logger.info(f"  - chunk_augment_method: {config1.chunk_augment_method}")
    logger.info(f"  - append_summary_to_chunks: {config1.append_summary_to_chunks}")
    logger.info(f"  - use_iterative_reconstruction: {config1.use_iterative_reconstruction}")
    logger.info(f"  - document_page_stitching: {config1.document_page_stitching}")
    
else:
    # Fallback to default config if no matching record found
    logger.warning(f"No ETL configuration found for app_id {app_id}, using default configuration")
    config1 = ProcessingConfig(
        chunking_strategy="recursive",
        parser_type='document_intelligence',
        chunk_augment_method='none',
        document_page_stitching=True,
    )
    logger.info("Using default ProcessingConfig")



In [None]:
config1

In [None]:
[k.id for k in  result if ]

In [None]:
#%pip install markdown

In [None]:
import os
from pathlib import Path
filename_path = Path(filename)

In [None]:
output_filename = filename_path.stem

In [None]:
output_filename

In [None]:
#%pip install docx2pdf
#%pip install fpdf
#%pip install PyMuPDF
from docx2pdf import convert
convert(r"C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl\intermediate_files\BIS Meeting Services Terminology.docx", r"C:\Users\sa007769\Downloads\Mine.pdf")

In [None]:
from pptxtopdf import convert

In [None]:
convert(r"C:\Users\sa007769\Downloads\sample1.pptx",r"C:\Users\sa007769\Downloads\sample11.pptxpdf")

In [None]:
import gc
from datetime import datetime
from PIL import Image
import fitz
import tempfile
import io
import time
from pathlib import Path
import shutil
import base64
from tqdm import tqdm
from langchain_core.messages import HumanMessage, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import PromptTemplate
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.documents import Document
import openai


In [None]:
Image.MAX_IMAGE_PIXELS = None


from pydantic import BaseModel, Field
class complete_doc(BaseModel):
    complete_doc: str = Field(default=None, description='complete document in markdown format')

class etl_components:
    """Processes images."""

    def __init__(self, file, llm_multimodal, stitching_enabled=True):
        """
        Initialize ImageProcessor.

        Args:
        - file (str): Image file path.
        - bucket_name (str, optional): Name of the Google Cloud Storage bucket (default: False).
        """
        self.file = file
        self.llm_multimodal = llm_multimodal
        self.stitching_enabled = stitching_enabled


    def pdf_to_base64_utf8_images(self,blob_pdf_path=False):
        # Open the PDF file
        pdf_document = fitz.open(self.file)

        # List to store base64 encoded images
        base64_images = []
        raw_images = []
        images_path = []
        images_path_blob = []
        names=[]


        # Ensure the output folder exists
        temp_dir=tempfile.mkdtemp()+"/"

        try:
            # Iterate over each page
            for page_num in range(len(pdf_document)):
                # Get the page
                page = pdf_document.load_page(page_num)

                # Render the page to an image
                pix = page.get_pixmap(dpi=300)

                # Convert the image to PIL Image format
                img = Image.open(io.BytesIO(pix.tobytes("png")))
                raw_images.append(img)

                # Save the image to a BytesIO object in JPEG format
                img_byte_arr = io.BytesIO()
                img.save(img_byte_arr, format='JPEG')

                ##### For image source - blob storage
                if blob_pdf_path != False:
                    blob_image_full_path = os.path.join(os.path.dirname(blob_pdf_path),f"{Path(self.file).stem}_{page_num + 1}.jpeg")
                    images_path_blob.append(blob_image_full_path)



                ##### local images path
                images_location_locally = os.path.join(temp_dir, f"{Path(self.file).stem}_{page_num + 1}.jpeg")
                images_path.append(images_location_locally)
                names.append(Path(images_location_locally).stem)
                #####


                # Get the byte data of the image
                img_byte_arr = img_byte_arr.getvalue()

                # Encode the byte data to base64
                img_base64 = base64.b64encode(img_byte_arr)

                # Encode the base64 bytes to UTF-8 string
                img_base64_utf8 = img_base64.decode('utf-8')

                # Append the UTF-8 string to the list
                base64_images.append(img_base64_utf8)
                imagestring_n_name = dict(zip(names,base64_images))

                del img_byte_arr
                del img_base64_utf8
                del img
                del img_base64
                del page

                gc.collect()

        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

        return (imagestring_n_name)
    


    def summarize_image(self,encoded_image: str) -> str:
        """
        Asynchronous Summarize the content of the provided image using an LLM.

        Args:
            encoded_image (str): Base64-encoded image string.

        Returns:
            str: Summarized content of the image.
        """
        prompt = [
            SystemMessage(content="""You are a bot that is good at analyzing images. Please act as an Expert and help in analysing and describing the tables, flowcharts, graphs, plots etc. 
                          Please extract all the details given in the image.\n Use only these images."""),
            HumanMessage(content=[
                {
                    "type": "text",
                    "text": """Execute the following tasks step by step and extract information.
                    1. Capture the maximum possible details (all the details) given in the image in a best possible way.
                    2. Wherever text is present, extract ALL the text as it is without changing/modifying the text. 
                    In Image, if plots, diagrams, tables present please provide description and capture as much as details possible (note: some images has image caption). Please elaborate captions and other additional details also.
                    3. Please examine the each step of flowcharts and process flow carefully and describe them step by step in detail.
                    Please capture all the possible details.
                    4. Please also capture facts and numeric information given in plots and graphs such barplot, histogram, lineplot etc
                    5. Please also capture exact text given on different objects or products and also capture details of given entities.
                    6. Please also capture information such as references, information given on header, footers, filenames,
                    page, question, answers, multiple choice questions, signatures, signatory names and other additional information etc.
                    7. Carefully, capture the exact text and details given on different tables and extract it in markdown structured table format.
                    Please do it extremely carefully and in detail.

                    # Provide all the extracted details in the best way possible in Markdown format (headings, Subheadings, text, points etc).
                    # Please avois writing extra text such as Extracted Information from the Image. You can directly start from content. 
                    
                    Take a deep breath and let's do it step by step. It is important for my career!"""

                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    },
                },
            ])
        ]


        retries = 0
        max_retries = 10
        while retries < max_retries:

            try:
                response = self.llm_multimodal.invoke(prompt)
                return response.content
            
            except openai.error.InvalidRequestError as e:
                if e.error.code == "content_filter" and e.error.innererror:
                    content_filter_result = e.error.innererror.content_filter_result
                    # print the formatted JSON
                    print(content_filter_result)
                    # or access the individual categories and details
                    for category, details in content_filter_result.items():
                        print(f"{category}:\n filtered={details['filtered']}\n severity={details['severity']}")

            except Exception as e:
                if "limit" in str(e).lower():
                    print(f"Rate limit error encountered. Retrying in 30 seconds... (Attempt {retries+1}/{max_retries})")
                    retries += 1
                    time.sleep(6)

        
    def new_summarize_image(self,encoded_image: str) -> str:
        """
        Asynchronous Summarize the content of the provided image using an LLM.

        Args:
            encoded_image (str): Base64-encoded image string.

        Returns:
            str: Summarized content of the image.
        """
        prompt = [
            SystemMessage(content="""You are an expert image analysis bot. You are a bot that is good at analyzing images. Please act as an Expert and help in analysing and describing the tables, flowcharts, graphs, plots etc.
            Please extract all the details given in the image, maintaining original formatting, structure, and layout for the text.

            **CRITICAL RULES:**
            - Extract text EXACTLY as it appears in the image - preserve original formatting, line breaks, and structure.
            - Start directly with the actual content from the image.
            - Maintain the EXACT layout and format from the image for text content.

            **Tasks:**
            1. Capture the maximum possible details (all details) given in the image in the best possible way.
                          
            2. Extract ALL text as it is, without changing or modifying the text, and preserve original formatting and structure.
                          
            3. For all non-textual elements—such as flowcharts, diagrams, footers, pages, logos, visual objects, and any other graphical or layout features—provide clear, descriptive, and detailed explanations, covering every minute piece of information present.  
            - Carefully describe each component, symbol, shape, icon, color, positioning, and any visible annotation or mark.
            - If the image contains captions for these elements, elaborate on those captions and include all additional details.
                          
            4. For flowcharts and process flows: examine each step carefully and describe them step by step in detail.
                          
            5. Capture all facts and numeric information given in plots and graphs (such as bar plots, histograms, line plots, etc.).
                          
            6. Extract exact text given on different objects, products, and entities. Capture details of all given entities.
                          
            7. Capture and extract details such as references, headers, footers, filenames, page numbers, questions, answers, multiple choice questions, signatures, signatory names, and any other additional information present.
                - For elements such as dates, signatures, names, or other indicators, you may add a brief clarifying note in parentheses or as a footnote to indicate what the element is (e.g., "Date: 2023-05-01 (document creation date)", "Signature: John Doe (signatory)"), but **do NOT change or paraphrase the actual text content**.
                - If additional details about a signature, date, or other indicator are present (such as title, position, or context), include those descriptively.
          
            8. For tables: extract the exact text and details given on different tables and present them in Markdown structured table format, preserving the exact structure. Do this with extreme care and detail.

            **Output Format:**
            - Use Markdown formatting (headings, subheadings, text, points, code blocks where appropriate).
            - For text content: maintain the EXACT layout and format from the image.
            - For visual and graphical elements (charts, diagrams, flowcharts, logos, etc.): provide clear, comprehensive, and detailed descriptions, including any captions and additional details.
            - For tables: use Markdown table format, preserving the original structure and all details.

            Begin extraction immediately without preamble."""
            ),
            HumanMessage(content=[
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{encoded_image}"
                    },
                },
            ])
        ]


        retries = 0
        max_retries = 10
        while retries < max_retries:

            try:
                response = self.llm_multimodal.invoke(prompt)
                return response.content
            
            except openai.error.InvalidRequestError as e:
                if e.error.code == "content_filter" and e.error.innererror:
                    content_filter_result = e.error.innererror.content_filter_result
                    # print the formatted JSON
                    print(content_filter_result)
                    # or access the individual categories and details
                    for category, details in content_filter_result.items():
                        print(f"{category}:\n filtered={details['filtered']}\n severity={details['severity']}")

            except Exception as e:
                if "limit" in str(e).lower():
                    print(f"Rate limit error encountered. Retrying in 30 seconds... (Attempt {retries+1}/{max_retries})")
                    retries += 1
                    time.sleep(6)



    def generate_document_summary_stuff(self, image_summary_list):

        combine ='\n\n#############################################\n\n'.join(image_summary_list)
        docs=[combine]

        doc_creator = RecursiveCharacterTextSplitter(chunk_size=1500,chunk_overlap=150,length_function=len,is_separator_regex=False)

        split_docs = doc_creator.create_documents(texts = docs)

        prompt_template_fulldocument = """
            Write a concise summary that captures the superficial overview and key points from the following text:\n{context}

            Your summary should:
            - Provide a high-level overview, focusing on the main themes and key highlights.
            - Include relevant attributes such as TITLES, SUBTITLES, COMPANY NAMES, REFERENCES, DATES, TOTAL PAGE COUNT, DOCUMENT IDs, and any other significant details.
            - Be no longer than 15-18 lines to maintain conciseness.
            - Present the information in a clear, structured, and easy-to-read manner.

            Please ensure the summary balances brevity with comprehensiveness, providing a superficial yet meaningful overview of the text.

            SUMMARY:
            """

        prompt_document = PromptTemplate.from_template(prompt_template_fulldocument)

        retries = 0
        max_retries = 10
        while retries < max_retries:

            try:
                llmchain = create_stuff_documents_chain(self.llm_multimodal, prompt_document)

                # Invoke the llm chain with the document object
                document_summaries = llmchain.invoke({"context": split_docs})

                return document_summaries

            except Exception as e:
                if "limit" in str(e).lower():
                    print(f"Rate limit error encountered. Retrying in 8 seconds... (Attempt {retries+1}/{max_retries})")
                    retries += 1
                    time.sleep(6)
                else:
                    raise e
        raise Exception("Max retries reached due to rate limit errors.")


    def append_chunks_fulldoc_summary(self, concise_doc_summary, splitted_text, file=None):
        import copy

        filename = os.path.basename(file) if file else 'Not given'

        chunk_n = copy.deepcopy(splitted_text)
        doc_summary = concise_doc_summary
        for d in range(len(chunk_n)):
            entire_summary = f'{str(chunk_n[d].page_content)}\n\n---\n\n### **Filename : {filename}** \n### Consolidated summary / high-level overview of whole document given below: ##############\n\n{str(doc_summary)}'

            chunk_n[d].page_content = entire_summary

        return chunk_n
    

    def split_recursive(self, images_summary, chnk_size=1000,chnk_overlap=200, additional_metadata={}):

        """
        Split image summaries into smaller chunks for further processing.

        Args:
            images_summary (List[str]): List of image summaries.
            files_blob (str): Blob storage path of the files.
            image_path (List[str]): List of image paths.
            chnk_size (int, optional): Chunk size for splitting (default: 1000).
            chnk_overlap (int, optional): Overlap size for splitting (default: 200).

        Returns:
            List[Document]: List of split document chunks.
        """

        source = os.path.basename(self.file)

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=chnk_size,chunk_overlap=chnk_overlap,is_separator_regex=False)

        list_document = []
        for i in range(len(images_summary)):
            metadata_dict = {'source': source,'page': str(i+1)}
            
            if additional_metadata:
                metadata_dict = {**metadata_dict, **additional_metadata}

            doc_n = Document(page_content = images_summary[i],metadata = metadata_dict)
            list_document.append(doc_n)
            chunks = text_splitter.split_documents(list_document)

        return chunks
    

    def stitch_pages(self, reference_text, page_contents_list):
        """
        Stitches together page-by-page content into a complete document.

        Args:
            reference_text (str): The original/reference text for context.
            page_contents_list (list): List of extracted content from each page.

        Returns:
            str: The stitched complete document.
        """

        stiching_template = """You are an expert document reconstruction specialist. Your task is to intelligently stitch together page-by-page extracted content into a single, coherent, and complete document.

            
        **PAGE-BY-PAGE EXTRACTED CONTENT:**
        {page_contents}

        

        Purpose of Reference to let you fill missing link between different page. Please do not consider it for any other purpose.
        **REFERENCE TEXT (Original Document - Use as Context):**
        {reference_text}


        **YOUR TASK:**
        Reconstruct the complete document by:

        1. **Connecting Split Text**: 
        - Identify sentences/paragraphs that are split across pages
        - Merge them seamlessly without duplication
        - Existing content should remain unchanged, But same time feel free to remove redundant lines.
        - Fill in minimal connector text ONLY where absolutely necessary for coherence

        2. **Merging Split Tables**:
        - Identify table fragments across pages
        - Combine them into single, complete markdown tables
        - Preserve all rows, columns, and data
        - Remove duplicate headers that appear on continuation pages

        3. **Combining Image Descriptions**:
        - Merge related image descriptions that were separated by page breaks
        - Create complete, unified descriptions
        - Maintain all details from individual page descriptions

        4. **Preserving Content Integrity**:
        - Keep ALL existing content unchanged except for merging split elements
        - Do NOT paraphrase, summarize, or rewrite existing text
        - Do NOT add new information not present in the extracted pages
        - Do NOT remove any existing content
        - Maintain original formatting, structure, and style

        5. **Using Reference Text**:
        - Use reference text ONLY to understand context and identify split points
        - Use it to fill MINIMAL missing connector words/phrases if absolutely necessary
        - Do NOT copy large sections from reference text
        - Prioritize the extracted page content over reference text

        **OUTPUT REQUIREMENTS:**
        - Produce a single, clean document. Please maintain the structure of EXTRACTED CONTENT
        - Ensure smooth transitions between merged sections
        - Maintain all original headings, subheadings, and structure, footer, graphical details, page, footer, header, references, links etc all as it is.
        - Keep all type of data, numbers, graphical description, footer, logo, header, references, and link and specific details etc intact
        - DO NOT add any preamble or explanation - output ONLY the reconstructed document

        **CRITICAL**: Your output should be the final, complete document starting immediately with the content.."""


        rendered_prompt = PromptTemplate(template=stiching_template, input_variables=['reference_text','page_contents'], validate_template=True)


        # Format page contents with clear page markers
        formatted_pages = ""
        for i, page_content in enumerate(page_contents_list, 1):
            formatted_pages += f"\n{'='*80}\n"
            formatted_pages += f"PAGE {i}\n"
            formatted_pages += f"{'='*80}\n"
            formatted_pages += page_content
            formatted_pages += f"\n{'='*80}\n\n"
        
        # Create the complete prompt
        prompt = rendered_prompt.invoke({'reference_text':reference_text, 'page_contents':formatted_pages})
        
        
        # Call the LLM
        structured_llm = self.llm_multimodal.with_structured_output(complete_doc)
        
        response = structured_llm.invoke(prompt)
        
        return response.complete_doc


In [None]:
from langchain_openai import AzureChatOpenAI

In [None]:
AZURE_OAI_2nd_ENDPOINT="https://oai-bisgpt-prod-001.openai.azure.com"
AZURE_OAI_2nd_API_KEY="7044270aded9483cab6269a417b33a55"
AZURE_OAI_2nd_DEPLOYMENT="gpt-4o-fsi"
AZURE_OAI_2nd_API_VERSION="2024-12-01-preview"

import httpx

httpx_client = httpx.Client(verify=False)

OPENAI_API_TYPE = "azure"
AZURE_OPENAI_ENDPOINT = AZURE_OAI_2nd_ENDPOINT
AZURE_OPENAI_API_VERSION = AZURE_OAI_2nd_API_VERSION
AZURE_OPENAI_API_KEY = AZURE_OAI_2nd_API_KEY
AZURE_OPENAI_GPT4O_DEPLOYMENT_NAME = AZURE_OAI_2nd_DEPLOYMENT

llm_multimodal = AzureChatOpenAI(
    api_key=AZURE_OAI_2nd_API_KEY,
    openai_api_version=AZURE_OAI_2nd_API_VERSION,
    azure_deployment=AZURE_OAI_2nd_DEPLOYMENT,
    temperature=0.1,
    max_tokens=4000,
    azure_endpoint=AZURE_OAI_2nd_ENDPOINT
)

In [None]:
import os
file = r"C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl\intermediate_files\BIS Meeting Services Terminology.pdf"


def process_image_based_pdf(file, page_wise_completion=False):
    
    ## convert doc to images
    image_processor_object = etl_components(file=file, llm_multimodal=llm_multimodal)
    image_base64 = image_processor_object.pdf_to_base64_utf8_images(blob_pdf_path=file)
    images_list = list(image_base64.values())


    ## generate summary
    summarize_images_list = []
    for image in tqdm(images_list, desc="Generating summary - images - 1 by 1"):
        summary = image_processor_object.summarize_image(image)
        summarize_images_list.append(summary)
    print(f"{os.path.basename(file)} : One-by-One Summary generated finished !")


    if page_wise_completion:
        
        data.content = None

        ## stitching
        completed_document = image_processor_object.stitch_pages(reference_text=data.content,page_contents_list=summarize_images_list)

        ## chunking
        chunked_doc = image_processor_object.split_recursive(images_summary=[completed_document],file=file, chnk_size=3000,chnk_overlap=400, additional_metadata={})

    else:

        ## chunking
        chunked_doc = image_processor_object.split_recursive(images_summary=summarize_images_list,file=file, chnk_size=3000,chnk_overlap=400, additional_metadata={})


    ## generate full document summary
    summary_full_doc = None
    try:
        summary_full_doc = image_processor_object.generate_document_summary_stuff(image_summary_list=summarize_images_list)
        print(f"{os.path.basename(file)} : document_summary_stuff() done !")
    except Exception as e:
        print(f"document_summary_stuff() failed: {e}")


    ## append overview/resume to each chunk
    try:
        splitted_chunks_appended = image_processor_object.append_chunks_fulldoc_summary(concise_doc_summary=summary_full_doc, splitted_text=chunked_doc, file=file)
    except Exception as e:
        raise RuntimeError("Failed at append_chunks_fulldoc_summary()")
    

    return splitted_chunks_appended

In [None]:


#return (imagestring_n_name,raw_images,images_path,images_path_blob)


def image_processing(abs_filename):
    try:
        file = abs_filename  # Input file
        image_processor = etl_components(file=file, llm_multimodal=llm_multimodal)
        image_base64 = image_processor.pdf_to_base64_utf8_images(blob_pdf_path=abs_filename)

        inputs_images_n = list(image_base64.values())
        print(f"{file} images created. Count: {len(inputs_images_n)}")
        return (image_processor, inputs_images_n)

    except Exception as e:
        print(f"Error during image processing for file {abs_filename}: {e}")
        return None  # Explicitly return None to indicate failure
    

image_processor_object, images_list = image_processing(file)

In [None]:
summarize_images_list = []
for image in tqdm(images_list, desc="Processing Images-onebyone-Summary"):
    summary = image_processor_object.summarize_image(image)
    summarize_images_list.append(summary)
print(f"{os.path.basename(file)} : One-by-One Summary generated finished !")

In [None]:
from IPython.display import Markdown
Markdown(summarize_images_list[1])

In [None]:
################################################### SUMMARIZE WHOLE DOCUMENT STARTS 
summary_full_doc = None
try:
    summary_full_doc = image_processor_object.generate_document_summary_stuff(image_summary_list=summarize_images_list)
    print(f"{os.path.basename(file)} : document_summary_stuff() done !")
except Exception as e:
    print(f"document_summary_stuff() failed: {e}")




In [None]:
Markdown(summary_full_doc)

In [None]:
chunked_doc = image_processor_object.split_recursive(images_summary=summarize_images_list,file=file, chnk_size=3000,chnk_overlap=400)

In [None]:
Markdown(chunked_doc[0].page_content)

In [None]:
try:
    splitted_chunks_appended = image_processor_object.append_chunks_fulldoc_summary(concise_doc_summary=summary_full_doc, splitted_text=chunked_doc, file=file)
except Exception as e:
    raise RuntimeError("Failed at append_chunks_fulldoc_summary()")

In [None]:
Markdown(splitted_chunks_appended[0].page_content)

In [None]:
source = file

In [None]:
response = image_processor_object.stitch_pages(reference_text=data.content,page_contents_list=summarize_images_list)

In [None]:
Markdown(response)

In [None]:
from pydantic import BaseModel, Field
class complete_doc(BaseModel):
    complete_doc: str = Field(default=None, description='complete document in markdown format')

In [None]:
# Create the stitching prompt template
stitching_prompt_template = PromptTemplate(
    input_variables=["reference_text", "page_contents"],
    template="""You are an expert document reconstruction specialist. Your task is to intelligently stitch together page-by-page extracted content into a single, coherent, and complete document.

    
**PAGE-BY-PAGE EXTRACTED CONTENT:**
{page_contents}


Purpose of Reference to let you fill missing link between different page. Please do not consider it for any other purpose.
**REFERENCE TEXT (Original Document - Use as Context):**
{reference_text}



**YOUR TASK:**
Reconstruct the complete document by:

1. **Connecting Split Text**: 
   - Identify sentences/paragraphs that are split across pages
   - Merge them seamlessly without duplication
   - Existing content should remain unchanged, But same time feel free to remove redundant lines.
   - Fill in minimal connector text ONLY where absolutely necessary for coherence

2. **Merging Split Tables**:
   - Identify table fragments across pages
   - Combine them into single, complete markdown tables
   - Preserve all rows, columns, and data
   - Remove duplicate headers that appear on continuation pages

3. **Combining Image Descriptions**:
   - Merge related image descriptions that were separated by page breaks
   - Create complete, unified descriptions
   - Maintain all details from individual page descriptions

4. **Preserving Content Integrity**:
   - Keep ALL existing content unchanged except for merging split elements
   - Do NOT paraphrase, summarize, or rewrite existing text
   - Do NOT add new information not present in the extracted pages
   - Do NOT remove any existing content
   - Maintain original formatting, structure, and style

5. **Using Reference Text**:
   - Use reference text ONLY to understand context and identify split points
   - Use it to fill MINIMAL missing connector words/phrases if absolutely necessary
   - Do NOT copy large sections from reference text
   - Prioritize the extracted page content over reference text

**OUTPUT REQUIREMENTS:**
- Produce a single, clean document. Please maintain the structure of EXTRACTED CONTENT
- Ensure smooth transitions between merged sections
- Maintain all original headings, subheadings, and structure, footer, graphical details, page, footer, header, references, links etc all as it is.
- Keep all type of data, numbers, graphical description, footer, logo, header, references, and link and specific details etc intact
- DO NOT add any preamble or explanation - output ONLY the reconstructed document

**CRITICAL**: Your output should be the final, complete document starting immediately with the content.."""
)

# Function to stitch pages together
def stitch_pages(reference_text, page_contents_list, llm):
    """
    Stitches together page-by-page content into a complete document.
    
    Args:
        reference_text (str): The original/reference text for context
        page_contents_list (list): List of extracted content from each page
        llm: The language model instance
    
    Returns:
        str: The stitched complete document
    """
    
    # Format page contents with clear page markers
    formatted_pages = ""
    for i, page_content in enumerate(page_contents_list, 1):
        formatted_pages += f"\n{'='*80}\n"
        formatted_pages += f"PAGE {i}\n"
        formatted_pages += f"{'='*80}\n"
        formatted_pages += page_content
        formatted_pages += f"\n{'='*80}\n\n"
    
    print(formatted_pages)
    # Create the complete prompt
    prompt = stitching_prompt_template.format(
        reference_text=reference_text,
        page_contents=formatted_pages
    )
    
    # Call the LLM
    structured_llm = llm_multimodal.with_structured_output(complete_doc)
    
    response = structured_llm.invoke([HumanMessage(content=prompt)])
    
    return response.complete_doc


# Function to stitch pages with retry logic
def stitch_pages_with_retry(reference_text, page_contents_list, llm, max_retries=3):
    """
    Stitches pages with retry logic in case of failures.
    """
    retries = 0
    while retries < max_retries:
        try:
            result = stitch_pages(reference_text, page_contents_list, llm)
            return result
        except Exception as e:
            retries += 1
            print(f"Attempt {retries} failed: {str(e)}")
            if retries >= max_retries:
                raise Exception(f"Failed after {max_retries} attempts: {str(e)}")
    

# Example usage
if __name__ == "__main__":
    # Example reference text (original document or OCR text)
    reference_text = data.content
    
    # Example page-by-page extracted content (simulating split pages)    
    page_contents_list = summarize_images_list
    
    # Stitch the pages together
    full_document = stitch_pages_with_retry(
        reference_text=reference_text,
        page_contents_list=page_contents_list,
        llm=llm_multimodal
    )


#combine_docs=[full_document]
#doc_creator = RecursiveCharacterTextSplitter(chunk_size=2500,chunk_overlap=300,is_separator_regex=False)
#split_docs = doc_creator.create_documents(combine_docs, metadatas = [{'source': os.path.basename(source)}])
#Markdown(split_docs[0].page_content)

In [None]:
Markdown(full_document)

In [None]:
source = file

In [None]:
full_document


In [None]:
def split_recursive(images_summary, chnk_size=1000,chnk_overlap=200, additional_metadata={}):

    """
    Split image summaries into smaller chunks for further processing.

    Args:
        images_summary (List[str]): List of image summaries.
        files_blob (str): Blob storage path of the files.
        image_path (List[str]): List of image paths.
        chnk_size (int, optional): Chunk size for splitting (default: 1000).
        chnk_overlap (int, optional): Overlap size for splitting (default: 200).

    Returns:
        List[Document]: List of split document chunks.
    """

    source = os.path.basename(file)

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chnk_size,chunk_overlap=chnk_overlap,is_separator_regex=False)

    list_document = []
    for i in range(len(images_summary)):
        metadata_dict = {'source': source,'page': str(i+1)}
        
        if additional_metadata:
            metadata_dict = {**metadata_dict, **additional_metadata}

        doc_n = Document(page_content = images_summary[i],metadata = metadata_dict)
        list_document.append(doc_n)
        chunks = text_splitter.split_documents(list_document)

    return chunks

In [None]:
doc_split = split_recursive(images_summary=[full_document], chnk_size=1500,chnk_overlap=200, additional_metadata={})

In [None]:
doc_split

In [1]:
# Auto-reload modules when they change
%load_ext autoreload
%autoreload 2

import sys
import os
project_root = r"C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl"
if project_root not in sys.path:
    sys.path.insert(0, project_root)
os.chdir(project_root)


## Modular code 

In [None]:
import logging
from pathlib import Path

from document_processor.main_processor.file_processor import FileProcessor
from document_processor.base.models import ProcessingConfig
from document_processor.utils.file_utils import download_file, convert_to_pdf


  from .autonotebook import tqdm as notebook_tqdm


In [6]:

config = ProcessingConfig(
    parser_type="document_intelligence",
    chunking_strategy="recursive",
    chunk_augment_method="none",
    document_page_stitching=False,
)



In [None]:
config

ProcessingConfig(parser_type='vision', chunking_strategy='recursive', chunk_size=5000, chunk_overlap=500, markdown_headers=[('#', 'h1'), ('##', 'h2'), ('###', 'h3')], separators=None, document_page_stitching=False, chunk_augment_method='none', append_summary_to_chunks=False, use_iterative_reconstruction=False)

In [8]:
config.append_summary_to_chunks=True
config.use_iterative_reconstruction = True


In [9]:
processor = FileProcessor(config=config)


In [10]:
files = [
    {'id': '01HYSZYLJABAHBMSIOXRDZAGN4VJGSGA26', 'name': 'BIS Meeting Services Terminology.docx', 'etag': '"{160E0820-0E49-47BC-9019-BCAA4D23035E},7"', 'web_url': 'https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160E0820-0E49-47BC-9019-BCAA4D23035E%7D&file=BIS%20Meeting%20Services%20Terminology.docx&action=default&mobileredirect=true'},
]

for file_metadata in files:
    file_path = download_file(file_metadata)
    #file_path = Path("C:/Users/sa007769/Downloads/rag_etl/bis-gpt-rag-etl/intermediate_files/file1.txt")

    msg = f"processing file {file_metadata['name']}"
    #print(f"file_type :: {type(file_path)}")
    if config.parser_type == 'vision' and (file_path.suffix == ".docx" or file_path.suffix == ".pdf"):
        print(f" I am in vision :: file_path :: {file_path.as_posix()}")
        file_path = convert_to_pdf(file_path.as_posix())
        print(f" After I am in vision :: file_path :: {type(file_path)}")


    file_metadata["file_path"] = file_path



In [11]:
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.core.credentials import AzureKeyCredential
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings

from document_processor.base.models import MSDBEntry, MSDBMetadata, ProcessingConfig
from document_processor.utils.exceptions import ProcessingError, StorageError
from document_processor.parsers.factory import ParserFactory
from document_processor.chunkers.factory import ChunkerFactory
from document_processor.reconstruction.factory import ReconstructionAgentFactory
from document_processor.utils.keyword_generator import KeywordGenerator

In [12]:
file_type = file_path.suffix.lstrip(".").lower()
parser_type = processor._choose_parser_type_for_extension(file_path.suffix)

parser = ParserFactory.get_parser(
    file_type=file_type,
    parser_type=parser_type,
    di_client=processor.di_client,
    llm=processor.llm,
    config=processor.config
)

I am Creating DocxParser


In [13]:
markdown_content, n_unprocessed_images = parser.parse(
    file_path=file_path,
    file_metadata=file_metadata
)

In [14]:
print(markdown_content)

# BIS Meeting Services Terminology

CREATEDATE  \@ "dd MMMM yyyy"  \* MERGEFORMAT 17 June 2025

Accommodation refers to the accommodation provided to the event participants for their stays in Basel during the event they are participating.

The Bank or BIS refers to the Bank for International Settlements.

BIS premises refers to the buildings and spaces of BIS located in Basel, namely BIS Tower, Botta Building, Rex Building and Grütli Building.

Board Secretariat refers to the unit that manages Board relations and BIS shareholders' services and coordinates the organisation of central bank Governors' meetings.

Business unit refers to the separate divisions in BIS which have own business roles in line with the core business of BIS, and who are organising events by preparing event content and agenda, sending out invitations and works in cooperation with the Meeting Services on the event logistics.

CIBT refers to the agency providing visa services for BIS staff members, regarding their bu

In [15]:
from document_processor.chunkers.recursive_chunker import RecursiveChunker
from document_processor.chunkers.character_chunker import CharacterChunker
from document_processor.chunkers.markdown_chunker import MarkdownChunker
from document_processor.base.models import ProcessingConfig

In [16]:
# Test RecursiveChunker
recursive_chunker = RecursiveChunker(config=config)
recursive_chunks = recursive_chunker.split_text(markdown_content, metadata={"source": str(file_path)})
print("Recursive Chunker Results:")
#for i, chunk in enumerate(recursive_chunks):
#    print(f"Chunk {i + 1}: {chunk.page_content}")

Recursive Chunker Results:


In [17]:
recursive_chunks

[Document(metadata={'source': 'C:\\Users\\sa007769\\Downloads\\rag_etl\\bis-gpt-rag-etl\\intermediate_files\\BIS Meeting Services Terminology.docx'}, page_content='# BIS Meeting Services Terminology\n\nCREATEDATE  \\@ "dd MMMM yyyy"  \\* MERGEFORMAT 17 June 2025\n\nAccommodation refers to the accommodation provided to the event participants for their stays in Basel during the event they are participating.\n\nThe Bank or BIS refers to the Bank for International Settlements.\n\nBIS premises refers to the buildings and spaces of BIS located in Basel, namely BIS Tower, Botta Building, Rex Building and Grütli Building.\n\nBoard Secretariat refers to the unit that manages Board relations and BIS shareholders\' services and coordinates the organisation of\xa0central bank Governors\' meetings.\n\nBusiness unit refers to the separate divisions in BIS which have own business roles in line with the core business of BIS, and who are organising events by preparing event content and agenda, sending 

In [None]:
print(recursive_chunks[6].page_content)

In [18]:
reference_chunks = recursive_chunks.copy()

In [19]:
chunks_as_entries = processor._convert_chunks_to_entries(
    chunks=recursive_chunks,
    file_path=file_path,
    file_metadata=file_metadata
)

In [20]:
chunks_as_entries

[MSDBEntry(content='# BIS Meeting Services Terminology\n\nCREATEDATE  \\@ "dd MMMM yyyy"  \\* MERGEFORMAT 17 June 2025\n\nAccommodation refers to the accommodation provided to the event participants for their stays in Basel during the event they are participating.\n\nThe Bank or BIS refers to the Bank for International Settlements.\n\nBIS premises refers to the buildings and spaces of BIS located in Basel, namely BIS Tower, Botta Building, Rex Building and Grütli Building.\n\nBoard Secretariat refers to the unit that manages Board relations and BIS shareholders\' services and coordinates the organisation of\xa0central bank Governors\' meetings.\n\nBusiness unit refers to the separate divisions in BIS which have own business roles in line with the core business of BIS, and who are organising events by preparing event content and agenda, sending out invitations and works in cooperation with the Meeting Services on the event logistics.', metadata=MSDBMetadata(source='https://bisadaz.share

In [None]:
reference_chunks

In [21]:
from document_processor.reconstruction.combined_agent import CombinedReconstructionAgent
combined_agent = CombinedReconstructionAgent(llm=processor.llm, config=config)
combined_agent.reconstruct_chunks(chunks=chunks_as_entries,
    original_content=markdown_content,
    filename="test_document")


I am in CombinedReconstructionAgent
I am in generate summary DI
I am in generate summary DI - Header Resume
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in Augment chunk with summary DI
I am in iterative improvement agent


[MSDBEntry(content='# BIS Meeting Services Terminology\n\nCREATEDATE  \\@ "dd MMMM yyyy"  \\* MERGEFORMAT 17 June 2025\n\n## Document Context\nThis glossary provides definitions of key terms related to BIS Meeting Services, covering event logistics, roles, systems, facilities, and participant services. It is intended to support BIS staff, contractors, and event participants in understanding the terminology used in the planning, execution, and management of events hosted or co-hosted by the Bank for International Settlements (BIS).\n\n## Glossary\n\n### Accommodation\nAccommodation refers to the lodging provided to event participants for their stays in Basel during the event they are attending.\n\n### The Bank or BIS\nThe Bank or BIS refers to the Bank for International Settlements.\n\n### BIS Premises\nBIS premises refers to the buildings and spaces of BIS located in Basel, namely BIS Tower, Botta Building, Rex Building, and Grütli Building. These premises are used for hosting events, 

In [36]:
from document_processor.reconstruction.null_agent import NullReconstructionAgent

null_agent = NullReconstructionAgent()

result = null_agent.reconstruct_chunks(
    chunks=chunks_as_entries,
    original_content=markdown_content
)

In [37]:
result

 MSDBEntry(content="**The Bank or BIS**  \nRefers to the Bank for International Settlements.  \n\n**BIS premises**  \nRefers to the buildings and spaces of BIS located in Basel, namely BIS Tower, Botta Building, Rex Building, and Grütli Building.  \n\n**Board Secretariat**  \nRefers to the unit that manages Board relations and BIS shareholders' services and coordinates the organisation of central bank Governors' meetings.  \n\n**Business unit**  \nRefers to the separate divisions in BIS which have their own business roles in line with the core business of BIS, and who are organising events by preparing event content and agenda, sending out invitations, and working in cooperation with the Meeting Services on the event logistics.  \n\n**CIBT**  \nRefers to the agency providing visa services for BIS staff members, regarding their business and training travels.", metadata=MSDBMetadata(source='https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160

In [None]:
from document_processor.reconstruction.summary_agent import SummaryAgent
summary_agent = SummaryAgent(llm=processor.llm, config=config)

result = summary_agent.reconstruct_chunks(
    chunks=chunks_as_entries,
    original_content=markdown_content,
    filename="test_document"
)

In [None]:
from IPython.display import Markdown
Markdown(result[0].content)

In [None]:
print(result[0].content)

In [None]:
print(reference_chunks[0].page_content)

In [None]:
chunks_as_entries[0].metadata.file_name

In [None]:
from document_processor.reconstruction.iterative_agent import IterativeReconstructionAgent

config = ProcessingConfig()
iterative_agent = IterativeReconstructionAgent(llm=processor.llm, config=config)

result = iterative_agent.reconstruct_chunks(
    chunks=chunks_as_entries,
    original_content=markdown_content
)

In [None]:
result[0].metadata

In [None]:
chunks_as_entries

In [None]:
print(result[0].content)

## Testing code -- import config from app registry

In [33]:
# Auto-reload modules when they change
%load_ext autoreload
%autoreload 2

import sys
import os
project_root = r"C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl"
if project_root not in sys.path:
    sys.path.insert(0, project_root)
os.chdir(project_root)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [34]:

import logging
import sys
from ETL.document_processor.base.models import ProcessingConfig
from ETL.tools.registry_utils import get_etl_sources
from ETL.tools.settings import rag_app_settings
from ETL.tools.registry_utils import get_etl_sources


# Configure logging to write to stdout
logging.basicConfig(
    level=logging.INFO,  # Set the logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),  # Log to stdout
    ],
)
logger = logging.getLogger(__name__)


# Get app_id from settings
app_id = rag_app_settings.app_id
logger.info(f"Loading configuration for app_id: {app_id}")


# Fetch all ETL configurations from App Registry API
try:
    all_etl_configs = get_etl_sources()
    logger.info(f"Retrieved {len(all_etl_configs)} ETL configurations from App Registry")
except Exception as e:
    logger.error(f"Failed to retrieve ETL configurations from App Registry: {e}")
    logger.error("Cannot proceed without ETL configuration. Exiting...")
    sys.exit(1)


# Filter configuration for current app_id
filtered_configs = [item for item in all_etl_configs if item.applicationId == app_id]

# Create ProcessingConfig from registry values
if filtered_configs:
    registry_config = filtered_configs[0]
    logger.info(f"Found ETL configuration for app_id {app_id}")
    logger.info(f"  - applicationName: {registry_config.applicationName}")
    logger.info(f"  - parserType: {registry_config.parserType}")
    logger.info(f"  - chunkerType: {registry_config.chunkerType}")
    logger.info(f"  - chunkAugmentationMethod: {registry_config.chunkAugmentationMethod}")
    
    # Normalize chunk_augmentation_method: convert 'None' or None to 'none'
    chunk_method = registry_config.chunkAugmentationMethod
    if chunk_method in [None, 'None']:
        chunk_method = 'none'
        logger.info(f"  - Normalized chunkAugmentationMethod from '{registry_config.chunkAugmentationMethod}' to 'none'")
    
    config1 = ProcessingConfig(
        parser_type=registry_config.parserType or 'document_intelligence',
        chunking_strategy=registry_config.chunkerType or 'recursive',
        chunk_augment_method=chunk_method,
        document_page_stitching=True,
    )
    
    logger.info(f"ProcessingConfig created successfully:")
    logger.info(f"  - parser_type: {config1.parser_type}")
    logger.info(f"  - chunking_strategy: {config1.chunking_strategy}")
    logger.info(f"  - chunk_augment_method: {config1.chunk_augment_method}")
    logger.info(f"  - append_summary_to_chunks: {config1.append_summary_to_chunks}")
    logger.info(f"  - use_iterative_reconstruction: {config1.use_iterative_reconstruction}")
    logger.info(f"  - document_page_stitching: {config1.document_page_stitching}")
    
else:
    # No matching configuration found for app_id
    logger.warning(f"No ETL configuration found for app_id {app_id}")
    logger.warning(f"Available applicationIds: {[item.applicationId for item in all_etl_configs]}")
    
    # Option 1: Use default configuration (current behavior)
    logger.warning("Using default configuration")
    config1 = ProcessingConfig(
        chunking_strategy="recursive",
        parser_type='document_intelligence',
        chunk_augment_method='none',
        document_page_stitching=False,
    )
    logger.info("Using default ProcessingConfig")

2025-12-17 17:02:31,963 - __main__ - INFO - Loading configuration for app_id: 6
2025-12-17 17:02:31,964 - ETL.tools.registry_utils - INFO - Fetching ETL configurations from registry: http://localhost:8000/api/listETLConfigurations
2025-12-17 17:02:31,981 - ETL.tools.registry_utils - INFO - Successfully fetched 3 ETL configurations
2025-12-17 17:02:31,982 - __main__ - INFO - Retrieved 3 ETL configurations from App Registry
2025-12-17 17:02:31,983 - __main__ - INFO - Found ETL configuration for app_id 6
2025-12-17 17:02:31,983 - __main__ - INFO -   - applicationName: Demo RAG
2025-12-17 17:02:31,984 - __main__ - INFO -   - parserType: vision
2025-12-17 17:02:31,984 - __main__ - INFO -   - chunkerType: recursive
2025-12-17 17:02:31,985 - __main__ - INFO -   - chunkAugmentationMethod: chunk_reconstruction
2025-12-17 17:02:31,985 - __main__ - INFO - ProcessingConfig created successfully:
2025-12-17 17:02:31,986 - __main__ - INFO -   - parser_type: vision
2025-12-17 17:02:31,987 - __main__ -

In [36]:
from ETL.nodes.process_new_files import process_new_files

In [31]:
config1.document_page_stitching = False
config1.use_iterative_reconstruction = False
config1.append_summary_to_chunks = True
config1.parser_type = "vision"


files = [
    {'id': '01HYSZYLJABAHBMSIOXRDZAGN4VJGSGA26', 'name': 'BIS Meeting Services Terminology.docx', 'etag': '"{160E0820-0E49-47BC-9019-BCAA4D23035E},7"', 'web_url': 'https://bisadaz.sharepoint.com/sites/prod-MeetingServicesGPT/_layouts/15/Doc.aspx?sourcedoc=%7B160E0820-0E49-47BC-9019-BCAA4D23035E%7D&file=BIS%20Meeting%20Services%20Terminology.docx&action=default&mobileredirect=true'},
]

data = process_new_files(files=files, config=config1)

2025-12-17 16:50:50,817 - ETL.document_processor.chunkers.recursive_chunker - INFO - Initialized RecursiveChunker with chunk_size=5000, chunk_overlap=500
2025-12-17 16:50:50,818 - ETL.document_processor.reconstruction.factory - INFO - Creating SummaryAgent
2025-12-17 16:50:50,819 - ETL.document_processor.main_processor.file_processor - INFO - Initialized FileProcessor | Parser: vision | Chunker: recursive | append_summary=True | iterative_reconstruction=False


  0%|          | 0/1 [00:00<?, ?it/s]

2025-12-17 16:50:52,217 - ETL.document_processor.utils.file_utils - INFO - File BIS Meeting Services Terminology downloaded successfully!
2025-12-17 16:50:52,220 - ETL.nodes.process_new_files - INFO - processing file BIS Meeting Services Terminology.docx
 I am in vision :: file_path :: C:/Users/sa007769/Downloads/rag_etl/bis-gpt-rag-etl/intermediate_files/BIS Meeting Services Terminology.docx


100%|██████████| 1/1 [00:03<00:00,  3.59s/it]

Successfully converted C:/Users/sa007769/Downloads/rag_etl/bis-gpt-rag-etl/intermediate_files/BIS Meeting Services Terminology.docx to C:\Users\sa007769\Downloads\rag_etl\bis-gpt-rag-etl\intermediate_files\BIS Meeting Services Terminology.pdf and removed the original file.
 After I am in vision :: file_path :: <class 'pathlib.WindowsPath'>
2025-12-17 16:50:58,964 - ETL.document_processor.main_processor.file_processor - INFO - Processing file: BIS Meeting Services Terminology.pdf (.pdf)





I am Creating VisionParser
Processing image 1/3
2025-12-17 16:51:09,514 - httpx - INFO - HTTP Request: POST https://api-dev.bisinfo.org/int/its/v1/ai-services/interactive/openai/deployments/gpt4o/chat/completions?api-version=2025-04-01-preview "HTTP/1.1 200 OK"
Processing image 2/3
2025-12-17 16:51:21,452 - httpx - INFO - HTTP Request: POST https://api-dev.bisinfo.org/int/its/v1/ai-services/interactive/openai/deployments/gpt4o/chat/completions?api-version=2025-04-01-preview "HTTP/1.1 200 OK"
Processing image 3/3
2025-12-17 16:51:33,929 - httpx - INFO - HTTP Request: POST https://api-dev.bisinfo.org/int/its/v1/ai-services/interactive/openai/deployments/gpt4o/chat/completions?api-version=2025-04-01-preview "HTTP/1.1 200 OK"
2025-12-17 16:51:33,931 - ETL.document_processor.main_processor.file_processor - INFO - Reconstructing chunks...
I am in generate summary vision
I am in generate summary Vision - generate document summary stuff
2025-12-17 16:51:38,685 - httpx - INFO - HTTP Request: PO

100%|██████████| 1/1 [00:47<00:00, 47.87s/it]
