# Introduction

This is a notebook for easily testing out components of the Azure Function without all the rigmarole of running a full Azure Function just to test a specific piece of logic. It is not a replacement for unit tests and must be kept up-to-date with the Azure Function manually, but it is convenient for experimenting with units of logic quickly.

In [1]:
%pip install -r ../src/requirements.txt
%pip install python-dotenv


Note: you may need to restart the kernel to use updated packages.


In [2]:
from langchain_community.document_loaders import AzureAIDocumentIntelligenceLoader
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langdetect import detect_langs
import logging
import os
import json
import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion

In [3]:
from dotenv import load_dotenv

load_dotenv()

ai_multiservice_endpoint = os.environ.get("AI_MULTISERVICE_ENDPOINT")
ai_multiservice_key = os.environ.get("AI_MULTISERVICE_KEY")

print("Using Azure AI multi-service endpoint: ", ai_multiservice_endpoint)
print("Using Azure AI multi-service key: ", len(ai_multiservice_key) * "*")

Using Azure AI multi-service endpoint:  https://cajetzeraiservices.cognitiveservices.azure.com/
Using Azure AI multi-service key:  ********************************


## Utility Functions

### validate_and_format_json
Used in a similar manner to C#'s TryParse-style methods: we test whether the input is parsable as JSON and return a tuple containing a flag indicating the outcome and either the re-encoded JSON or a null value. We could possibly reduce this to just the JSON or ```null```, since the boolean typically matters only when a valid ```null``` could come back as JSON. Here that case is functionally the same as a failure, and it is also an unlikely outcome from Open AI, which is the intended use of this function: validating that Open AI produced a JSON response based on the prompt engineering.

In [5]:
def validate_and_format_json(json_string, indent=4):
    try:
        # Attempt to parse the JSON string
        parsed_json = json.loads(json_string)
        # If successful, re-encode it with indentation for pretty printing
        return True, json.dumps(parsed_json, indent=indent)
    except json.JSONDecodeError:
        # If parsing fails, signal the failure with a False flag and no payload
        return False, None
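
A short usage sketch may make the tuple contract concrete. The function is repeated so the block runs on its own, and the sample strings are made up for illustration rather than taken from real Open AI output.

```python
import json


def validate_and_format_json(json_string, indent=4):
    try:
        parsed_json = json.loads(json_string)
        return True, json.dumps(parsed_json, indent=indent)
    except json.JSONDecodeError:
        return False, None


# A well-formed response parses and comes back re-encoded.
ok, formatted = validate_and_format_json('{"StudyType": "Clinical"}', indent=None)

# A response with extra prose around the JSON fails to parse.
bad_ok, bad_formatted = validate_and_format_json('Sure! Here is the JSON: {"a": 1}')

# The text "null" is valid JSON, so the flag is True and the payload is the string "null".
null_ok, null_formatted = validate_and_format_json("null")
```

Because the second element is a string on success, a valid ```null``` comes back as the text "null" rather than as ```None```, so the two cases stay distinguishable even without the flag.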

### parse_text_to_boolean
Similar to ```validate_and_format_json```, this is intended to facilitate validation of the Open AI output. In this case we try to get a boolean response: if we get a valid one we return it, otherwise we return a null value. This could potentially be simplified to just a boolean response, but because downstream needs are not yet well understood, we treat a failure to parse the boolean as different from a negative test result.

In [6]:
def parse_text_to_boolean(text):
    if text.lower() == "true":
        return True
    elif text.lower() == "false":
        return False
    else:
        return None
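
A minimal sketch of how a caller might keep the three outcomes apart. The function is repeated so the block is self-contained; ```describe_outcome``` and its labels are hypothetical names used only for illustration.

```python
def parse_text_to_boolean(text):
    if text.lower() == "true":
        return True
    elif text.lower() == "false":
        return False
    else:
        return None


def describe_outcome(model_output):
    result = parse_text_to_boolean(model_output)
    # Check `is None` first: a plain truthiness test would lump None in with False.
    if result is None:
        return "unparseable"
    return "significant" if result else "not significant"
```

Checking ```is None``` before anything else is what keeps a parse failure from silently reading as a negative result.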

### calculate_mode_response
This is a more complicated utility function that takes a list of comparable JSON objects and tries to find the mode value across all of the objects' properties. Currently, for simplicity, we assume a standard schema across all objects. There are currently two 'bugs' I'm working through in this function:
1. List properties are not hashable and therefore can't be used as keys for the temporary dictionary of responses used to identify the mode response. I will likely need to flatten list responses and add each value in the list to the responses individually.
2. The function does not currently handle empty JSON objects effectively.

I'm concerned that this will not be performant, mostly due to my lack of familiarity with Python. I think this is a reasonable way to approach the problem, though I could imagine someone coming up with a more creative and performant solution.

In [7]:
# def calculate_mode_response(list_of_json_objects):
#     print(list_of_json_objects)

#     if len(list_of_json_objects) == 0:

#         return None

#     # remove any None objects from the list
#     list_of_json_objects = [val for val in list_of_json_objects if val is not None]

#     first_entry = json.loads(list_of_json_objects[0])

#     property_keys = first_entry.keys()

#     mode_response = {}

#     responses = {}

#     for key in property_keys:

#         for obj in list_of_json_objects:
#             if isinstance(obj, list):
#                 print(f"{obj} is a list in {list_of_json_objects}", list_of_json_objects)

#             json_obj = json.loads(obj)

#             value = json_obj[key]

#             # todo: determine how to handle list values, for now skipping to prepare for demo

#             if isinstance(value, list):

#                 continue

#             if json_obj[key] in responses:

#                 responses[value] += 1

#             else:

#                 responses[value] = 1

#         if len(responses) > 0:

#             mode_response[key] = max(responses, key=responses.get)

#         responses = {}

#     return mode_response
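
One possible way around both 'bugs', sketched with ```collections.Counter```. This is not the approach the notebook uses: it serializes each value with ```json.dumps``` so lists and dictionaries become hashable, which counts a whole list as one value instead of flattening it. ```mode_of_properties``` is a hypothetical name.

```python
import json
from collections import Counter


def mode_of_properties(json_objects):
    """Return the most common value for each property across the objects."""
    counts = {}
    for obj in json_objects:
        if not obj:  # skip None, empty strings and empty objects
            continue
        parsed = json.loads(obj) if isinstance(obj, str) else obj
        for key, value in parsed.items():
            # Serialize the value so lists and dicts can be counted like scalars.
            hashable = json.dumps(value, sort_keys=True)
            counts.setdefault(key, Counter())[hashable] += 1
    # Decode the winning serialized value back into its original type.
    return {
        key: json.loads(counter.most_common(1)[0][0])
        for key, counter in counts.items()
    }
```

Because keys are collected from every object rather than from the first entry only, this sketch also tolerates responses that omit a property.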

### flatten_json
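Flattens a nested JSON object into a single-level dictionary, joining nested keys and list indices with underscores. Still experimental; try to test this out...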

In [13]:
def flatten_json(json_obj):
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + "_")
        elif isinstance(x, list):
            for i, item in enumerate(x):
                flatten(item, name + str(i) + "_")
        else:
            # strip the trailing "_" separator to get the flattened key
            out[name[:-1]] = x

    flatten(json_obj)
    return out
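
A small worked example of what the flattening produces, with the function restated so the block runs on its own. The sample data is made up. It also shows two edge cases worth knowing about: empty containers disappear, and keys that already contain underscores can collide.

```python
def flatten_json(json_obj):
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + "_")
        elif isinstance(x, list):
            for i, item in enumerate(x):
                flatten(item, name + str(i) + "_")
        else:
            out[name[:-1]] = x

    flatten(json_obj)
    return out


nested = {"study": {"type": "clinical", "sites": ["A", "B"]}, "year": 2020}
flat = flatten_json(nested)
# flat == {"study_type": "clinical", "study_sites_0": "A", "study_sites_1": "B", "year": 2020}

# Empty lists and dicts contribute no keys at all.
dropped = flatten_json({"tags": [], "meta": {}})

# "a_b" and a -> b both flatten to the key "a_b", so the later value overwrites the earlier one.
collided = flatten_json({"a_b": 1, "a": {"b": 2}})
```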

## Chunking
I discuss chunking techniques at greater length in chunking-research.ipynb. Here I'm just trying to get the chunks available to leverage in testing the rest of the flow. *Notable changes* here are that I have turned it into a function and changed ```uri_path``` to ```file_path```.

In [7]:
def chunk_document(file_path):
    ai_doc_intel_loader = AzureAIDocumentIntelligenceLoader(
        file_path=file_path,
        api_key=ai_multiservice_key,
        api_endpoint=ai_multiservice_endpoint,
        api_model="prebuilt-layout",
    )

    docs = ai_doc_intel_loader.load()

    # Split the document into chunks based on markdown headers.
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
        ("####", "Header 4"),
    ]

    text_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

    docs_string = docs[0].page_content

    splits = text_splitter.split_text(docs_string)

    chunks = [split.page_content for split in splits]

    return chunks
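
To illustrate what splitting on markdown headers does without calling Document Intelligence, here is a simplified standard-library stand-in. It is not LangChain's implementation: the real splitter also records the headers as metadata, which this sketch does not attempt, and ```split_on_markdown_headers``` is a hypothetical name.

```python
import re


def split_on_markdown_headers(markdown_text, max_level=4):
    """Start a new chunk at every header line of level 1 to max_level."""
    header_pattern = re.compile(rf"^#{{1,{max_level}}}\s+\S")
    chunks, current = [], []
    for line in markdown_text.splitlines():
        # A header closes the chunk collected so far and opens a new one.
        if header_pattern.match(line) and current:
            chunks.append("\n".join(current).strip())
            current = []
        current.append(line)
    if current:
        chunks.append("\n".join(current).strip())
    return [chunk for chunk in chunks if chunk]


sample = "# Title\nintro\n## Methods\nbody\n## Results\nmore"
sample_chunks = split_on_markdown_headers(sample)
```

A stand-in like this can be handy for exercising the downstream functions on a local markdown string while iterating.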

## Identifying Introduction & Conclusion Chunks
This is an area that could use more work and experimentation. You might be able to do some more tedious non-AI related coding around the markdown. I've tried a technique with mixed results where we summarize each chunk to one sentence, number the sentences by chunk index, and then have AOAI guess which sentences belong to the summary. I think that could work with more refined prompting. For now I'm running with a more naive solution.

In [10]:
def identify_introduction_chunks(chunks):
    intro_chunks = []
    intro_chunks.append(chunks[0])  
    return intro_chunks

def identify_conclusion_chunks(chunks):
    conclusion_chunks = []
    conclusion_chunks.append(chunks[-1])
    return conclusion_chunks
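
As one example of the non-AI route, a keyword match on each chunk's heading could be tried before falling back to the naive first/last choice. This is only a sketch: it assumes each chunk's first line still carries its heading, which may not hold for the chunks produced above, and both ```pick_chunks_by_heading``` and the keyword lists are hypothetical.

```python
def pick_chunks_by_heading(chunks, keywords, fallback_index):
    """Return chunks whose first line mentions a keyword, else one fallback chunk."""
    if not chunks:
        return []
    matches = []
    for chunk in chunks:
        lines = chunk.strip().splitlines()
        first_line = lines[0].lower() if lines else ""
        if any(keyword in first_line for keyword in keywords):
            matches.append(chunk)
    # Fall back to the naive first/last choice when no heading matches.
    return matches or [chunks[fallback_index]]


example_chunks = [
    "# Title\nabstract text",
    "## Introduction\nwhy",
    "## Methods\nhow",
    "## Conclusions\nso",
]
intro = pick_chunks_by_heading(example_chunks, ("introduction", "background"), 0)
conclusion = pick_chunks_by_heading(example_chunks, ("conclusion", "summary"), -1)
```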

## Setup Semantic Kernel
....
- Changed the plugin directory so it borrows from the plugin code actually in the project. We could try this with some of the other code too, which would make it easier to maintain parity between the notebook and the function, but it removes the benefit of explaining code side by side.

todo: functionalize and return kernel

```__file__``` is not available in a Jupyter notebook, so this uses ```os.getcwd()``` instead.

In [11]:
curr_directory = os.getcwd()
plugins_directory = os.path.join(curr_directory, "..", "src", "plugins")

# setup semantic kernel
aoai_deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME")
aoai_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
aoai_key = os.environ.get("AZURE_OPENAI_API_KEY")

print(f"Using aoai_deployment: {aoai_deployment}")
print(f"Using aoai_endpoint: {aoai_endpoint}")
print(f"Using aoai_key: {len(aoai_key)*'*'}")

kernel = sk.Kernel()

service_id = "default"

service = AzureChatCompletion(
    service_id=service_id,
    deployment_name=aoai_deployment,
    endpoint=aoai_endpoint,
    api_key=aoai_key,
)

kernel.add_service(service)

plugin_names = [
    plugin
    for plugin in os.listdir(plugins_directory)
    if os.path.isdir(os.path.join(plugins_directory, plugin))
]

# for each plugin, add the plugin to the kernel
try:
    for plugin_name in plugin_names:
        kernel.import_plugin_from_prompt_directory(plugins_directory, plugin_name)
except ValueError as e:
    logging.exception(f"Plugin {plugin_name} not found")

Using aoai_deployment: gpt-35-turbo-16k-deployment
Using aoai_endpoint: https://exp-aoai.openai.azure.com/
Using aoai_key: **********************************


## Classify Document
I believe this is possible by evaluating just the introduction and conclusion, which saves on cost while still giving a relatively accurate result. So far I am only occasionally getting what I think are accurate results. For larger documents the joined chunks may be too large to send in one call, so this may need to be revised to call the plugin against each chunk and then find the mode of the results.

In [12]:
async def classify_study_type(chunks, kernel):
    joined_chunks = os.linesep.join(chunks)
    skresult = await kernel.invoke(
        kernel.plugins["ClassificationPlugin"]["ClassifyStudyType"],
        sk.KernelArguments(input=joined_chunks, tense="present"),
    )
    _, intro_study_type_classification = validate_and_format_json(
        skresult.value[0].content, None
    )
    return intro_study_type_classification
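
The per-chunk fallback mentioned above could look roughly like this. It is a sketch under assumptions: ```classify_chunk``` stands in for any async callable that classifies a single chunk and returns a hashable label (for example the JSON string returned by ```classify_study_type``` for a one-chunk list), or ```None``` when the output could not be parsed.

```python
from collections import Counter


async def classify_by_chunk(chunks, classify_chunk):
    """Classify each chunk separately and return the most common label."""
    labels = []
    for chunk in chunks:
        label = await classify_chunk(chunk)
        if label is not None:  # ignore chunks whose output could not be parsed
            labels.append(label)
    if not labels:
        return None
    return Counter(labels).most_common(1)[0][0]
```

In the notebook this would be awaited directly, for example ```await classify_by_chunk(chunks, my_classifier)``` with a classifier of your own.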

## Sentiment Analysis
Here we attempt to determine whether the findings are both conclusive and significant. Again, I believe this can be done using just the introductory chunks. As with classification, if the chunks are too large for larger documents we may need to perform this on a per-chunk basis and then average the results.

In [13]:
async def are_findings_conclusive_and_signficant(chunks, kernel):
    joined_chunks = os.linesep.join(chunks)
    skresult = await kernel.invoke(
        kernel.plugins["SummaryPlugin"]["AreStudyFindingsSignificant"],
        sk.KernelArguments(input=joined_chunks),
    )
    return parse_text_to_boolean(skresult.value[0].content)
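
For boolean results, averaging per-chunk answers amounts to a majority vote. A minimal sketch, assuming each per-chunk result is ```True```, ```False```, or ```None``` as returned by ```parse_text_to_boolean```. ```majority_vote``` is a hypothetical helper, and treating a tie as undecided is a design choice made here for illustration.

```python
def majority_vote(results):
    """Combine per-chunk True/False/None answers into a single answer."""
    votes = [result for result in results if result is not None]
    if not votes:
        return None  # nothing could be parsed
    true_count = sum(votes)  # True counts as 1, False as 0
    if true_count * 2 == len(votes):
        return None  # a tie is treated as undecided
    return true_count * 2 > len(votes)
```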

## Entity Extraction
This is where things get busy. We have stakeholders, dates, and a multi-entity extraction function, all of which we run against every chunk to make sure we don't miss a possibility. We then find the mode of the results.

In [20]:
def calculate_mode_response(list_of_json_objects, printObjs=False):
    # remove any None objects from the list before checking whether anything is left
    list_of_json_objects = [val for val in list_of_json_objects if val is not None]

    if len(list_of_json_objects) == 0:
        return None

    if printObjs:
        print("List of JSON Objects: ", list_of_json_objects)

    first_entry = list_of_json_objects[0]
    # hacky conversion of other types to dictionary for keys
    schema = (
        json.loads(first_entry)
        if isinstance(first_entry, str)
        else json.loads(json.dumps(first_entry))
    )

    if printObjs:
        print("Schema: ", schema)

    property_keys = schema.keys()

    mode_response = {}
    responses = {}

    for key in property_keys:
        if printObjs:
            print("Beginning loop for key: ", key)
        for obj in list_of_json_objects:
            if printObjs:
                print("Evaluating: ", obj)

            # hacky conversion of other types to dictionary for keys; see above
            json_obj = (
                json.loads(obj) if isinstance(obj, str) else json.loads(json.dumps(obj))
            )

            # a list here cannot be indexed by key, so report it and move on
            if isinstance(json_obj, list):
                print(f"{obj} is a list in {list_of_json_objects}")
                continue

            value = json_obj[key]

            # todo: determine how to handle list values, for now skipping to prepare for demo
            # dict values are skipped as well: like lists, they are unhashable and cannot be dictionary keys

            if isinstance(value, (list, dict)):
                continue

            if value in responses:
                responses[value] += 1
            else:
                responses[value] = 1

        if len(responses) > 0:
            mode_response[key] = max(responses, key=responses.get)
        responses = {}
    return mode_response


# for lists like this ['[{"a": "b"}, {"c": "d"}]', '[{"e": "f"}, {"g": "h"}]'] -> [{"a": "b"}, {"c": "d"}, {"e": "f"}, {"g": "h"}]
def flatten_list_of_list_strings(original_list):
    list_of_lists = [json.loads(val) for val in original_list if val is not None]
    return [val for sublist in list_of_lists for val in sublist]


###
async def use_extraction_function(chunk, function_name, kernel):
    extract_entities_result = await kernel.invoke(
        kernel.plugins["EntityExtraction"][function_name],
        sk.KernelArguments(input=chunk),
    )
    # print(extract_entities_result.value[0].content)
    _, extracted_entities = validate_and_format_json(
        extract_entities_result.value[0].content, None
    )

    return extracted_entities



async def extract_entities(chunks, kernel):
    extracted_entity_responses = []
    extracted_stakeholders_responses = []
    extracted_dates_responses = []

    for chunk in chunks:
        extracted_entities = await use_extraction_function(
            chunk, "ExtractMultipleEntities", kernel
        )
        extracted_entity_responses.append(extracted_entities)
        extracted_stakeholders = await use_extraction_function(
            chunk, "ExtractStakeholders", kernel
        )
        extracted_stakeholders_responses.append(extracted_stakeholders)
        extracted_dates = await use_extraction_function(
            chunk, "ExtractSignificantDates", kernel
        )
        extracted_dates_responses.append(extracted_dates)


    # mode entity extraction results across chunks

    # multi entities
    mode_extracted_entities_response = calculate_mode_response(
        extracted_entity_responses
    )

    # stakeholders
    extracted_stakeholders_responses = flatten_list_of_list_strings(
        extracted_stakeholders_responses
    )
    mode_extracted_stakeholders_response = calculate_mode_response(
        extracted_stakeholders_responses
    )

    # dates
    extracted_dates_responses = flatten_list_of_list_strings(extracted_dates_responses)
    mode_extracted_dates_response = calculate_mode_response(
        extracted_dates_responses
    )


    deep_final_result = {
        "avg_extracted_entities_response": mode_extracted_entities_response,
        "avg_extracted_stakeholders_response": mode_extracted_stakeholders_response,
        "avg_extracted_dates_response": mode_extracted_dates_response,
    }


    flat_final_result = flatten_json(deep_final_result)

    return flat_final_result
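
The loop above awaits three calls per chunk one after another. If throughput becomes a concern, the calls could be issued concurrently with ```asyncio.gather```. This is a sketch, not what the notebook does: ```extract``` stands in for any async callable taking a chunk and a function name, and it assumes the service's rate limits allow that many parallel requests.

```python
import asyncio


async def extract_concurrently(chunks, function_names, extract):
    """Run every (chunk, function) extraction concurrently, grouped by function name."""
    calls = [extract(chunk, name) for chunk in chunks for name in function_names]
    # gather returns results in the same order as the awaitables passed in
    results = await asyncio.gather(*calls)
    grouped = {name: [] for name in function_names}
    for index, result in enumerate(results):
        # calls were built chunk by chunk, so the function name cycles with the index
        grouped[function_names[index % len(function_names)]].append(result)
    return grouped
```

Each list in the returned dictionary is in chunk order, so it could be handed to ```calculate_mode_response``` or ```flatten_list_of_list_strings``` in the same way as the lists built by the sequential loop.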

## Tying together the flow

In [23]:
# todo: read document names from sample docs directory and then loop through

curr_directory = os.getcwd()
sample_docs_directory = os.path.join(curr_directory, "..", "sample-docs")
dir_list = os.listdir(sample_docs_directory)

file_path = os.path.join(sample_docs_directory, dir_list[0])
print(f"Operating against document: {file_path}")

chunks = chunk_document(file_path)
intro_chunks = identify_introduction_chunks(chunks)
conclusion_chunks = identify_conclusion_chunks(chunks)

combined_intro_conclusion_chunks = intro_chunks + conclusion_chunks

study_type = await classify_study_type(combined_intro_conclusion_chunks, kernel)

are_findings_significant = await are_findings_conclusive_and_signficant(
    combined_intro_conclusion_chunks, kernel
)  # potentially just use conclusion chunks

# Right now we use only the first chunk to determine the language. Depending on the
# business case you could use all chunks; for a multi-language document that is
# more likely to give an accurate result.
languages_result = detect_langs(intro_chunks[0])
languages = [
    {"language": lang.lang, "probability": lang.prob} for lang in languages_result
]

entities = await extract_entities(chunks, kernel)

deep_final_result = {
    "entities": entities,
    "languages": languages,
    "are_findings_significant": are_findings_significant,
    "study_type": study_type,
}

flat_final_result = flatten_json(deep_final_result)

print(json.dumps(flat_final_result, indent=4))

Operating against document: c:\Projects\kagami\notebooks\..\sample-docs\2 - jbm-5-015.pdf
Processing chunk 0
Processing chunk 1
Processing chunk 2
Processing chunk 3
Processing chunk 4
Processing chunk 5
Processing chunk 6
Processing chunk 7
Processing chunk 8
Processing chunk 9
Processing chunk 10
Processing chunk 11
Processing chunk 12
Processing chunk 13
Processing chunk 14
Processing chunk 15
Processing chunk 16
Processing chunk 17
Processing chunk 18
Processing chunk 19
{
    "entities_avg_extracted_entities_response_Duration": "",
    "entities_avg_extracted_entities_response_DrugOrCompound": "",
    "entities_avg_extracted_entities_response_RouteOfAdministration": "",
    "entities_avg_extracted_entities_response_InternalStudyNumber": "",
    "entities_avg_extracted_entities_response_ExternalStudyNumber": "",
    "entities_avg_extracted_entities_response_TestFacility": "",
    "entities_avg_extracted_stakeholders_response_name": "methylprednisolone",
    "entities_avg_extracted_

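
The flow above detects the language from the first chunk only. If all chunks were used instead, the per-chunk results would need combining. A minimal sketch that averages probabilities across chunks follows. ```aggregate_languages``` is a hypothetical helper, and its input is assumed to be one list per chunk of (language code, probability) pairs.

```python
from collections import defaultdict


def aggregate_languages(per_chunk_results):
    """Average language probabilities over chunks, highest first."""
    chunk_count = len(per_chunk_results)
    if chunk_count == 0:
        return []
    totals = defaultdict(float)
    for chunk_result in per_chunk_results:
        for language, probability in chunk_result:
            totals[language] += probability
    # A language missing from a chunk contributes zero for that chunk.
    averaged = [
        {"language": language, "probability": total / chunk_count}
        for language, total in totals.items()
    ]
    return sorted(averaged, key=lambda item: item["probability"], reverse=True)
```

The input could be built from ```detect_langs``` by collecting ```(lang.lang, lang.prob)``` pairs for each chunk, and the output has the same shape as the ```languages``` list used in the flow.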
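## check_for_handwritten_signature
Testing out the ability to detect a handwritten signature in a document using Document Intelligence. As written, ```check_for_handwritten_style``` reports whether any handwritten style is detected anywhere in the file, so it is a proxy for a signature rather than a dedicated signature detector.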

In [18]:
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult
from azure.core.credentials import AzureKeyCredential

curr_directory = os.getcwd()
sample_docs_directory = os.path.join(curr_directory, "..", "sample-docs")
dir_list = os.listdir(sample_docs_directory)

file_path = os.path.join(sample_docs_directory, dir_list[0])
print(f"Operating against document: {file_path}")

# Uses the 'ai_multiservice_key' and 'ai_multiservice_endpoint' values loaded from the environment above

def check_for_handwritten_style(file_path):
    credential = AzureKeyCredential(ai_multiservice_key)
    client = DocumentIntelligenceClient(endpoint=ai_multiservice_endpoint, credential=credential)

    # Analyze the whole file with the layout model; the result lists the text styles it detected
    with open(file_path, "rb") as f:
        poller = client.begin_analyze_document(
            "prebuilt-layout", analyze_request=f, content_type="application/octet-stream"
        )
        result: AnalyzeResult = poller.result()

    # True if any detected style anywhere in the document is flagged as handwritten
    return bool(result.styles) and any(style.is_handwritten for style in result.styles)

has_handwritten_sig = check_for_handwritten_style(file_path)
print("has_handwritten_sig = " + str(has_handwritten_sig))

Operating against document: c:\GitHub\kagami\notebooks\..\sample-docs\DLM_Window_Quote_Jan21_signed.pdf
has_handwritten_sig = True
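
Since a signature usually sits near the end of a document, one possible refinement is to count only handwritten styles whose spans reach into the tail of the content. This is a sketch under assumptions: it relies on the ```is_handwritten```, ```confidence``` and ```spans``` (```offset```, ```length```) attributes of the styles in the analysis result, it expects the length of the result's text content to be passed in, and the ```tail_fraction``` and ```min_confidence``` defaults are arbitrary illustrative values. ```has_handwriting_near_end``` is a hypothetical helper.

```python
def has_handwriting_near_end(styles, content_length, tail_fraction=0.2, min_confidence=0.5):
    """Return True if confident handwritten text reaches into the tail of the content."""
    if not styles or content_length <= 0:
        return False
    tail_start = content_length * (1 - tail_fraction)
    for style in styles:
        if not style.is_handwritten or (style.confidence or 0) < min_confidence:
            continue
        # A span counts if any part of it lies past the start of the tail.
        if any(span.offset + span.length > tail_start for span in style.spans or []):
            return True
    return False
```

With a real result this might be called as ```has_handwriting_near_end(result.styles, len(result.content))```.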
