In [1]:
import os
import glob
import json
import time
import concurrent.futures
from functools import partial
from pathlib import Path

import openai
import requests
from azure.storage.blob import BlobServiceClient, ContainerClient
from openai import AzureOpenAI


In [2]:
# Settings sent with the job request (the whole dict, keys included, is POSTed to base_url).
# Secrets and service names are read from environment variables when set; otherwise the
# "[]" placeholders remain so it is obvious what still needs to be filled in.
# Avoid pasting real keys into the notebook - they end up in saved/shared copies.
data = {
    "openai_gpt_api_base" : os.environ.get("AZURE_OPENAI_GPT_ENDPOINT", "https://[].openai.azure.com/"),
    "openai_gpt_api_key" : os.environ.get("AZURE_OPENAI_GPT_API_KEY", "[]"),
    "openai_gpt_api_version" :  "2024-02-15-preview",
    "openai_gpt_model" : "gpt-4o",
    "blob_storage_service_name" : os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "[]"),
    "blob_storage_service_api_key" : os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "[]"),
    "blob_storage_container" : "insight-engine-data",
    "openai_embedding_api_base" : os.environ.get("AZURE_OPENAI_EMBEDDING_ENDPOINT", "https://[].openai.azure.com/"),
    "openai_embedding_api_key" : os.environ.get("AZURE_OPENAI_EMBEDDING_API_KEY", "[]"),
    "openai_embedding_api_version" :  "2024-02-15-preview",
    "openai_embedding_model" : "text-embedding-ada-002",
    "search_service_name": os.environ.get("AZURE_SEARCH_SERVICE_NAME", "[]"),
    "search_admin_key" : os.environ.get("AZURE_SEARCH_ADMIN_KEY", "[]"),
    "search_index_name": "test",
    "search_api_version" : "2024-05-01-preview",
}

# Document the conversion service should download and convert.
data['url_file_to_process'] = 'https://github.com/liamca/GPT4oContentExtraction/raw/main/Transforming-Content-with-GPT4o.pptx'

# Conversion service endpoint; set DOC2MD_BASE_URL (e.g. "http://127.0.0.1:3100") to use a local instance.
base_url = os.environ.get("DOC2MD_BASE_URL", "https://liamca-aca-doc2md.salmonpebble-e154b31c.westus2.azurecontainerapps.io")

job_submit_url = f"{base_url}/start-job"
job_status_url = f"{base_url}/job-status"

# Headers for every Azure AI Search REST call (admin key => can create/delete indexes).
search_headers = {
    'Content-Type': 'application/json',
    'api-key': data['search_admin_key']
}



In [3]:
# Retry policy for generate_embedding (used for page content and for query vectors).
max_attempts = 6            # non-throttling failures tolerated before giving up
max_backoff = 60            # cap, in seconds, on the wait between throttled (429) retries
max_throttle_retries = 30   # 429 retries tolerated before giving up (was unbounded)
def generate_embedding(text):
    """Return the embedding vector for ``text`` from the configured Azure OpenAI deployment.

    Args:
        text: String to embed. ``None`` short-circuits to ``None`` without any API call.

    Returns:
        The embedding as a list of floats, or ``None`` when ``text`` is ``None``, the
        request is rejected by the content filter, or the retry budget is exhausted
        (``max_attempts`` ordinary failures or ``max_throttle_retries`` throttles).
    """
    if text is None:
        return None

    client = AzureOpenAI(
        api_version=data['openai_embedding_api_version'],
        azure_endpoint=data['openai_embedding_api_base'],
        api_key=data['openai_embedding_api_key']
    )
    failures = 0
    throttles = 0
    incremental_backoff = 1  # seconds; grows 1.5x on every throttled retry, capped at max_backoff
    while failures < max_attempts and throttles < max_throttle_retries:
        try:
            response = client.embeddings.create(
                input=text,
                model=data['openai_embedding_model']
            )
            return response.data[0].embedding
        except openai.APIError as ex:
            code = str(getattr(ex, 'code', None))
            if code == "content_filter":
                # The input itself was rejected; retrying cannot help.
                print ('API Error', ex.code)
                return None
            if code == "429" or isinstance(ex, openai.RateLimitError):
                # Throttled: back off and try again.
                throttles += 1
                incremental_backoff = min(max_backoff, incremental_backoff * 1.5)
                print ('Waiting to retry after', incremental_backoff, 'seconds...')
                time.sleep(incremental_backoff)
            else:
                # Other API errors (auth, bad request, ...) previously spun forever
                # without sleeping or counting; they now use the retry budget.
                failures += 1
                print ('Error - Retry count:', failures, ex)
        except Exception as ex:
            failures += 1
            print ('Error - Retry count:', failures, ex)
    return None
    
def create_index():
    """Delete (if present) and re-create the Azure AI Search index defined in ``schema.json``.

    Before creating the index, the schema's ``name`` and its first vectorizer's Azure OpenAI
    parameters (``resourceUri``, ``deploymentId``, ``apiKey``) are overwritten with values
    from ``data``. This is destructive: an existing index with the same name, and all of
    its documents, is deleted first.

    Raises:
        RuntimeError: If the probe embedding cannot be generated (e.g. wrong endpoint/key),
            instead of failing later with an opaque ``TypeError``.
    """
    probe_vector = generate_embedding('The quick brown fox.')
    if probe_vector is None:
        raise RuntimeError(
            "Could not generate a probe embedding; check the embedding endpoint, key and deployment."
        )
    dims = len(probe_vector)
    # Informational only. NOTE(review): the vector field's dimensions come from
    # schema.json and are not checked against this value.
    print ('Dimensions in Embedding Model:', dims)

    with open("schema.json", "r", encoding="utf8") as f_in:
        index_schema = json.load(f_in)
    index_schema['name'] = data['search_index_name']
    vectorizer_params = index_schema['vectorSearch']['vectorizers'][0]['azureOpenAIParameters']
    vectorizer_params['resourceUri'] = data['openai_embedding_api_base']
    vectorizer_params['deploymentId'] = data['openai_embedding_model']
    vectorizer_params['apiKey'] = data['openai_embedding_api_key']

    # No trailing slash: the paths below begin with "/" (a trailing slash produced "//indexes").
    search_service_url = f"https://{data['search_service_name']}.search.windows.net"

    # Delete the existing index, if any.
    delete_url = f"{search_service_url}/indexes/{data['search_index_name']}?api-version={data['search_api_version']}"
    response = requests.delete(delete_url, headers=search_headers)
    if response.status_code == 204:
        print(f"Index {data['search_index_name']} deleted successfully.")
    else:
        print("Error deleting index, it may not exist.")

    # Create the index from the patched schema.
    create_index_url = f"{search_service_url}/indexes?api-version={data['search_api_version']}"
    response = requests.post(create_index_url, headers=search_headers, json=index_schema)
    if response.status_code == 201:
        print(f"Index {data['search_index_name']} created successfully.")
    else:
        print(f"Error creating index {data['search_index_name']} :")
        # .text never raises, unlike .json() on a non-JSON error body.
        print(response.text)

# Create directory if it does not exist
def ensure_directory_exists(directory_path):
    """Make sure ``directory_path`` exists (creating missing parents) and print which case applied."""
    target = Path(directory_path)
    if target.exists():
        print(f"Directory already exists: {directory_path}")
        return
    target.mkdir(parents=True, exist_ok=True)
    print(f"Directory created: {directory_path}")

def process_json(file, doc_id, json_out_dir):
    """Embed one page of Markdown text and save it as an AI Search JSON document.

    Args:
        file: Path to a page file named "<page_number>.txt" (e.g. ".../markdown/3.txt").
        doc_id: Identifier of the source document; stored under ``doc_id``.
        json_out_dir: Existing directory that receives "<page_number>.json".

    Returns:
        ``file`` unchanged, so ``executor.map`` results list what was handled. Files whose
        name does not end in ".txt" are skipped (logged) and still returned.

    Raises:
        ValueError: If a ".txt" file's stem is not an integer page number.
    """
    print ('file', file)
    base_name = os.path.basename(file)
    # endswith, not a substring test, so names like "1.txt.bak" are not mistaken for pages.
    if not base_name.endswith('.txt'):
        print ('Skipping non TXT file:', file)
        return file
    stem = base_name[:-len('.txt')]

    with open(file, 'r', encoding="utf8") as c_in:
        content = c_in.read()

    json_data = {
        'doc_id': doc_id,
        'page_number': int(stem),
        'content': content,
    }
    # May be None if the embedding call failed or was content-filtered.
    json_data['vector'] = generate_embedding(content)

    with open(os.path.join(json_out_dir, stem + '.json'), 'w', encoding="utf8") as c_out:
        json.dump(json_data, c_out, indent=4)

    return file


def _post_index_batch(documents):
    """POST one non-empty batch of documents to the index's docs/index endpoint and log the outcome."""
    # No trailing slash on the service URL, so the path below does not get a "//".
    search_service_url = f"https://{data['search_service_name']}.search.windows.net"
    index_doc_url = f"{search_service_url}/indexes/{data['search_index_name']}/docs/index?api-version={data['search_api_version']}"
    response = requests.post(index_doc_url, headers=search_headers, json={"value": documents})
    if response.status_code == 200:
        print(f"Indexed {len(documents)} document(s) successfully.")
    else:
        print(f"Error indexing batch of {len(documents)} document(s) :")
        print(response.text)


def index_content(json_files, batch_size=50):
    """Upload per-page JSON documents to the Azure AI Search index in batches.

    Args:
        json_files: Paths to JSON files written by ``process_json``; paths that do not
            end in ".json" are ignored.
        batch_size: Maximum documents per indexing request (default 50).

    Each document's ``doc_id`` is rewritten to "<doc_id>-<page_number>" so every page has
    a unique key. No request is sent when there is nothing to index.
    """
    batch = []
    for file in json_files:
        if not file.endswith('.json'):
            continue
        with open(file, 'r', encoding='utf8') as j_in:
            json_data = json.load(j_in)
        json_data['doc_id'] = f"{json_data['doc_id']}-{json_data['page_number']}"
        batch.append(json_data)
        if len(batch) == batch_size:
            _post_index_batch(batch)
            batch = []

    # Flush the remainder; skipped when empty (previously an empty batch was POSTed,
    # and an empty json_files list raised NameError on `file`).
    if batch:
        _post_index_batch(batch)


In [4]:
# Submit a job that converts data['url_file_to_process'] to Markdown, then poll the
# job-status endpoint until the job leaves 'in-progress', an error occurs, or we time out.
poll_interval_seconds = 2
job_timeout_seconds = 30 * 60   # NOTE(review): tune to the largest document you expect to convert
http_timeout_seconds = 60       # per-request timeout so a hung connection cannot stall the cell

response = requests.post(job_submit_url, json=data, timeout=http_timeout_seconds)

if response.status_code == 200:
    job_info = response.json()
    job_id = job_info['job_id']
    print(f"Job started successfully! Job ID: {job_id}")
    data_status = {
        "job_id": job_info['job_id'],
        "blob_storage_service_name" : data['blob_storage_service_name'],
        "blob_storage_service_api_key" : data['blob_storage_service_api_key'],
        "blob_storage_container" : data['blob_storage_container']
    }

    deadline = time.monotonic() + job_timeout_seconds
    while True:
        time.sleep(poll_interval_seconds)
        response = requests.post(job_status_url, json=data_status, timeout=http_timeout_seconds)

        if response.status_code != 200:
            print(f"Failed to check job status: {response.status_code} - {response.text}")
            break

        job_status = response.json()
        print(f"Job Status for Job ID {job_id}: {job_status['status']}")
        if 'message' in job_status:
            print(f"{job_status['message']}")
        if job_status['status'] != 'in-progress':
            print (job_status)
            break
        if time.monotonic() > deadline:
            # Previously the loop could poll forever if the job never finished.
            print(f"Timed out after {job_timeout_seconds} seconds waiting for job {job_id}.")
            break
else:
    print(f"Failed to start job: {response.status_code} - {response.text}")


Job started successfully! Job ID: 4ff0e714-ed34-40e8-ab07-12a212c78b92
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: in-progress
Converting images to Markdown...
Job Status for Job ID 4ff0e714-ed34-40e8-ab07-12a212c78b92: complete
Processing complete.
{'job_id': '4ff0e714-ed34-40e8-ab07-12a212c78b92', 'status': 'complete', 'message': 'Processing complete

In [5]:
# Download every blob the job wrote under "<job_id>/" into a local folder of the same name.
connection_string = f"DefaultEndpointsProtocol=https;AccountName={data['blob_storage_service_name']};AccountKey={data['blob_storage_service_api_key']};EndpointSuffix=core.windows.net"
container_name = data['blob_storage_container']
folder_name = job_info['job_id']

# Initialize the BlobServiceClient and ContainerClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
# Trailing "/" so only blobs inside this job's folder match, not other names sharing the prefix.
blobs = container_client.list_blobs(name_starts_with=f"{folder_name}/")

# Local root for the downloaded files (same name as the blob folder)
local_path = job_info['job_id']
os.makedirs(local_path, exist_ok=True)

for blob in blobs:
    blob_name = blob.name
    # Blob names start with "<job_id>/", so the blob name doubles as the local relative path.
    # Refuse names that would escape the working directory.
    if os.path.isabs(blob_name) or '..' in Path(blob_name).parts:
        print(f"Skipping blob with unsafe name: {blob_name}")
        continue
    local_file_path = blob_name
    ensure_directory_exists(os.path.dirname(local_file_path))

    with open(local_file_path, "wb") as download_file:
        # Stream straight to disk instead of buffering the whole blob in memory.
        container_client.download_blob(blob_name).readinto(download_file)

    print(f"Downloaded {blob_name} to {local_file_path}")

print("Download completed!")


Directory created: 4ff0e714-ed34-40e8-ab07-12a212c78b92/images
Downloaded 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/1.png to 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/1.png
Directory already exists: 4ff0e714-ed34-40e8-ab07-12a212c78b92/images
Downloaded 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/2.png to 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/2.png
Directory already exists: 4ff0e714-ed34-40e8-ab07-12a212c78b92/images
Downloaded 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/3.png to 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/3.png
Directory already exists: 4ff0e714-ed34-40e8-ab07-12a212c78b92/images
Downloaded 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/4.png to 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/4.png
Directory already exists: 4ff0e714-ed34-40e8-ab07-12a212c78b92/images
Downloaded 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/5.png to 4ff0e714-ed34-40e8-ab07-12a212c78b92/images/5.png
Directory already exists: 4ff0e714-ed34-40e8-ab07-12a212c78b92/images
Downloaded 4ff0e7

In [6]:
# Re-create the index.
# Destructive: create_index() deletes any existing index named data['search_index_name']
# (and every document in it) before creating it again from schema.json.
create_index()

Dimensions in Embedding Model: 1536
Index test deleted successfully.
Index test created successfully.


In [None]:
# Vectorize the content and store in an AI Search compatible JSON format:
# each <job_id>/markdown/<page>.txt becomes <job_id>/json/<page>.json carrying an embedding.
max_workers = 15  # pages embedded concurrently

print ('Vectorizing content...')
markdown_files = glob.glob(os.path.join(folder_name, 'markdown', "*.txt"))
json_out_dir = os.path.join(job_info['job_id'], 'json')
ensure_directory_exists(json_out_dir)

partial_process_json = partial(
    process_json,
    doc_id=job_info['job_id'],
    json_out_dir=json_out_dir,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # map() preserves input order and re-raises a worker's exception when its result is reached.
    results = [processed_file for processed_file in executor.map(partial_process_json, markdown_files)]
print(results)

json_files = glob.glob(os.path.join(json_out_dir, "*.json"))
total_files = len(json_files)
print ('Total JSON Files:', total_files)


Vectorizing content...
Directory created: 4ff0e714-ed34-40e8-ab07-12a212c78b92\json
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\1.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\2.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\3.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\4.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\5.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\6.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\7.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\8.txt
file 4ff0e714-ed34-40e8-ab07-12a212c78b92\markdown\9.txt


In [12]:
# Index content: upload the per-page JSON documents (text + embedding vector) written by
# the vectorize cell into the search index, in batches.
print ('Indexing content...')
index_content(json_files)

Indexing content...
Documents Indexed successfully.
