# Process Content
The purpose of this notebook is to take content stored in Azure Blob Storage and process it into a separate container.
The new container will consist of JSON files that contain all the data needed to push into Azure AI Search.
This JSON data is stored for potential BCDR (business continuity and disaster recovery) and geo-replication needs, so that content does not need to be reprocessed.

## Required for this step
- Azure Blob Storage (with content)
- Azure OpenAI (completions model and Ada-002 embeddings)
- Azure Document Intelligence

## Important
- This demo was run on Ubuntu, using LibreOffice to convert documents to PDF so that everything is processed in one standard format
- PDFKit is used to convert HTML to PDF; this may require: `sudo apt-get install wkhtmltopdf`
- If using Linux, run: `sudo apt-get install libreoffice`
  - e.g.: `!lowriter --convert-to pdf marketbulletin021505.doc`



In [1]:
# # Import required libraries  
import os  
import base64
from pathlib import Path
from shutil import rmtree
from requests import get, post
import json
import time
import copy  
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, generate_blob_sas, BlobSasPermissions  
from datetime import datetime, timedelta  
import pdfkit
from langchain.text_splitter import TokenTextSplitter, MarkdownHeaderTextSplitter
import pickle
from openai import AzureOpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt 


In [5]:
# Load the configuration details (Blob Storage, Azure OpenAI, Document Intelligence, local paths).
# Credentials should be secured using a more secure method such as Azure Key Vault.
with open("config.json") as config_file:
    config = json.load(config_file)

# Azure Blob Storage Config
blob_service_name = config["blob_service_name"]
blob_container = config["blob_container"]
blob_key = config["blob_key"]
connection_string = "DefaultEndpointsProtocol=https;AccountName=" + blob_service_name + ";AccountKey=" + blob_key + ";EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(blob_container)

# Azure OpenAI
api_base = config["openai_api_base"]
api_key = config["openai_api_key"]
openai_api_version = config["openai_api_version"]
embeddings_model = config["openai_embedding_model"]
gpt_model = config["openai_gpt_model"]

# Doc Intelligence Config
di_endpoint = config["doc_intelligence_endpoint"]
di_apim_key = config["doc_intelligence_apim_key"]
di_headers = {
    'Content-Type': 'application/pdf',
    'Ocp-Apim-Subscription-Key': di_apim_key,
}
# Normalize the trailing slash so the URL is valid whether or not the configured
# endpoint ends with "/". Markdown output is requested so the content can be split
# on headers by markdown_splitter below.
di_post_url = (
    di_endpoint.rstrip("/")
    + "/documentintelligence/documentModels/prebuilt-layout:analyze"
    + "?api-version=2023-10-31-preview&stringIndexType=utf16CodeUnit&outputContentFormat=markdown"
)

# Set a temp directory for downloading pdf's for processing
data_root_dir = config["data_root_dir"]
tmp_dir = os.path.join(data_root_dir, "tmp")
pkl_dir = os.path.join(data_root_dir, "pkl")
json_dir = os.path.join(data_root_dir, "json")

# Chunking Config: token-based chunks of 512 with 52 tokens of overlap,
# applied after splitting the markdown on the first three header levels.
text_splitter = TokenTextSplitter(chunk_size=512, chunk_overlap=52)
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)


# Azure OpenAI client; the API key is passed explicitly from config.json.
client = AzureOpenAI(
    api_version=openai_api_version,
    azure_endpoint=api_base,
    api_key=api_key
)

print ('Temp Dir:', tmp_dir)
print ('Pickle Dir:', pkl_dir)
print ('JSON Dir:', json_dir)


Temp Dir: /aci/data/data/customers/financial-docs/tmp
Pickle Dir: /aci/data/data/customers/financial-docs/pkl
JSON Dir: /aci/data/data/customers/financial-docs/json


In [6]:
# Function to generate embeddings for title and content fields, also used for query embeddings
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def generate_embeddings(text):
    """Return the embedding vector for *text* from the configured Azure OpenAI embeddings deployment.

    Retried with random exponential backoff (1-20 s) for up to 6 attempts.
    """
    response = client.embeddings.create(
        input=text,
        model=embeddings_model
    )
    # The SDK response is already a typed object; read the vector directly instead of
    # serializing it to JSON and parsing it back.
    return response.data[0].embedding

# Create a title based on a supplied set of text
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def generate_title(text):
    """Ask the configured chat model (gpt_model) for a short title describing *text*.

    Retried with random exponential backoff (1-20 s) for up to 6 attempts.
    Returns the message content of the first choice.
    """
    prompt = [
        {"role": "system", "content": "Assistant who creates succint titles for content."},
        {"role": "user", "content": text},
    ]
    completion = client.chat.completions.create(model=gpt_model, messages=prompt)
    first_choice = completion.choices[0]
    return first_choice.message.content

# reset output dir
def reset_dir(dir):
    """Make *dir* an empty directory.

    Any existing tree at *dir* is deleted first; the directory (and any
    missing parents) is then created fresh.
    """
    target = Path(dir)
    already_present = target.exists()
    if already_present:
        # Wipe previous contents so each run starts from a clean directory.
        rmtree(target)
    target.mkdir(parents=True)

# Get all files in dir
def get_files_in_dir(in_dir):
    """Return the full paths of every file under *in_dir*, recursing into subdirectories.

    Paths are listed in os.walk order; a missing directory yields an empty list.
    """
    found = []
    for root, _subdirs, names in os.walk(in_dir):
        found.extend(os.path.join(root, name) for name in names)
    return found

def base64_encode_string(s):
    """Return the standard-alphabet Base64 encoding of *s* (UTF-8 encoded) as a str.

    Note: the standard alphabet can contain '+' and '/' characters.
    """
    raw_bytes = s.encode('utf-8')
    return base64.b64encode(raw_bytes).decode('utf-8')


In [9]:
# generate_embeddings('test')[:10]
# generate_title('The quick brown fox jumped over the lazy dog.')

In [10]:
# Create directories for downloading and processing.
# Note: reset_dir deletes existing contents, so previously produced pickles/JSON are
# discarded and the skip-if-exists checks below only help within a single run.
for working_dir in (tmp_dir, pkl_dir, json_dir):
    reset_dir(working_dir)


In [14]:
# Download and process blobs.
# For every supported blob that has no .pkl yet:
#   1. download it into tmp_dir,
#   2. convert it to PDF if needed (pdfkit for HTML, LibreOffice for Word files),
#   3. submit the PDF to the Document Intelligence prebuilt-layout model,
#   4. poll the returned Operation-Location until the analysis finishes,
#   5. pickle the 'analyzeResult' into pkl_dir for the next cell.
# A failure on one document is reported and that document is skipped; the run continues.
import subprocess

supported_file_types = (".pdf", ".docx", ".doc", ".html", ".htm")
html_file_types = (".html", ".htm")
wait_sec = 2
max_polls = 300  # upper bound on status checks (~10 minutes at wait_sec=2)

blob_list = container_client.list_blobs()

# Iterate through each blob
documents = []
for blob in blob_list:
    file_type = os.path.splitext(blob.name)[1].lower()
    pkl_file = os.path.join(pkl_dir, os.path.basename(blob.name) + '.pkl')

    # Already analyzed: nothing to do.
    if os.path.exists(pkl_file):
        continue
    if file_type not in supported_file_types:
        print ('Skipping - Unsupported file type')
        continue

    print ('Processing', blob.name)
    # Create a blob client for the blob
    blob_client = blob_service_client.get_blob_client(blob_container, blob.name)
    local_file = os.path.join(tmp_dir, blob.name)
    # Blob names can contain virtual folders ("reports/x.docx"); create the local parent dir.
    os.makedirs(os.path.dirname(local_file), exist_ok=True)

    # Download file locally
    print ('Downloading', blob.name, 'to', local_file, '...')
    with open(local_file, "wb") as download_file:
        download_file.write(blob_client.download_blob().readall())

    pdf_file = local_file
    # Convert file to PDF format (if it is not already PDF)
    if file_type != ".pdf":
        print ('Converting file to PDF format...')
        # LibreOffice writes "<stem>.pdf" into --outdir; use the same name for pdfkit output.
        # splitext keeps dots inside the name ("a.v2.docx" -> "a.v2.pdf").
        pdf_file = os.path.join(tmp_dir, os.path.splitext(os.path.basename(blob.name))[0] + '.pdf')
        if file_type in html_file_types:
            pdfkit.from_file(local_file, pdf_file)
        else:
            # Argument list without a shell: paths with spaces or shell metacharacters are safe.
            subprocess.run(["lowriter", "--convert-to", "pdf", local_file, "--outdir", tmp_dir])
        if not os.path.exists(pdf_file):
            print ('PDF conversion failed for', blob.name)
            continue

    print (pdf_file)

    print ('Processing', pdf_file)
    with open(pdf_file, "rb") as f:
        data_bytes = f.read()

    resp = post(url = di_post_url, data = data_bytes, headers = di_headers)
    if resp.status_code != 202:
        # Skip only this document; quit() would stop the whole notebook kernel.
        print("POST analyze failed:\n%s" % resp.text)
        continue
    print("POST analyze succeeded:\n%s" % resp.headers)
    # The analysis result is published at this URL (requests headers are case-insensitive).
    get_url = resp.headers['Operation-Location']
    print (get_url)

    # Reset per document so a failed poll can never pickle the previous document's result.
    resp_json = {}
    for _ in range(max_polls):
        try:
            resp = get(url = get_url, headers = {"Ocp-Apim-Subscription-Key": di_apim_key})
            resp_json = json.loads(resp.text)
            if resp.status_code != 200:
                print("GET Layout results failed:\n")
                break
            status = resp_json["status"]
        except Exception as e:
            print("GET analyze results failed:\n%s" % str(e))
            break
        if status == "succeeded":
            print("Layout Analysis succeeded:\n")
            print("--------------------------------")
            break
        if status == "failed":
            print("Layout Analysis failed:\n")
            break
        # Analysis still running. Wait and retry.
        print ('Waiting to complete processing...')
        time.sleep(wait_sec)
    else:
        print ('Timed out waiting for Layout Analysis to complete')

    # Persist the Doc Int Output for further processing
    if 'analyzeResult' in resp_json:
        with open(pkl_file, 'wb') as pkl_out:
            pickle.dump(resp_json['analyzeResult'], pkl_out, protocol=pickle.HIGHEST_PROTOCOL)
    


Processing MSFT_FY22Q4_10K.docx
Downloading MSFT_FY22Q4_10K.docx to /aci/data/data/customers/financial-docs/tmp/MSFT_FY22Q4_10K.docx ...
Converting file to PDF format...
convert /aci/data/data/customers/financial-docs/tmp/MSFT_FY22Q4_10K.docx -> /aci/data/data/customers/financial-docs/tmp/MSFT_FY22Q4_10K.pdf using filter : writer_pdf_Export
/aci/data/data/customers/financial-docs/tmp/MSFT_FY22Q4_10K.pdf
Processing /aci/data/data/customers/financial-docs/tmp/MSFT_FY22Q4_10K.pdf
POST analyze succeeded:
{'Content-Length': '0', 'Operation-Location': 'https://vikurpad-collab.cognitiveservices.azure.com/documentintelligence/documentModels/prebuilt-layout/analyzeResults/3da8d9ec-fa6d-4a11-9238-b2954befe4aa?api-version=2023-10-31-preview', 'x-envoy-upstream-service-time': '149', 'apim-request-id': '3da8d9ec-fa6d-4a11-9238-b2954befe4aa', 'Strict-Transport-Security': 'max-age=31536000; includeSubDomains; preload', 'x-content-type-options': 'nosniff', 'x-ms-region': 'West US 2', 'Date': 'Fri, 01 

In [15]:
# Gather the pickled Document Intelligence results that the next cell turns into JSON files
pkl_files = get_files_in_dir(pkl_dir)
total_files = len(pkl_files)
print ('Total PKL files:', total_files)


Total PKL files: 8


In [16]:
# Process the Pickle files which contain the results of the Document Intelligence Analyze Results.
# We use the markdown content within them for chunking: first split by markdown headers, then split
# each section into token-sized chunks; every chunk gets a generated title and an embedding.
# The output is a set of JSON files which will be uploaded to Azure AI Search in the next step.
# These JSON files can be saved for BCDR purposes so that you do not need to reprocess the original content.
from datetime import timezone

for pkl_file in pkl_files:
    print (pkl_file)

    # "<source file name>.pkl" -> "<source file name>", e.g. "report.docx.pkl" -> "report.docx"
    base_file = os.path.basename(pkl_file)
    base_file = base_file[:base_file.rfind('.pkl')]
    json_out_file = os.path.join(json_dir, base_file + ".json")

    # Skip documents whose JSON output already exists.
    if os.path.exists(json_out_file):
        continue

    # Fields shared by every chunk of this document. Derived from base_file so the
    # ".pkl" suffix does not leak into parent_id / file_name.
    json_data_base = {
        "parent_id": base64_encode_string(base_file),
        # NOTE: a "url" (download URL) field was stubbed out here and is not populated.
        "file_name": base_file,
        "last_updated": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z',
    }

    # These pickles are written by this notebook; never unpickle files from an untrusted source.
    with open(pkl_file, 'rb') as pkl_in:
        analyze_result = pickle.load(pkl_in)
    content = analyze_result['content']

    md_header_splits = markdown_splitter.split_text(content)
    documents = []
    total_sections = len(md_header_splits)
    chunk_id = 0  # numbered across the whole document, not per section
    for section_counter, s in enumerate(md_header_splits, start=1):
        chunks = text_splitter.split_text(s.page_content)
        print ('Processing Section:', section_counter, 'of', total_sections, 'with', len(chunks), 'chunks...')

        if not chunks:
            print ('No content found for this section')
            continue

        for chunk in chunks:
            # deepcopy so per-chunk fields never leak into the shared base dict
            json_data = copy.deepcopy(json_data_base)
            json_data["chunk_id"] = str(chunk_id)
            json_data["chunk"] = chunk
            json_data["title"] = generate_title(chunk)
            # Embed the chunk together with its file name and generated title for extra context.
            chunk_content = "File Name: " + base_file + "\n"
            chunk_content += "Section Title: " + json_data["title"] + "\n"
            chunk_content += chunk
            json_data["vector"] = generate_embeddings(chunk_content)
            chunk_id += 1
            documents.append(json_data)

    with open(json_out_file, "w") as j_out:
        j_out.write(json.dumps(documents))
            


/aci/data/data/customers/financial-docs/pkl/msft-2022_Annual_Report.docx.pkl
Processing Section: 1 of 185 with 2 chunks...
Processing Section: 2 of 185 with 1 chunks...
Processing Section: 3 of 185 with 2 chunks...
Processing Section: 4 of 185 with 2 chunks...
Processing Section: 5 of 185 with 1 chunks...
Processing Section: 6 of 185 with 1 chunks...
Processing Section: 7 of 185 with 2 chunks...
Processing Section: 8 of 185 with 1 chunks...
Processing Section: 9 of 185 with 1 chunks...
Processing Section: 10 of 185 with 1 chunks...
Processing Section: 11 of 185 with 1 chunks...
Processing Section: 12 of 185 with 2 chunks...
Processing Section: 13 of 185 with 2 chunks...
Processing Section: 14 of 185 with 3 chunks...
Processing Section: 15 of 185 with 1 chunks...
Processing Section: 16 of 185 with 1 chunks...
Processing Section: 17 of 185 with 1 chunks...
Processing Section: 18 of 185 with 1 chunks...
Processing Section: 19 of 185 with 2 chunks...
Processing Section: 20 of 185 with 3 ch