In [None]:
import requests
from bs4 import BeautifulSoup
import time
import pandas as pd
#from aws_s3 import save_image, download_pdf
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.options import Options

# Listing page of the publications to scrape; page offsets are appended to it later
base_url = "https://rpc.cfainstitute.org/en/research-foundation/publications"

# Base domain to construct full URLs for PDFs and images
base_domain = "https://rpc.cfainstitute.org"

# Fallback image URL used when a publication has no cover image of its own
alternative_image_url = "https://media.istockphoto.com/id/1352945762/vector/no-image-available-like-missing-picture.jpg?s=612x612&w=0&k=20&c=4X-znbt02a8EIdxwDFaxfmKvUhTnLvLMv1i1f3bToog="

# Configure Chrome to run headless (no visible browser window)
chrome_options = Options()
chrome_options.add_argument("--headless")
# Optional flags, currently disabled:
# chrome_options.add_argument("--no-sandbox")
# chrome_options.add_argument("--disable-dev-shm-usage")

# Start a Chrome WebDriver session with the options above.
# NOTE: this creates a browser process as a side effect of running the cell; the
# module-level `driver` is shared by the scraping functions and must be quit when done.
driver = webdriver.Chrome(
    options=chrome_options
)

In [None]:
# Running ID assigned to each scraped publication. Kept at module level so the
# numbering continues across successive calls to get_publications_from_page.
global_id_counter = 1

def get_publications_from_page(page_url, max_publications=3):
    """Scrape one listing page and download each publication's image and PDF.

    Args:
        page_url: URL of the listing page to load in the shared Selenium driver.
        max_publications: Maximum number of results to process from the page
            (default 3, the previous hard-coded limit). Pass None to process
            every result found.

    Returns:
        pandas.DataFrame with the columns "ID", "Title", "Summary",
        "Image Path" and "PDF Path", one row per publication that had a title
        link. "PDF Path" is "" when no PDF link was found or the download failed.

    Side effects:
        Navigates the module-level `driver`, writes files through save_image /
        download_pdf, prints progress, and advances `global_id_counter`.
    """
    global global_id_counter  # Declare as global to update across function calls
    from urllib.parse import urljoin  # local import: resolves relative and absolute hrefs alike

    publications_data = []

    # Load the page using Selenium; the fixed sleep is a crude wait for the
    # script-rendered result list to appear.
    driver.get(page_url)
    time.sleep(5)

    soup = BeautifulSoup(driver.page_source, 'html.parser')
    publications = soup.find_all('div', class_='coveo-list-layout CoveoResult')
    if max_publications is not None:
        publications = publications[:max_publications]

    for publication in publications:
        # Title and link. Look up the heading first: chaining .find() on a
        # missing <h4> would raise AttributeError and abort the whole page.
        heading_tag = publication.find('h4', class_='coveo-title')
        title_tag = heading_tag.find('a', class_='CoveoResultLink') if heading_tag else None
        if title_tag is None or not title_tag.get('href'):
            print(f"No title available for publication: {len(publications_data)+1}")
            continue
        title = title_tag.text.strip()
        publication_link = urljoin(base_domain, title_tag['href'])

        # Summary text
        summary_tag = publication.find('div', class_='result-body')
        summary = summary_tag.text.strip() if summary_tag else "No summary available"

        # Cover image; fall back to the placeholder when the tag or its src is missing
        result_link_div = publication.find('div', class_='result-link')
        image_tag = result_link_div.find('img', class_='coveo-result-image') if result_link_div else None
        image_src = image_tag.get('src') if image_tag else None
        image_url = urljoin(base_domain, image_src) if image_src else alternative_image_url
        image_path = save_image(title, image_url)

        # Visit the publication page to extract the PDF link
        driver.get(publication_link)
        time.sleep(3)
        publication_soup = BeautifulSoup(driver.page_source, 'html.parser')
        pdf_link_tag = publication_soup.find('a', href=lambda href: href and href.endswith('.pdf'))
        if pdf_link_tag:
            pdf_url = urljoin(base_domain, pdf_link_tag['href'])
            pdf_path = download_pdf(title, pdf_url)
        else:
            # Do not hand a placeholder string to the downloader as if it were a URL
            print(f"No PDF found for {title}.")
            pdf_path = ""

        publications_data.append({
            "ID": global_id_counter,
            "Title": title,
            "Summary": summary,
            "Image Path": image_path,
            "PDF Path": pdf_path
        })

        # Output the extracted information
        print(f"ID:{global_id_counter}")
        print(f"Title: {title}")
        print(f"Summary: {summary}")
        print(f"Image Path: {image_path}")
        print(f"Publication Link: {publication_link}")
        print(f"PDF Path: {pdf_path}")
        print("-" * 100)
        global_id_counter += 1  # Increment the global ID counter

    # Convert the collected records to a DataFrame with a fixed column order
    publications_df = pd.DataFrame(publications_data, columns=["ID", "Title", "Summary", "Image Path", "PDF Path"])
    return publications_df





In [None]:
import os
import requests

def download_pdf(title, pdf_url, timeout=30):
    """Download a PDF and store it under ``pdfs/<sanitized title>.pdf``.

    Args:
        title: Publication title; used (after sanitizing) as the file name.
        pdf_url: URL of the PDF to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the scrape indefinitely.

    Returns:
        The local file path on success, or "" when the response is empty or
        any error occurs (the error is printed, not raised).
    """
    try:
        response = requests.get(pdf_url, timeout=timeout)
        response.raise_for_status()

        if response.content:  # Ensure response has content
            # Define local path for saving the PDF
            local_path = f"pdfs/{sanitize_filename(title)}.pdf"
            os.makedirs(os.path.dirname(local_path), exist_ok=True)

            # Write content to a local PDF file
            with open(local_path, "wb") as pdf_file:
                pdf_file.write(response.content)

            print(f"PDF saved locally as {local_path}")
            return local_path
        else:
            print(f"No content in PDF for {title}.")
            return ""
    except Exception as e:
        # Best-effort: one failed download must not abort the whole scrape
        print(f"Failed to download PDF for {title}. Error: {e}")
        return ""

def save_image(title, image_url, timeout=30):
    """Download an image and store it under ``images/<sanitized title>.jpg``.

    Args:
        title: Publication title; used (after sanitizing) as the file name.
        image_url: URL of the image to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the scrape indefinitely.

    Returns:
        The local file path on success, or "" when the response is empty or
        any error occurs (the error is printed, not raised).

    Note:
        The file is always given a ``.jpg`` extension, whatever format the
        server actually returned.
    """
    try:
        response = requests.get(image_url, timeout=timeout)
        response.raise_for_status()

        if response.content:  # Ensure response has content
            # Define local path for saving the image
            local_path = f"images/{sanitize_filename(title)}.jpg"
            os.makedirs(os.path.dirname(local_path), exist_ok=True)

            # Write content to a local image file
            with open(local_path, "wb") as image_file:
                image_file.write(response.content)

            print(f"Image saved locally as {local_path}")
            return local_path
        else:
            print(f"No content in image for {title}.")
            return ""
    except Exception as e:
        # Best-effort: one failed download must not abort the whole scrape
        print(f"Failed to save image for {title}. Error: {e}")
        return ""

def sanitize_filename(name):
    """Return `name` with every character that is not alphanumeric or one of
    space, dot, underscore, hyphen or parentheses replaced by an underscore."""
    allowed_punctuation = " ._-()"
    cleaned = []
    for ch in name:
        if ch.isalnum() or ch in allowed_punctuation:
            cleaned.append(ch)
        else:
            cleaned.append("_")
    return "".join(cleaned)


In [None]:
def scrape_publications(total_pages=1):
    """Scrape `total_pages` listing pages and return all publications in one frame.

    Args:
        total_pages: Number of listing pages to visit (default 1, the previous
            hard-coded value). Page N is requested with the URL fragment
            ``#first=<(N - 1) * 10>``.

    Returns:
        pandas.DataFrame with the columns "ID", "Title", "Summary",
        "Image Path" and "PDF Path". Empty (but with those columns) when no
        publication was scraped.
    """
    columns = ["ID", "Title", "Summary", "Image Path", "PDF Path"]

    # Collect the per-page frames and concatenate once at the end; growing a
    # frame inside the loop is quadratic, and concatenating onto an empty
    # frame degrades column dtypes to object.
    page_frames = []
    for page_number in range(1, total_pages + 1):
        page_url = f"{base_url}#first={(page_number - 1) * 10}"
        print(f"\n{'-'*100}\nScraping page {page_number}: {page_url}\n{'-'*100}\n")
        page_frames.append(get_publications_from_page(page_url))

    non_empty_frames = [frame for frame in page_frames if not frame.empty]
    if not non_empty_frames:
        return pd.DataFrame(columns=columns)

    all_publications_df = pd.concat(non_empty_frames, ignore_index=True)

    # Optional persistence, currently disabled:
    # all_publications_df.to_csv("publications_data.csv", index=False)
    # print("Data saved to publications_data.csv")
    return all_publications_df

# Close the Selenium browser after scraping
def close_driver():
    """Quit the module-level Selenium `driver`, ending the browser session."""
    driver.quit()


In [None]:
# Run the scrape and always shut the browser down, even if scraping raises;
# otherwise a failed run would leave the headless Chrome process behind.
try:
    df = scrape_publications()
finally:
    close_driver()
df

In [None]:
from docling.document_converter import DocumentConverter

# Convert one PDF to Markdown with Docling's default pipeline.
# NOTE(review): the path looks like a file written by download_pdf in an earlier
# cell (pdfs/<sanitized title>.pdf), so the scrape presumably must have run first — confirm.
source = "./pdfs/Beyond Active and Passive Investing_ The Customization of Finance.pdf"  # PDF path or URL
converter = DocumentConverter()
result = converter.convert(source)
# Markdown rendering of the converted document
markdown_content=result.document.export_to_markdown()

def save_markdown_to_readme(markdown_content, output_path="./extracted.md"):
    """Write Markdown text to `output_path`, replacing any existing file.

    Args:
        markdown_content: The Markdown text to write.
        output_path: Destination file path (default "./extracted.md").

    The file is written as UTF-8 explicitly, so text containing non-ASCII
    characters does not depend on the platform's default encoding.
    """
    with open(output_path, "w", encoding="utf-8") as file:
        file.write(markdown_content)
    print(f"Markdown content saved to {output_path}")

# Persist the converted Markdown; no path is given, so the function's default output path is used
save_markdown_to_readme(markdown_content)

In [None]:
import json
import logging
import time
from pathlib import Path
from typing import Iterable

from docling.datamodel.base_models import ConversionStatus
from docling.datamodel.document import ConversionResult
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions
#from docling.datamodel.base_models import ImageRefMode, PictureItem, TableItem
from docling.datamodel import base_models
# Set up logging: INFO level on the root logger, plus a logger for this cell's functions
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

# Value assigned to the PDF pipeline's `images_scale` option in main()
IMAGE_RESOLUTION_SCALE = 2.0

def export_images_and_tables(conv_res, output_dir, doc_filename):
    """Save page, table and picture images of a converted document as PNG files.

    Args:
        conv_res: Conversion result whose ``document`` is exported.
        output_dir: pathlib.Path of the directory to write into (created if missing).
        doc_filename: Prefix for the generated files:
            ``<prefix>-page-<n>.png``, ``<prefix>-table-<n>.png``,
            ``<prefix>-picture-<n>.png``.

    Pages or elements without a generated image are skipped instead of raising.
    """
    # TableItem / PictureItem live in docling_core, not docling.datamodel.base_models;
    # looking them up on base_models raises AttributeError.
    from docling_core.types.doc import PictureItem, TableItem

    output_dir.mkdir(parents=True, exist_ok=True)

    # Page images exist only when the pipeline was asked to generate them
    for page_no, page in conv_res.document.pages.items():
        if page.image is None:
            continue
        page_image_filename = output_dir / f"{doc_filename}-page-{page_no}.png"
        with page_image_filename.open("wb") as fp:
            page.image.pil_image.save(fp, format="PNG")

    # Images of tables and pictures, each numbered in document order
    table_counter = 0
    picture_counter = 0
    for element, _ in conv_res.document.iterate_items():
        if isinstance(element, TableItem):
            table_counter += 1
            image_name = f"{doc_filename}-table-{table_counter}.png"
        elif isinstance(element, PictureItem):
            picture_counter += 1
            image_name = f"{doc_filename}-picture-{picture_counter}.png"
        else:
            continue

        if element.image is None:
            # Counter still advanced above so file numbers keep matching element order
            _log.warning(f"No image generated for {image_name}; skipping.")
            continue

        with (output_dir / image_name).open("wb") as fp:
            element.image.pil_image.save(fp, "PNG")

def export_documents(conv_results: Iterable[ConversionResult], output_dir: Path):
    """Write JSON and images for every successfully converted document.

    Partially converted documents only have their errors logged as warnings;
    failed documents are logged as errors. Nothing is exported for either.

    Returns:
        Tuple ``(success_count, partial_success_count, failure_count)``.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    n_ok = 0
    n_partial = 0
    n_failed = 0

    for result in conv_results:
        stem = result.input.file.stem
        status = result.status

        if status == ConversionStatus.SUCCESS:
            n_ok += 1

            # Document structure as JSON
            json_path = output_dir / f"{stem}.json"
            with json_path.open("w") as fp:
                json.dump(result.document.export_to_dict(), fp)

            # Page / table / picture images
            export_images_and_tables(result, output_dir / "images", stem)
            continue

        if status == ConversionStatus.PARTIAL_SUCCESS:
            n_partial += 1
            _log.warning(f"Partial success for {stem} with errors:")
            for problem in result.errors:
                _log.warning(f"\t{problem.error_message}")
            continue

        n_failed += 1
        _log.error(f"Failed to convert {stem}.")

    # Summary of the batch
    _log.info(f"Processed {n_ok + n_partial + n_failed} documents.")
    _log.info(f"{n_ok} successful, {n_partial} partially successful, {n_failed} failed.")

    return n_ok, n_partial, n_failed

def main():
    """Batch-convert the three sample PDFs and export the results to ``output``."""
    # PDF pipeline: scaled images for pages, tables and pictures
    pdf_options = PdfPipelineOptions()
    pdf_options.images_scale = IMAGE_RESOLUTION_SCALE
    pdf_options.generate_page_images = True
    pdf_options.generate_table_images = True
    pdf_options.generate_picture_images = True

    converter = DocumentConverter(
        format_options={"pdf": PdfFormatOption(pipeline_options=pdf_options)}
    )

    # Documents to process
    pdf_files = [
        Path("./pdfs/Beyond Active and Passive Investing_ The Customization of Finance.pdf"),
        Path("./pdfs/Investment Model Validation_ A Guide for Practitioners.pdf"),
        Path("./pdfs/The Economics of Private Equity_ A Critical Review.pdf")
    ]

    # Convert and export; the timer covers both steps
    started_at = time.time()
    export_documents(
        converter.convert_all(pdf_files, raises_on_error=False),
        output_dir=Path("output"),
    )
    elapsed = time.time() - started_at
    _log.info(f"Batch processing complete in {elapsed:.2f} seconds.")


# Run the batch when this cell is executed as the main module
if __name__ == "__main__":
    main()

In [None]:
import json
import logging
import time
from pathlib import Path
from typing import Iterable

from docling.datamodel.base_models import ConversionStatus
from docling.datamodel.document import ConversionResult
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions
#from docling.datamodel.base_models import ImageRefMode, PictureItem, TableItem
from docling.datamodel import base_models
# Set up logging: INFO level on the root logger, plus a logger for this cell's functions
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

# Value assigned to the PDF pipeline's `images_scale` option in main()
IMAGE_RESOLUTION_SCALE = 2.0

def export_images_and_tables(conv_res, output_dir, doc_filename):
    """Save page, table and picture images of a converted document as PNG files.

    Args:
        conv_res: Conversion result whose ``document`` is exported.
        output_dir: pathlib.Path of the directory to write into (created if missing).
        doc_filename: Prefix for the generated files:
            ``<prefix>-page-<n>.png``, ``<prefix>-table-<n>.png``,
            ``<prefix>-picture-<n>.png``.

    Pages or elements without a generated image are skipped instead of raising.
    """
    # TableItem / PictureItem live in docling_core, not docling.datamodel.base_models;
    # looking them up on base_models raises AttributeError.
    from docling_core.types.doc import PictureItem, TableItem

    output_dir.mkdir(parents=True, exist_ok=True)

    # Page images exist only when the pipeline was asked to generate them
    for page_no, page in conv_res.document.pages.items():
        if page.image is None:
            continue
        page_image_filename = output_dir / f"{doc_filename}-page-{page_no}.png"
        with page_image_filename.open("wb") as fp:
            page.image.pil_image.save(fp, format="PNG")

    # Images of tables and pictures, each numbered in document order
    table_counter = 0
    picture_counter = 0
    for element, _ in conv_res.document.iterate_items():
        if isinstance(element, TableItem):
            table_counter += 1
            image_name = f"{doc_filename}-table-{table_counter}.png"
        elif isinstance(element, PictureItem):
            picture_counter += 1
            image_name = f"{doc_filename}-picture-{picture_counter}.png"
        else:
            continue

        if element.image is None:
            # Counter still advanced above so file numbers keep matching element order
            _log.warning(f"No image generated for {image_name}; skipping.")
            continue

        with (output_dir / image_name).open("wb") as fp:
            element.image.pil_image.save(fp, "PNG")

def export_documents(conv_results: Iterable[ConversionResult], output_dir: Path):
    """Export JSON, Markdown and images for every successfully converted document.

    Args:
        conv_results: Conversion results to process.
        output_dir: Directory for ``<name>.json`` and ``<name>.md``; images go
            to ``output_dir / "images"``.

    Returns:
        Tuple ``(success_count, partial_success_count, failure_count)``.

    Partially converted documents only have their errors logged; failed ones
    are logged as errors. Nothing is exported for either.
    """
    # ImageRefMode is defined in docling_core, not docling.datamodel.base_models.
    # Looking it up on base_models always raised AttributeError, so the
    # embedded-image export never actually ran.
    try:
        from docling_core.types.doc import ImageRefMode
    except ImportError:
        ImageRefMode = None

    output_dir.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0
    partial_success_count = 0

    for conv_res in conv_results:
        doc_filename = conv_res.input.file.stem

        if conv_res.status == ConversionStatus.SUCCESS:
            success_count += 1

            # Export document to JSON
            with (output_dir / f"{doc_filename}.json").open("w", encoding="utf-8") as fp:
                json.dump(conv_res.document.export_to_dict(), fp)

            # Render the Markdown before opening the file so a rendering error
            # does not leave an empty .md behind.
            if ImageRefMode is not None:
                markdown_text = conv_res.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
            else:
                _log.warning(f"Failed to use ImageRefMode.EMBEDDED for {doc_filename}. Exporting without embedded images.")
                markdown_text = conv_res.document.export_to_markdown()

            # Explicit UTF-8: converted text routinely contains non-ASCII characters
            with (output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
                fp.write(markdown_text)

            # Save images and tables
            export_images_and_tables(conv_res, output_dir / "images", doc_filename)

        elif conv_res.status == ConversionStatus.PARTIAL_SUCCESS:
            partial_success_count += 1
            _log.warning(f"Partial success for {doc_filename} with errors:")
            for item in conv_res.errors:
                _log.warning(f"\t{item.error_message}")

        else:
            failure_count += 1
            _log.error(f"Failed to convert {doc_filename}.")

    _log.info(f"Processed {success_count + partial_success_count + failure_count} documents.")
    _log.info(f"{success_count} successful, {partial_success_count} partially successful, {failure_count} failed.")

    return success_count, partial_success_count, failure_count


def main():
    """Convert the three sample PDFs in one batch and export them to ``output``."""
    sources = [
        Path("./pdfs/Beyond Active and Passive Investing_ The Customization of Finance.pdf"),
        Path("./pdfs/Investment Model Validation_ A Guide for Practitioners.pdf"),
        Path("./pdfs/The Economics of Private Equity_ A Critical Review.pdf")
    ]

    # Ask the PDF pipeline for scaled page, table and picture images
    options = PdfPipelineOptions()
    options.images_scale = IMAGE_RESOLUTION_SCALE
    options.generate_page_images = True
    options.generate_table_images = True
    options.generate_picture_images = True

    pdf_format_option = PdfFormatOption(pipeline_options=options)
    doc_converter = DocumentConverter(format_options={"pdf": pdf_format_option})

    # Timed section: conversion plus export
    t_start = time.time()
    results = doc_converter.convert_all(sources, raises_on_error=False)
    export_documents(results, output_dir=Path("output"))
    duration = time.time() - t_start

    _log.info(f"Batch processing complete in {duration:.2f} seconds.")


# Run the batch when this cell is executed as the main module
if __name__ == "__main__":
    main()

In [None]:
import json
import logging
import time
from pathlib import Path

from docling.datamodel.base_models import ConversionStatus
from docling.datamodel.document import ConversionResult
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions
#from docling.datamodel.base_models import ImageRefMode, PictureItem, TableItem
from docling.datamodel import base_models
# Set up logging: INFO level on the root logger, plus a logger for this cell's functions
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

# Value assigned to the PDF pipeline's `images_scale` option in main()
IMAGE_RESOLUTION_SCALE = 2.0

def export_images_and_tables(conv_res, output_dir, doc_filename):
    """Save page, table and picture images of a converted document as PNG files.

    Args:
        conv_res: Conversion result whose ``document`` is exported.
        output_dir: pathlib.Path of the directory to write into (created if missing).
        doc_filename: Prefix for the generated files:
            ``<prefix>-page-<n>.png``, ``<prefix>-table-<n>.png``,
            ``<prefix>-picture-<n>.png``.

    Pages or elements without a generated image are skipped instead of raising.
    This matters here because main() in this cell disables page-image generation.
    """
    # TableItem / PictureItem live in docling_core, not docling.datamodel.base_models;
    # looking them up on base_models raises AttributeError.
    from docling_core.types.doc import PictureItem, TableItem

    output_dir.mkdir(parents=True, exist_ok=True)

    # Page images exist only when the pipeline was asked to generate them
    for page_no, page in conv_res.document.pages.items():
        if page.image is None:
            continue
        page_image_filename = output_dir / f"{doc_filename}-page-{page_no}.png"
        with page_image_filename.open("wb") as fp:
            page.image.pil_image.save(fp, format="PNG")

    # Images of tables and pictures, each numbered in document order
    table_counter = 0
    picture_counter = 0
    for element, _ in conv_res.document.iterate_items():
        if isinstance(element, TableItem):
            table_counter += 1
            image_name = f"{doc_filename}-table-{table_counter}.png"
        elif isinstance(element, PictureItem):
            picture_counter += 1
            image_name = f"{doc_filename}-picture-{picture_counter}.png"
        else:
            continue

        if element.image is None:
            # Counter still advanced above so file numbers keep matching element order
            _log.warning(f"No image generated for {image_name}; skipping.")
            continue

        with (output_dir / image_name).open("wb") as fp:
            element.image.pil_image.save(fp, "PNG")

def export_document(conv_res, output_dir):
    """Export JSON, Markdown (with embedded images) and image files for one document.

    Args:
        conv_res: Conversion result for a single input document.
        output_dir: pathlib.Path for ``<name>.json`` and ``<name>.md``; images
            go to ``output_dir / "images"``.

    Only a fully successful conversion is exported. A partial success has its
    errors logged as warnings; any other status is logged as an error.
    """
    # ImageRefMode is defined in docling_core, not docling.datamodel.base_models;
    # looking it up on base_models raises AttributeError.
    from docling_core.types.doc import ImageRefMode

    output_dir.mkdir(parents=True, exist_ok=True)

    doc_filename = conv_res.input.file.stem

    if conv_res.status == ConversionStatus.SUCCESS:
        # Export document to JSON
        with (output_dir / f"{doc_filename}.json").open("w", encoding="utf-8") as fp:
            json.dump(conv_res.document.export_to_dict(), fp)

        # Render first so a rendering error does not leave an empty .md behind,
        # then write as UTF-8 (converted text routinely contains non-ASCII characters).
        markdown_text = conv_res.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
        with (output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
            fp.write(markdown_text)

        # Save images and tables
        export_images_and_tables(conv_res, output_dir / "images", doc_filename)
        _log.info(f"Successfully processed and exported '{doc_filename}'")

    elif conv_res.status == ConversionStatus.PARTIAL_SUCCESS:
        _log.warning(f"Partial success for '{doc_filename}' with errors:")
        for item in conv_res.errors:
            _log.warning(f"\t{item.error_message}")
    else:
        _log.error(f"Failed to convert '{doc_filename}'.")

def main():
    """Convert the three sample PDFs one at a time, exporting each to ``output``."""
    # Table and picture images only; page images are switched off
    options = PdfPipelineOptions()
    options.images_scale = IMAGE_RESOLUTION_SCALE
    options.generate_page_images = False
    options.generate_table_images = True
    options.generate_picture_images = True

    converter = DocumentConverter(
        format_options={"pdf": PdfFormatOption(pipeline_options=options)}
    )

    documents = [
        Path("./pdfs/Beyond Active and Passive Investing_ The Customization of Finance.pdf"),
        Path("./pdfs/Investment Model Validation_ A Guide for Practitioners.pdf"),
        Path("./pdfs/The Economics of Private Equity_ A Critical Review.pdf")
    ]
    separator = "-" * 40

    for document_path in documents:
        began = time.time()
        print(separator)
        print(f"PDF PATH:{document_path}\n")
        _log.info(f"Processing '{document_path.name}'...")

        # Convert, then export content, images and tables
        converted = converter.convert(document_path)
        export_document(converted, output_dir=Path("output"))

        # Per-document timing
        took = time.time() - began
        _log.info(f"Finished processing '{document_path.name}' in {took:.2f} seconds.")


# Run the conversions when this cell is executed as the main module
if __name__ == "__main__":
    main()


In [None]:
import json
import logging
import time
from pathlib import Path
from typing import Iterable

from docling.datamodel.base_models import ConversionStatus, FigureElement, InputFormat, Table
from docling.datamodel.document import ConversionResult
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling_core.types.doc import ImageRefMode, PictureItem, TableItem
from docling.datamodel.pipeline_options import PdfPipelineOptions


# Logger used by export_documents() and main() in the following cells
_log = logging.getLogger(__name__)
# When True, export_documents() writes the JSON and Markdown files for each converted document
USE_V2 = True
# Value assigned to the PDF pipeline's `images_scale` option in main()
IMAGE_RESOLUTION_SCALE = 2.0

In [None]:
def export_documents(
    conv_results: Iterable[ConversionResult],
    output_dir: Path,
):
    """Write JSON and Markdown for every successfully converted document.

    Args:
        conv_results: Conversion results to process.
        output_dir: Directory for ``<name>.json`` and ``<name>.md`` (created if missing).

    Returns:
        Tuple ``(success_count, partial_success_count, failure_count)``.

    Files are written only when the module-level ``USE_V2`` flag is true.
    Partially converted and failed documents are logged, not exported.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0
    partial_success_count = 0

    for conv_res in conv_results:
        if conv_res.status == ConversionStatus.SUCCESS:
            success_count += 1
            doc_filename = conv_res.input.file.stem

            if USE_V2:
                # Render both exports before touching the disk, then write as
                # UTF-8 explicitly: converted text routinely contains non-ASCII
                # characters and the platform default encoding may not cover them.
                json_text = json.dumps(conv_res.document.export_to_dict())
                markdown_text = conv_res.document.export_to_markdown()

                # Export Docling document format to JSON:
                with (output_dir / f"{doc_filename}.json").open("w", encoding="utf-8") as fp:
                    fp.write(json_text)

                # Export Docling document format to markdown:
                with (output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
                    fp.write(markdown_text)

        elif conv_res.status == ConversionStatus.PARTIAL_SUCCESS:
            _log.info(
                f"Document {conv_res.input.file} was partially converted with the following errors:"
            )
            for item in conv_res.errors:
                _log.info(f"\t{item.error_message}")
            partial_success_count += 1
        else:
            _log.info(f"Document {conv_res.input.file} failed to convert.")
            failure_count += 1

    _log.info(
        f"Processed {success_count + partial_success_count + failure_count} docs, "
        f"of which {failure_count} failed "
        f"and {partial_success_count} were partially converted."
    )
    return success_count, partial_success_count, failure_count

In [None]:
def main():
    """Batch-convert the three sample PDFs, export them to ``Output`` and raise
    RuntimeError if any document failed to convert."""
    logging.basicConfig(level=logging.INFO)

    # Table and picture images only; page images are switched off.
    # (Inline debug visualizations can be enabled through settings.debug.visualize_*.)
    options = PdfPipelineOptions()
    options.images_scale = IMAGE_RESOLUTION_SCALE
    options.generate_page_images = False
    options.generate_table_images = True
    options.generate_picture_images = True

    pdf_format_option = PdfFormatOption(pipeline_options=options)
    converter = DocumentConverter(format_options={InputFormat.PDF: pdf_format_option})

    pdf_files = [
        Path("./pdfs/Beyond Active and Passive Investing_ The Customization of Finance.pdf"),
        Path("./pdfs/Investment Model Validation_ A Guide for Practitioners.pdf"),
        Path("./pdfs/The Economics of Private Equity_ A Critical Review.pdf")
    ]

    began = time.time()

    # raises_on_error=False lets the batch run through so the counts can be examined afterwards
    results = converter.convert_all(pdf_files, raises_on_error=False)
    counts = export_documents(results, output_dir=Path("Output"))
    success_count, partial_success_count, failure_count = counts

    elapsed = time.time() - began
    _log.info(f"Document conversion complete in {elapsed:.2f} seconds.")

    if failure_count <= 0:
        return
    raise RuntimeError(
        f"The example failed converting {failure_count} on {len(pdf_files)}."
    )

In [None]:
# Run the batch conversion when this cell is executed as the main module
if __name__ == "__main__":
    main()

In [4]:
import json
import logging
import time
from pathlib import Path
from typing import Iterable


from docling.datamodel.base_models import ConversionStatus,InputFormat
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling_core.types.doc import ImageRefMode, PictureItem, TableItem

# Logger used by export_documents() and main() below
_log = logging.getLogger(__name__)

# Value assigned to the PDF pipeline's `images_scale` option in main()
IMAGE_RESOLUTION_SCALE = 2.0
# NOTE(review): in this cell USE_V2 is only mentioned inside commented-out code — confirm it is still needed
USE_V2 = True

def export_documents(
    conv_results: Iterable[ConversionResult],
    output_dir: Path,
):
    """Export table/picture images, Markdown and JSON for each converted document.

    For every successfully converted document this writes:
      * ``output_dir/images/<name>-table-<n>.png`` and ``<name>-picture-<n>.png``
      * ``output_dir/<name>-with-images.md`` (Markdown with embedded images)
      * ``output_dir/<name>-with-images.json`` (document exported as a dict)

    Args:
        conv_results: Conversion results to process.
        output_dir: Root output directory (created if missing).

    Returns:
        Tuple ``(success_count, partial_success_count, failure_count)``.

    Partially converted and failed documents are logged, not exported.
    """
    # Define paths for markdown and images
    markdown_output_dir = output_dir
    images_output_dir = output_dir / "images"

    # Ensure directories exist
    markdown_output_dir.mkdir(parents=True, exist_ok=True)
    images_output_dir.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0
    partial_success_count = 0

    for conv_res in conv_results:
        if conv_res.status == ConversionStatus.SUCCESS:
            success_count += 1
            doc_filename = conv_res.input.file.stem

            # Images of tables and pictures, each numbered in document order
            table_counter = 0
            picture_counter = 0
            for element, _ in conv_res.document.iterate_items():
                if isinstance(element, TableItem):
                    table_counter += 1
                    image_name = f"{doc_filename}-table-{table_counter}.png"
                elif isinstance(element, PictureItem):
                    picture_counter += 1
                    image_name = f"{doc_filename}-picture-{picture_counter}.png"
                else:
                    continue

                if element.image is None:
                    # No image was generated for this element; skip it rather
                    # than fail the whole export with an AttributeError.
                    _log.warning(f"No image generated for {image_name}; skipping.")
                    continue

                with (images_output_dir / image_name).open("wb") as fp:
                    element.image.pil_image.save(fp, "PNG")

            # Render both exports before touching the disk, then write as UTF-8
            # explicitly: converted text routinely contains non-ASCII characters.
            content_md = conv_res.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
            content_json = json.dumps(conv_res.document.export_to_dict())

            md_filename = markdown_output_dir / f"{doc_filename}-with-images.md"
            json_filename = markdown_output_dir / f"{doc_filename}-with-images.json"
            with md_filename.open("w", encoding="utf-8") as fp:
                fp.write(content_md)
            with json_filename.open("w", encoding="utf-8") as fp:
                fp.write(content_json)

        elif conv_res.status == ConversionStatus.PARTIAL_SUCCESS:
            _log.info(f"Document {conv_res.input.file} was partially converted with errors:")
            for item in conv_res.errors:
                _log.info(f"\t{item.error_message}")
            partial_success_count += 1
        else:
            _log.info(f"Document {conv_res.input.file} failed to convert.")
            failure_count += 1

    _log.info(
        f"Processed {success_count + partial_success_count + failure_count} docs, "
        f"of which {failure_count} failed and {partial_success_count} were partially converted."
    )
    return success_count, partial_success_count, failure_count

In [7]:
def main():
    """Batch-convert the three sample PDFs, export them to ``output`` and raise
    RuntimeError if any document failed to convert."""
    logging.basicConfig(level=logging.INFO)

    # Pipeline options: scaled table and picture images, no page images
    options = PdfPipelineOptions()
    options.images_scale = IMAGE_RESOLUTION_SCALE
    options.generate_page_images = False
    options.generate_table_images = True
    options.generate_picture_images = True

    pdf_format_option = PdfFormatOption(pipeline_options=options)
    converter = DocumentConverter(format_options={InputFormat.PDF: pdf_format_option})

    # Documents to convert
    pdf_files = [
        Path("./pdfs/Beyond Active and Passive Investing_ The Customization of Finance.pdf"),
        Path("./pdfs/Investment Model Validation_ A Guide for Practitioners.pdf"),
        Path("./pdfs/The Economics of Private Equity_ A Critical Review.pdf"),
    ]

    began = time.time()

    # Convert everything, then export; failures are counted instead of raised here
    results = converter.convert_all(pdf_files, raises_on_error=False)
    counts = export_documents(results, output_dir=Path("output"))
    success_count, partial_success_count, failure_count = counts

    elapsed = time.time() - began
    _log.info(f"Document conversion complete in {elapsed:.2f} seconds.")

    if failure_count <= 0:
        return
    raise RuntimeError(
        f"The conversion failed for {failure_count} out of {len(pdf_files)} documents."
    )

In [8]:
# Run the batch conversion when this cell is executed as the main module
if __name__ == "__main__":
    main()

INFO:docling.document_converter:Going to convert document batch...


Fetching 9 files:   0%|          | 0/9 [00:00<?, ?it/s]

INFO:docling.pipeline.base_pipeline:Processing document Beyond Active and Passive Investing_ The Customization of Finance.pdf
INFO:docling.document_converter:Finished converting document Beyond Active and Passive Investing_ The Customization of Finance.pdf in 55.04 sec.
INFO:docling.pipeline.base_pipeline:Processing document Investment Model Validation_ A Guide for Practitioners.pdf
INFO:docling.document_converter:Finished converting document Investment Model Validation_ A Guide for Practitioners.pdf in 34.23 sec.
INFO:docling.document_converter:Going to convert document batch...
INFO:docling.pipeline.base_pipeline:Processing document The Economics of Private Equity_ A Critical Review.pdf
INFO:docling.document_converter:Finished converting document The Economics of Private Equity_ A Critical Review.pdf in 30.22 sec.
INFO:__main__:Processed 3 docs, of which 0 failed and 0 were partially converted.
INFO:__main__:Document conversion complete in 119.66 seconds.


In [10]:
import os
import json
import pinecone
import openai
from pathlib import Path
from typing import List, Dict
from pinecone import Pinecone, ServerlessSpec
from dotenv import load_dotenv

# Load variables from a .env file, letting them override the current environment.
load_dotenv(override=True)

# Create the Pinecone client from the API key found in the environment.
# NOTE: this rebinds the name `pinecone` (imported above as a module) to a
# client instance; later code in this cell uses it as the client.
pinecone = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))

index_name = "document-embedding-index"

# Create the index only if it does not exist yet.
# list_indexes() yields index descriptions rather than plain strings, so a bare
# `index_name not in pinecone.list_indexes()` is always true and would attempt
# to re-create an existing index; compare against the list of names instead.
if index_name not in pinecone.list_indexes().names():
    pinecone.create_index(
        name=index_name,
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
index = pinecone.Index(index_name)

INFO:pinecone_plugin_interface.logging:Discovering subpackages in _NamespacePath(['/Users/udaykiran/Library/Caches/pypoetry/virtualenvs/a4-vItfbIzM-py3.12/lib/python3.12/site-packages/pinecone_plugins'])
INFO:pinecone_plugin_interface.logging:Looking for plugins in pinecone_plugins.inference
INFO:pinecone_plugin_interface.logging:Installing plugin inference into Pinecone


In [62]:
import os
from pathlib import Path
import json
import numpy as np
from typing import Dict, List, Optional
import pinecone
from openai import OpenAI
from tqdm import tqdm
import base64
from PIL import Image
from io import BytesIO
import logging
from time import sleep

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('vectorization.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class CFADocumentVectorizer:
    """Embed the texts, tables and pictures of exported documents into Pinecone.

    A document is a JSON file with a ``name`` and ``texts`` / ``tables`` /
    ``pictures`` lists (see ``process_document``).  Text is embedded directly
    with the OpenAI embeddings API; table and picture images are first
    described by a chat model and that description is embedded.  All vectors
    are upserted into one Pinecone index under a single namespace.
    """

    # Dimension the Pinecone index is created with.
    # NOTE(review): must equal the vector size returned by `embedding_model` - confirm if the model changes.
    EMBEDDING_DIMENSION = 1536
    # Number of pending vectors that triggers an upsert.
    UPSERT_BATCH_SIZE = 100
    # Maximum number of characters of source text copied into vector metadata.
    METADATA_TEXT_LIMIT = 1000
    # Chat model and response budget used to describe images.
    VISION_MODEL = "gpt-4o"
    VISION_MAX_TOKENS = 300

    def __init__(
        self,
        openai_api_key: str,
        pinecone_api_key: str,
        index_name: str = "cfa-research",
        namespace: str = "investment_research",
        recreate_index: bool = True
    ):
        """Create the API clients and make sure the Pinecone index is ready.

        Args:
            openai_api_key: Key passed to the OpenAI client.
            pinecone_api_key: Key passed to the Pinecone client.
            index_name: Name of the Pinecone index to use.
            namespace: Namespace every vector is upserted into.
            recreate_index: When True (the default, matching the previous
                behaviour) an existing index of the same name is DELETED and
                created from scratch.  Pass False to reuse an existing index.

        Raises:
            TimeoutError: If the index does not report ready in time.
            Exception: Any Pinecone error is logged and re-raised.
        """
        # Imported here so the class does not rely on an earlier notebook cell
        # having already brought these names into the global namespace.
        from pinecone import Pinecone, ServerlessSpec

        # Initialize OpenAI client
        self.client = OpenAI(api_key=openai_api_key)
        self.embedding_model = "text-embedding-3-small"

        # Set up paths and namespace
        self.namespace = namespace
        self.output_dir = Path("./output")
        self.images_dir = self.output_dir / "images"

        # Initialize Pinecone
        self.pc = Pinecone(api_key=pinecone_api_key)
        try:
            index_list = self.pc.list_indexes()
            logger.info(f"Existing indexes: {index_list}")

            existing_indexes = index_list.get('indexes', [])
            index_exists = any(idx.get('name') == index_name for idx in existing_indexes)

            if index_exists and recreate_index:
                logger.info(f"Deleting existing index: {index_name}")
                self.pc.delete_index(index_name)
                # Wait a bit after deletion
                sleep(5)
                logger.info(f"Deleted index: {index_name}")
                index_exists = False

            if not index_exists:
                logger.info(f"Creating new index: {index_name}")
                self.pc.create_index(
                    name=index_name,
                    dimension=self.EMBEDDING_DIMENSION,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        region="us-east-1"
                    )
                )

            self._wait_until_ready(index_name)
            logger.info(f"Index {index_name} is ready")

            self.index = self.pc.Index(index_name)

        except Exception as e:
            logger.error(f"Error initializing Pinecone: {str(e)}")
            raise

    def _wait_until_ready(self, index_name: str, max_retries: int = 60) -> None:
        """Poll the index once per second until it reports ready.

        Errors from ``describe_index`` are logged and treated as "not ready
        yet".  Raises ``TimeoutError`` after ``max_retries`` polls.
        """
        for _ in range(max_retries):
            try:
                index_info = self.pc.describe_index(index_name)
                if index_info.get('status', {}).get('ready', False):
                    return
            except Exception as e:
                logger.warning(f"Waiting for index to be ready... {str(e)}")
            sleep(1)
        raise TimeoutError("Index creation timed out")

    def load_document(self, json_path: Path) -> Dict:
        """Load a document JSON file and return the parsed object.

        Any failure (missing file, invalid JSON, ...) is logged and re-raised.
        """
        try:
            # Read as UTF-8 explicitly instead of the platform default encoding.
            with open(json_path, encoding="utf-8") as f:
                doc_data = json.load(f)
            logger.info(f"Successfully loaded document: {json_path.name}")
            return doc_data
        except Exception as e:
            logger.error(f"Error loading document {json_path}: {e}")
            raise

    def create_embedding(self, text: str) -> List[float]:
        """Return the embedding of ``text``, retrying failed requests.

        Up to three attempts are made, sleeping 1s then 2s between them
        (exponential backoff).  The last failure is logged and re-raised.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = self.client.embeddings.create(
                    input=text,
                    model=self.embedding_model
                )
                return response.data[0].embedding
            except Exception as e:
                if attempt == max_retries - 1:
                    logger.error(f"Failed to create embedding after {max_retries} attempts: {e}")
                    raise
                sleep(2 ** attempt)  # Exponential backoff

    @staticmethod
    def _first_provenance(item: Dict):
        """Return ``(page_no, bbox_str)`` from the first ``prov`` entry of ``item``.

        ``bbox_str`` is ``"l,t,r,b"`` with two decimals, a flat string that can
        be stored as a metadata value.  Raises ``KeyError`` / ``IndexError``
        when the provenance is missing or empty.
        """
        prov = item['prov'][0]
        bbox = prov['bbox']
        bbox_str = f"{bbox['l']:.2f},{bbox['t']:.2f},{bbox['r']:.2f},{bbox['b']:.2f}"
        return prov['page_no'], bbox_str

    @staticmethod
    def _table_cell_texts(data: Dict) -> List[str]:
        """Collect the non-blank cell texts of a table's ``data`` entry.

        ``grid`` is read first and may be either a flat list of cell dicts or a
        list of rows that are themselves lists of cell dicts; ``table_cells``
        is used when no grid is present.
        NOTE(review): the nested-rows layout is what the document exporter is
        believed to emit - confirm against an actual exported JSON file.
        """
        cells = []
        for row in data.get('grid') or []:
            if isinstance(row, dict):
                cells.append(row)
            elif isinstance(row, (list, tuple)):
                cells.extend(row)
        if not cells and isinstance(data.get('table_cells'), list):
            cells = data['table_cells']

        texts = []
        for cell in cells:
            if isinstance(cell, dict):
                cell_text = str(cell.get('text') or '').strip()
                if cell_text:
                    texts.append(cell_text)
        return texts

    @staticmethod
    def _table_shape(data: Dict) -> Dict:
        """Return ``num_rows`` / ``num_cols`` as flat metadata fields when known.

        The counts are looked up on ``data`` itself and, failing that, on
        ``data['table_cells']`` when that entry is a dict.  Missing counts are
        omitted rather than stored as null values.
        """
        nested = data.get('table_cells')
        sources = [data] + ([nested] if isinstance(nested, dict) else [])
        shape = {}
        for key in ('num_rows', 'num_cols'):
            for source in sources:
                value = source.get(key)
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    shape[key] = value
                    break
        return shape

    def _describe_image(self, image_path: Path, prompt: str) -> str:
        """Ask the vision chat model to describe the PNG at ``image_path``.

        The image is sent inline as a base64 ``data:`` URL together with
        ``prompt``; the text of the first choice is returned.
        """
        with open(image_path, 'rb') as img_file:
            encoded_image = base64.b64encode(img_file.read()).decode()

        response = self.client.chat.completions.create(
            model=self.VISION_MODEL,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url",
                         "image_url": {"url": f"data:image/png;base64,{encoded_image}"}}
                    ]
                }
            ],
            max_tokens=self.VISION_MAX_TOKENS
        )
        return response.choices[0].message.content

    def _upsert_when_full(self, pending: List[Dict]) -> List[Dict]:
        """Upsert ``pending`` once it holds a full batch.

        Returns the list to keep accumulating into: ``pending`` itself when it
        is still below ``UPSERT_BATCH_SIZE``, otherwise a new empty list (after
        upserting and pausing one second as rate limiting).
        """
        if len(pending) < self.UPSERT_BATCH_SIZE:
            return pending
        self.index.upsert(vectors=pending, namespace=self.namespace)
        sleep(1)  # Rate limiting
        return []

    def process_text_block(self, text_block: Dict, doc_name: str, current_section: str) -> Optional[Dict]:
        """Build the vector record for one text block.

        Args:
            text_block: Dict with ``text``, ``label``, ``self_ref`` and a
                non-empty ``prov`` list.
            doc_name: Document name, used in the vector id and metadata.
            current_section: Section heading the block belongs to.

        Returns:
            A dict with ``id``, ``values`` and ``metadata``, or ``None`` when
            the block is blank, malformed, or could not be embedded (the error
            is logged).
        """
        try:
            text_content = text_block['text']
            label = text_block['label']
            page_no, bbox_str = self._first_provenance(text_block)

            # Nothing useful to embed for an empty / whitespace-only block.
            if not text_content.strip():
                return None

            embedding = self.create_embedding(text_content)

            return {
                'id': f"{doc_name}_text_{text_block['self_ref'].split('/')[-1]}",
                'values': embedding,
                'metadata': {
                    'doc_name': doc_name,
                    'content_type': 'text',
                    'label': label,
                    'page_no': page_no,
                    # TODO(original author): decide whether to keep the text in metadata.
                    'text': text_content[:self.METADATA_TEXT_LIMIT],  # Limit metadata text length
                    'bbox': bbox_str,
                    'section': current_section
                }
            }
        except Exception as e:
            logger.error(f"Error processing text block: {e}")
            logger.error(f"Text block structure: {text_block}")
            return None

    def process_table(self, table: Dict, doc_name: str) -> List[Dict]:
        """Build the vector records for one table.

        Up to two records are produced: one embedding the concatenated cell
        texts, and one embedding a model-written description of the table
        image (only if ``<doc_name>-table-<n>.png`` exists in ``images_dir``,
        where ``n`` is the table's index plus one).

        Returns:
            The list of records; empty when the table is malformed or nothing
            could be embedded.  Errors are logged, never raised.
        """
        vectors = []
        try:
            table_id = table['self_ref'].split('/')[-1]
            page_no, bbox_str = self._first_provenance(table)
            # Image files are numbered from 1 while self_ref indices start at 0.
            table_image_path = self.images_dir / f"{doc_name}-table-{int(table_id)+1}.png"
        except Exception as e:
            logger.error(f"Error reading table: {e}")
            logger.error(f"Table structure: {table}")
            return vectors

        # Process structured table data
        data = table.get('data')
        if isinstance(data, dict):
            cell_texts = self._table_cell_texts(data)
            if cell_texts:
                try:
                    table_text = " ".join(cell_texts)
                    text_embedding = self.create_embedding(table_text)

                    metadata = {
                        'doc_name': doc_name,
                        'content_type': 'table_text',
                        'page_no': page_no,
                        'text': table_text[:self.METADATA_TEXT_LIMIT],
                        'bbox': bbox_str,
                        'image_path': str(table_image_path)
                    }
                    # Row/column counts are stored as flat fields: metadata
                    # values must be scalars or string lists, not nested dicts.
                    metadata.update(self._table_shape(data))

                    vectors.append({
                        'id': f"{doc_name}_table_{table_id}_text",
                        'values': text_embedding,
                        'metadata': metadata
                    })
                except Exception as e:
                    logger.error(f"Error processing table text: {e}")

        # Process table image if it exists
        if table_image_path.exists():
            try:
                image_description = self._describe_image(
                    table_image_path,
                    "Describe this table's content and structure in detail."
                )
                image_embedding = self.create_embedding(image_description)

                vectors.append({
                    'id': f"{doc_name}_table_{table_id}_image",
                    'values': image_embedding,
                    'metadata': {
                        'doc_name': doc_name,
                        'content_type': 'table_image',
                        'page_no': page_no,
                        'bbox': bbox_str,
                        'description': image_description,
                        'image_path': str(table_image_path)
                    }
                })
            except Exception as e:
                logger.error(f"Error processing table image {table_image_path}: {e}")

        return vectors

    def process_picture(self, picture: Dict, doc_name: str) -> Optional[Dict]:
        """Build the vector record for one picture from its extracted image file.

        The image ``<doc_name>-picture-<n>.png`` (``n`` is the picture's index
        plus one) is described by the vision model and the description is
        embedded.

        Returns:
            The record, or ``None`` when the image file is missing or any step
            fails (the problem is logged).
        """
        # Bound before the try block so the error handler can always report it.
        picture_id = "<unknown>"
        try:
            picture_id = picture['self_ref'].split('/')[-1]
            page_no, bbox_str = self._first_provenance(picture)

            picture_path = self.images_dir / f"{doc_name}-picture-{int(picture_id)+1}.png"

            if not picture_path.exists():
                logger.warning(f"Picture file not found: {picture_path}")
                return None

            image_description = self._describe_image(
                picture_path,
                "Describe this image in detail, including any text or diagrams visible."
            )

            # Create embedding for the description
            embedding = self.create_embedding(image_description)

            return {
                'id': f"{doc_name}_picture_{picture_id}",
                'values': embedding,
                'metadata': {
                    'doc_name': doc_name,
                    'content_type': 'picture',
                    'page_no': page_no,
                    'bbox': bbox_str,
                    'description': image_description,
                    'image_path': str(picture_path)
                }
            }
        except Exception as e:
            logger.error(f"Error processing picture {picture_id}: {str(e)}")
            logger.error(f"Picture structure: {picture}")
            return None

    def process_document(self, json_path: Path) -> None:
        """Vectorize every text block, table and picture of one document.

        Records are accumulated and upserted in batches of
        ``UPSERT_BATCH_SIZE``; whatever is left at the end is upserted too.
        The running section name starts as ``"introduction"`` and is replaced
        by the text of each ``section_header`` block encountered.

        Raises:
            Exception: Loading errors, a missing ``name`` and Pinecone upsert
                errors propagate to the caller.
        """
        doc_data = self.load_document(json_path)
        doc_name = doc_data['name']
        current_section = "introduction"
        vectors_to_upsert = []

        logger.info(f"Processing document: {doc_name}")

        # Process text blocks
        logger.info("Processing text blocks...")
        for text_block in tqdm(doc_data.get('texts') or [], desc="Text blocks"):
            # Update section if section header
            if text_block.get('label') == 'section_header':
                current_section = text_block.get('text', current_section)

            vector = self.process_text_block(text_block, doc_name, current_section)
            if vector:
                vectors_to_upsert.append(vector)
                vectors_to_upsert = self._upsert_when_full(vectors_to_upsert)

        # Process tables
        logger.info("Processing tables...")
        for table in tqdm(doc_data.get('tables') or [], desc="Tables"):
            vectors_to_upsert.extend(self.process_table(table, doc_name))
            vectors_to_upsert = self._upsert_when_full(vectors_to_upsert)

        # Process pictures
        logger.info("Processing pictures...")
        for picture in tqdm(doc_data.get('pictures') or [], desc="Pictures"):
            vector = self.process_picture(picture, doc_name)
            if vector:
                vectors_to_upsert.append(vector)
                vectors_to_upsert = self._upsert_when_full(vectors_to_upsert)

        # Upsert any remaining vectors
        if vectors_to_upsert:
            self.index.upsert(vectors=vectors_to_upsert, namespace=self.namespace)

        logger.info(f"Completed processing document: {doc_name}")

def main():
    """Vectorize every ``*-with-images.json`` document found in ``./output``.

    Credentials are read from the environment after ``load_dotenv()`` has been
    called.  A failure on one document is logged and the remaining documents
    are still processed; any other failure is logged and re-raised.
    """
    try:
        # Load environment variables
        from dotenv import load_dotenv
        load_dotenv()

        credentials = {
            "openai_api_key": os.getenv("OPENAI_API_KEY"),
            "pinecone_api_key": os.getenv("PINECONE_API_KEY"),
        }
        if not all(credentials.values()):
            raise ValueError("Missing required environment variables")

        # Build the vectorizer with both keys
        vectorizer = CFADocumentVectorizer(**credentials)

        # Collect the exported documents to process
        documents = list(Path("./output").glob("*-with-images.json"))
        logger.info(f"Found {len(documents)} documents to process")

        for document in documents:
            try:
                logger.info(f"\nProcessing {document.name}")
                vectorizer.process_document(document)
            except Exception as exc:
                # Keep going: one bad document must not stop the batch
                logger.error(f"Error processing document {document.name}: {exc}")

        logger.info("Completed processing all documents")

    except Exception as exc:
        logger.error(f"Error in main process: {exc}")
        raise

# Run the vectorization when this cell/script is executed directly.
if __name__ == "__main__":
    main()

INFO:pinecone_plugin_interface.logging:Discovering subpackages in _NamespacePath(['/Users/udaykiran/Library/Caches/pypoetry/virtualenvs/a4-vItfbIzM-py3.12/lib/python3.12/site-packages/pinecone_plugins'])
INFO:pinecone_plugin_interface.logging:Looking for plugins in pinecone_plugins.inference
INFO:pinecone_plugin_interface.logging:Installing plugin inference into Pinecone


INFO:__main__:Existing indexes: {'indexes': [{'deletion_protection': 'disabled',
              'dimension': 1536,
              'host': 'cfa-research-gu6zp0z.svc.aped-4627-b74a.pinecone.io',
              'metric': 'cosine',
              'name': 'cfa-research',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-east-1'}},
              'status': {'ready': True, 'state': 'Ready'}}]}
INFO:__main__:Deleting existing index: cfa-research
INFO:__main__:Deleted index: cfa-research
INFO:__main__:Creating new index: cfa-research
INFO:__main__:Index cfa-research is ready
INFO:__main__:Found 1 documents to process
INFO:__main__:
Processing Beyond Active and Passive Investing_ The Customization of Finance-with-images.json
INFO:__main__:Successfully loaded document: Beyond Active and Passive Investing_ The Customization of Finance-with-images.json
INFO:__main__:Processing document: Beyond Active and Passive Investing_ The Customization of Finance
INFO:__main__:Processing text bloc