In [1]:
# Notebook housekeeping: autosave interval, autoreload of edited modules, and completer setting.
%autosave 300
%load_ext autoreload
%autoreload 2
# NOTE(review): autoreload is already loaded two lines above, so this reload looks redundant.
%reload_ext autoreload
%config Completer.use_jedi = False

Autosaving every 300 seconds


In [2]:
import os

# The project root can be overridden through the UNSTRUCTURED_EXPERIMENTS_ROOT
# environment variable so the notebook is not tied to one user's mount point.
# When the variable is unset, the original hard-coded location is used.
PROJECT_ROOT = os.environ.get(
    "UNSTRUCTURED_EXPERIMENTS_ROOT",
    "/mnt/batch/tasks/shared/LS_root/mounts/clusters/copilot-model-run/code/Users/Soutrik.Chowdhury/unstructured_data_experiments",
)
# All relative paths used later ("data", "image_extracts", "data_extracts") resolve against this directory.
os.chdir(PROJECT_ROOT)
print(os.getcwd())

/mnt/batch/tasks/shared/LS_root/mounts/clusters/copilot-model-run/code/Users/Soutrik.Chowdhury/unstructured_data_experiments


In [3]:
from pdf2image import convert_from_path
import os
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import AzureChatOpenAI
from langchain.prompts import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    MessagesPlaceholder,
    SystemMessagePromptTemplate,
)
from dotenv import load_dotenv, find_dotenv
import base64
import requests
import json
from datetime import datetime
from langchain.schema import Document
import re
import shutil
from joblib import Parallel, delayed
from functools import partial
from requests.exceptions import RequestException
import time
import pickle

In [4]:
# Load environment variables from the dev.env file located by find_dotenv;
# later cells read the Azure OpenAI settings through os.getenv.
load_dotenv(find_dotenv("dev.env"))

True

In [5]:
def mask_secret(value, visible=4):
    """
    Returns a display-safe form of a secret string.

    Args:
    - value (str | None): The secret to mask.
    - visible (int): Number of trailing characters left readable (default is 4).

    Returns:
    - str: "<not set>" for a missing/empty value; otherwise the value with every
      character except the last `visible` replaced by "*". Values no longer than
      `visible` are masked completely.
    """
    if not value:
        return "<not set>"
    visible = max(visible, 0)
    if len(value) <= visible:
        return "*" * len(value)
    hidden = len(value) - visible
    return "*" * hidden + value[hidden:]


# Confirm the configuration is loaded without writing the API key into the
# saved notebook output. The endpoint and API version are printed unmasked.
print(mask_secret(os.getenv("AZURE_OPENAI_API_KEY")))
print(os.getenv("AZURE_OPENAI_ENDPOINT"))
print(os.getenv("OPENAI_API_VERSION"))

<redacted>
https://brewdatgbgaighqtechopenai01ncud.openai.azure.com/
2024-02-15-preview


In [6]:
def convert_pdf_to_images(pdf_path, dest_folder, image_format="JPEG"):
    """
    Renders a PDF to one image file per page inside a fresh per-PDF folder.

    Images are written to ``<dest_folder>/<pdf name without extension>_images``
    as ``page<index>.<image_format in lower case>``, with indices starting at 0.
    A folder already present at that location is deleted first.

    Args:
    - pdf_path (str): Path to the PDF file.
    - dest_folder (str): Folder under which the per-PDF image folder is created.
    - image_format (str): Format handed to each image's save call (default is "JPEG").

    Returns:
    - list: Paths of the saved images, in the order convert_from_path returned the pages.
    """
    stem, _ = os.path.splitext(os.path.basename(pdf_path))
    target_dir = os.path.join(dest_folder, f"{stem}_images")

    # Start from an empty folder so files from an earlier run never linger.
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir, exist_ok=True)

    written = []
    for index, page in enumerate(convert_from_path(pdf_path)):
        destination = os.path.join(
            target_dir, f"page{index}.{image_format.lower()}"
        )
        page.save(destination, image_format)
        written.append(destination)

    return written

In [7]:
def encode_image(image_path):
    """
    Reads a file in binary mode and returns its contents as base64 text.

    Args:
    - image_path (str): Path to the image file.

    Returns:
    - str: Base64 representation of the file's bytes, decoded as UTF-8.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    return base64.b64encode(raw_bytes).decode("utf-8")

In [8]:
def get_gpt_model_infra(api_key, endpoint, model_name, api_version):
    """
    Sets up headers and the chat-completions URL for the GPT model API.

    Args:
    - api_key (str): API key for authentication; sent in the "api-key" header.
    - endpoint (str): Base URL of the API endpoint, with or without a trailing "/".
    - model_name (str): Deployment name placed in the URL path.
    - api_version (str): Value of the "api-version" query parameter.

    Returns:
    - tuple: Headers and GPT model endpoint.

    Raises:
    - ValueError: If `endpoint` is empty or None.
    """
    if not endpoint:
        raise ValueError("endpoint must be a non-empty base URL")

    headers = {
        "Content-Type": "application/json",
        "api-key": api_key,
    }

    # Normalise the base URL so the result is the same whether or not the
    # configured endpoint ends with a slash.
    base_url = endpoint.rstrip("/")
    gpt_endpoint = f"{base_url}/openai/deployments/{model_name}/chat/completions?api-version={api_version}"
    return headers, gpt_endpoint

In [9]:
def get_gpt_model_payload(
    encoded_image,
    temperature=0.0,
    top_p=1e-5,
    seed=1234,
    system_message=None,
    mime_type="image/jpeg",
):
    """
    Constructs the payload for the GPT model API request.

    Args:
    - encoded_image (str): Base64 encoded image string.
    - temperature (float): Sampling temperature (default is 0.0).
    - top_p (float): Cumulative probability for nucleus sampling (default is 1e-5).
    - seed (int): Seed for reproducibility (default is 1234).
    - system_message (str | None): System prompt to send; when None, the built-in
      extraction prompt for text, tables and figures is used.
    - mime_type (str): MIME type written into the image data URL (default is
      "image/jpeg"); set it when the encoded image is not a JPEG.

    Returns:
    - dict: Payload for the GPT model API request.
    """
    if system_message is None:
        system_message = (
            "You are an AI assistant specializing in extracting information from images containing three main components: "
            "text, tables, and figures. Your tasks are as follows:\n"
            "* Extract the text exactly as it appears in the image.\n"
            "* Extract data from tables while preserving the table structure, providing the data in a structured 2D format, "
            "and maintaining hierarchical columns if present.\n"
            "* Extract and describe the information from figures in plain text format, making it understandable and reproducible "
            "from the extracted data.\n\n"
            "The final output should be in plain text format, containing only the extracted information."
        )

    # The image travels inline as a data URL inside the user message.
    user_message = {
        "type": "image_url",
        "image_url": {"url": f"data:{mime_type};base64,{encoded_image}"},
    }

    payload = {
        "messages": [
            {"role": "system", "content": system_message},
            {"role": "user", "content": [user_message]},
        ],
        "temperature": temperature,
        "top_p": top_p,
        "seed": seed,
    }

    return payload

In [10]:
def invoke_gpt_model_response(headers, endpoint, payload, timeout=120):
    """
    Sends a request to the GPT model API and retrieves the response.

    Args:
    - headers (dict): Headers for the API request.
    - endpoint (str): Full URL of the GPT model endpoint.
    - payload (dict): Payload for the API request (sent as JSON).
    - timeout (float | tuple): Seconds to wait for the server before giving up
      (default is 120). Without a timeout a stalled connection would block forever.

    Returns:
    - dict: Response from the GPT model API.

    Raises:
    - RuntimeError: If the request cannot be made, the server answers with an
      HTTP error status, or the response body is not valid JSON.
    """
    try:
        response = requests.post(
            endpoint, headers=headers, json=payload, timeout=timeout
        )
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # Raise an ordinary Exception subclass: SystemExit derives from
        # BaseException, so callers using `except Exception` (e.g. retry loops)
        # would never see it and a worker process would simply terminate.
        raise RuntimeError(f"Failed to make the request. Error: {e}") from e

In [11]:
def extract_text_from_image(
    image_path, api_key, endpoint, model_name, api_version, max_retries=3, retry_wait=2
):
    """
    Extracts text and information from an image using GPT model with manual retry logic.

    Each attempt encodes the image, builds the headers/URL and payload, and calls
    the model. A failed attempt is reported with print and retried after
    `retry_wait` seconds. This function does not raise on failure: once all
    attempts are used up it returns a placeholder response instead.

    Args:
    - image_path (str): Path to the image file.
    - api_key (str): API key for authentication.
    - endpoint (str): Base URL of the API endpoint.
    - model_name (str): Name of the GPT model.
    - api_version (str): API version.
    - max_retries (int): Maximum number of attempts (default is 3).
    - retry_wait (int): Wait time between attempts in seconds (default is 2).

    Returns:
    - dict: GPT model's response containing extracted information, or a
      placeholder with the same "choices"/"usage" keys (failure text as content,
      0 completion tokens) when no attempt succeeds, including when
      `max_retries` is 0 or negative.
    """
    attempt = 0
    while attempt < max_retries:
        attempt += 1
        try:
            encoded_image = encode_image(image_path)
            headers, gpt_endpoint = get_gpt_model_infra(
                api_key, endpoint, model_name, api_version
            )
            payload = get_gpt_model_payload(encoded_image)
            return invoke_gpt_model_response(headers, gpt_endpoint, payload)
        except (Exception, SystemExit) as e:
            # SystemExit is listed explicitly because it does not derive from
            # Exception; a request helper that signals failure that way would
            # otherwise bypass the retries and terminate the worker.
            print(f"Attempt {attempt} failed with error: {e}")
            if attempt < max_retries:
                time.sleep(retry_wait)

    # Reached when every attempt failed or when no attempt was allowed at all;
    # the dict shape matches what downstream document creation reads.
    return {
        "choices": [
            {
                "message": {
                    "content": f"Failed to extract information from {image_path} after {max_retries} attempts."
                }
            }
        ],
        "usage": {"completion_tokens": 0},
    }

In [12]:
def lc_document_from_response(response, image_path):
    """
    Creates a LangChain Document object from the GPT model's response.

    The source PDF name and page number are recovered from the image path, which
    must look like ``<anything>/<pdf name>_images/page<number>.<extension>``.
    Any image extension is accepted, not only ".jpeg".

    Args:
    - response (dict): Response from the GPT model; the content is read from
      response["choices"][0]["message"]["content"] and the token count from
      response["usage"]["completion_tokens"].
    - image_path (str): Path to the image file.

    Returns:
    - Document: LangChain Document object containing extracted content and metadata
      ("source", "page_number", "token_size", "timestamp").

    Raises:
    - ValueError: If `image_path` does not follow the expected layout.
    """
    pg_content = response["choices"][0]["message"]["content"]
    token_size = response["usage"]["completion_tokens"]
    curr_time = datetime.now()

    # os.path keeps the parsing independent of the platform's path separator.
    folder_name = os.path.basename(os.path.dirname(image_path))
    image_name = os.path.basename(image_path)

    folder_match = re.search(r"(.*)_images\b", folder_name)
    # Escaped dot and generic extension: pages saved as PNG etc. parse as well.
    page_match = re.search(r"page(\d+)\.\w+$", image_name)
    if folder_match is None or page_match is None:
        raise ValueError(
            f"Cannot derive source and page number from image path: {image_path!r}"
        )

    # langchain document object
    doc = Document(
        page_content=pg_content,
        metadata={
            "source": folder_match.group(1),
            "page_number": int(page_match.group(1)),
            "token_size": token_size,
            "timestamp": str(curr_time),
        },
    )

    return doc

In [13]:
def final_image_extraction_pipeline(
    image_path, api_key, endpoint, model_name, api_version
):
    """
    Runs the end-to-end flow for one image: query the model, then wrap the
    response in a LangChain Document.

    Args:
    - image_path (str): Path to the image file.
    - api_key (str): API key for authentication.
    - endpoint (str): Base URL of the API endpoint.
    - model_name (str): Name of the GPT model.
    - api_version (str): API version.

    Returns:
    - Document: LangChain Document object containing extracted content and metadata.
    """
    print(f"Extracting information from {image_path}...")
    model_response = extract_text_from_image(
        image_path, api_key, endpoint, model_name, api_version
    )
    return lc_document_from_response(model_response, image_path)

In [14]:
# pdf to images
# os.listdir returns entries in arbitrary order and may include non-PDF files,
# so filter to PDFs and sort to pick the same file on every run.
pdf_candidates = sorted(
    name for name in os.listdir("data") if name.lower().endswith(".pdf")
)
if not pdf_candidates:
    raise FileNotFoundError("No PDF files found in the 'data' directory.")
pdf_file = pdf_candidates[0]
pdf_path = os.path.join(os.getcwd(), "data", pdf_file)
saved_paths = convert_pdf_to_images(pdf_path, "image_extracts")
print("Images saved to:", saved_paths)

Images saved to: ['image_extracts/1. MSA Gateware (Enteron) 1_images/page0.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page1.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page2.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page3.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page4.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page5.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page6.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page7.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page8.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page9.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page10.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page11.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page12.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page13.jpeg', 'image_extracts/1. MSA Gateware (Enteron) 1_images/page14.jpeg', 'image_extracts/1

In [15]:
# adding paths to each image file
# Strip the extension with os.path.splitext, the same way convert_pdf_to_images
# names its output folder; str.replace would also remove ".pdf" from the middle
# of a name and would miss an upper-case ".PDF".
file_name = os.path.splitext(pdf_file)[0]
images_dir = f"image_extracts/{file_name}_images"

# os.listdir order is arbitrary; sort numerically so page10 follows page9.
page_pattern = re.compile(r"page(\d+)\.")
page_files = [doc for doc in os.listdir(images_dir) if page_pattern.match(doc)]
page_files.sort(key=lambda doc: int(page_pattern.match(doc).group(1)))
docs_ls = [os.path.join(images_dir, doc) for doc in page_files]
print(len(docs_ls))

32


In [None]:
# docs_ls

In [None]:
# Smoke test: run the final pipeline on a single image (page24, not the first page).
# NOTE(review): the image path is hard-coded and must already exist under image_extracts/.
docs = final_image_extraction_pipeline(
    image_path="image_extracts/1. MSA Gateware (Enteron) 1_images/page24.jpeg",
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    model_name=os.getenv("CHAT_ENGINE_GPT4_DEPLOYMENT_NAME"),
    api_version=os.getenv("OPENAI_API_VERSION"),
)

Extracting information from image_extracts/1. MSA Gateware (Enteron) 1_images/page24.jpeg...


In [None]:
# Show the text extracted for the single test page.
print(docs.page_content)

In [None]:
def run_pipeline_in_parallel(
    docs_ls, api_key, endpoint, model_name, api_version, n_jobs=4
):
    """
    Runs the image extraction pipeline in parallel for all images in the list.

    Args:
    - docs_ls (list): List of image paths.
    - api_key (str): API key for authentication.
    - endpoint (str): Base URL of the API endpoint.
    - model_name (str): Name of the GPT model.
    - api_version (str): API version.
    - n_jobs (int): Number of parallel jobs handed to joblib (default is 4).

    Returns:
    - list: List of LangChain Document objects containing extracted content and metadata.
    """
    # Bind the arguments shared by every call; only the image path varies.
    pipeline_func = partial(
        final_image_extraction_pipeline,
        api_key=api_key,
        endpoint=endpoint,
        model_name=model_name,
        api_version=api_version,
    )

    # Run the function in parallel for all image paths
    results = Parallel(n_jobs=n_jobs)(
        delayed(pipeline_func)(image_path) for image_path in docs_ls
    )

    return results

In [None]:
# Running the final pipeline in parallel over every image path in docs_ls.
# Credentials and deployment settings are read from the environment.
documents_ls = run_pipeline_in_parallel(
    docs_ls=docs_ls,
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    model_name=os.getenv("CHAT_ENGINE_GPT4_DEPLOYMENT_NAME"),
    api_version=os.getenv("OPENAI_API_VERSION"),
)

In [None]:
# Report how many documents the parallel run produced.
print("Documents created:", len(documents_ls))

In [None]:
# Display the full list of extracted documents.
# NOTE(review): this renders every document; consider showing a slice if the output gets large.
documents_ls

In [None]:
# Persist the extracted documents as a pickle named after the source PDF.
os.makedirs("data_extracts", exist_ok=True)
pickle_path = f"data_extracts/{file_name}.pkl"
with open(pickle_path, "wb") as pickle_file:
    pickle.dump(documents_ls, pickle_file)

In [None]:
# Notes:
# - The pipeline is working fine for the images in parallel.
# - The extracted content is documented in the LangChain Document objects.
# - A retry decorator is not picklable, so it cannot be used with parallel processing; manual retry logic is used instead.