In [None]:
import json
from typing import Any

import pyspark
from delta.tables import *
from notebookutils import mssparkutils
from pyspark.sql.functions import udf, col, from_json, concat_ws, explode, current_timestamp
from pyspark.sql.types import StringType, Row, StructType, StructField, ArrayType, MapType
from pyspark.sql.utils import AnalysisException
from synapse.ml.services import AnalyzeDocument
from synapse.ml.services.openai import OpenAIChatCompletion

In [None]:
# Session-wide Delta setting: enable automatic schema merging.
spark.conf.set(
    "spark.databricks.delta.schema.autoMerge.enabled",
    "true",
)

In [None]:
# Getting all necessary secrets from a single Azure Key Vault.
# The vault URL is defined once so that pointing the notebook at another vault is a one-line change.
KEY_VAULT_URL = 'https://keyvaultnew.vault.azure.net/'

# Key and region passed to AnalyzeDocument (document analysis)
ai_services_key = mssparkutils.credentials.getSecret(KEY_VAULT_URL, 'DocIntelligenceKey')
ai_services_location = mssparkutils.credentials.getSecret(KEY_VAULT_URL, 'DocIntelligenceRegion')

# Key and endpoint URL passed to OpenAIChatCompletion (Azure OpenAI)
ai_aoai_key = mssparkutils.credentials.getSecret(KEY_VAULT_URL, 'AOAIKey')
ai_aoai_url = mssparkutils.credentials.getSecret(KEY_VAULT_URL, 'AOAIURL')

In [None]:
# Input parameter: location of the document to process. It is handed to Spark's
# "binaryFile" reader by get_structured_content.
# NOTE(review): presumably resolved relative to the notebook's default lakehouse — confirm.
my_document_path: str = "Files/PDF/MYPDFFILE.pdf"

In [None]:
# Template of the JSON document the model is asked to fill in.
# Every attribute starts out as an empty string; the template is serialised
# with json.dumps and embedded in the user prompt.
my_json_structure = {
    "myjsonstructure": dict.fromkeys(
        ("id", "date", "attribute 1", "attribute 2"),
        "",
    ),
}

In [None]:
# Spark schema used by from_json to parse the model's answer.
# The root is an array of structs because get_structured_content explodes the
# parsed value into one row per element.
# The fields must mirror my_json_structure: a field missing here (previously
# "date") is silently dropped while parsing.
my_json_schema = ArrayType(
    StructType([
        StructField(
            "myjsonstructure",
            StructType([
                StructField("id", StringType(), True),
                StructField("date", StringType(), True),
                StructField("attribute 1", StringType(), True),
                StructField("attribute 2", StringType(), True),
            ]),
        )
    ])
)

In [None]:
# Output tables to derive from the exploded `parsedContent` column.
# Each entry names the resulting DataFrame / Delta table and lists the columns
# (strings or Column expressions) to select for it.
# Column paths must exist in my_json_schema: the struct there is called
# "myjsonstructure" (not "root"), and every selected field must be declared in
# that schema. Add further fields as "parsedContent.myjsonstructure.<field>".
new_dfs_info = [
    {
        "newDataFrameName": "df_myjsonstructure",
        "columnNames": [
            "parsedContent.myjsonstructure.id",
            # NOTE(review): aliased to names without spaces because Delta rejects
            # spaces in column names unless column mapping is enabled — confirm
            # the desired output column names.
            col("parsedContent.myjsonstructure.`attribute 1`").alias("attribute_1"),
            col("parsedContent.myjsonstructure.`attribute 2`").alias("attribute_2"),
            current_timestamp().alias("insert_datetime"),
        ],
    },
]

In [None]:
def make_message(role, content) -> pyspark.sql.Row:
    """Wrap one chat message in a Spark Row.

    The Row carries the fields ``role``, ``content`` and ``name`` (in that
    order); ``name`` is filled with the same value as ``role``.
    """
    message_fields = {
        "role": role,
        "content": content,
        "name": role,
    }
    return Row(**message_fields)

In [None]:
def _build_extraction_messages(document_text: str, json_structure: dict[str, Any], extra_prompt_information: str | None) -> list[pyspark.sql.Row]:
    """Build the system + user chat messages asking the model to fill ``json_structure`` from ``document_text``."""
    system_prompt = f"You are a useful assistant supporting with structured extraction of information from texts. Don't add any comments or explaining text. Always only return the expected JSON filled with the content that was asked for. {extra_prompt_information or ''}"
    user_prompt = f"Extract the following information in JSON format: {json.dumps(json_structure)} from the following text: {document_text}"
    return [
        make_message("system", system_prompt),
        make_message("user", user_prompt),
    ]


def get_structured_content(
    document_path: str,
    json_structure: dict[str, Any],
    spark_schema: pyspark.sql.types.DataType,
    extra_prompt_information: str | None,
    ai_services_location: str,
    ai_services_key: str,
    ai_aoai_url: str | None,
    ai_aoai_key: str | None,
    pages: str = "1-5",
    deployment_name: str = "gpt-4-32k",
    max_documents: int = 10,
) -> pyspark.sql.DataFrame:
    """Extract structured JSON content from documents with AnalyzeDocument + a chat model.

    The documents at ``document_path`` are read as binary files, their text is
    extracted with the ``prebuilt-layout`` model, and a chat-completion model is
    asked to fill ``json_structure`` from that text. The answer is parsed with
    ``from_json`` against ``spark_schema`` and exploded.

    Args:
        document_path: Path passed to Spark's ``binaryFile`` reader.
        json_structure: JSON template embedded (via ``json.dumps``) in the user prompt.
        spark_schema: Schema for ``from_json``. The parsed value is passed to
            ``explode``, so it has to be an array (or map) type.
        extra_prompt_information: Optional text appended to the system prompt.
        ai_services_location: Region set on AnalyzeDocument.
        ai_services_key: Subscription key set on AnalyzeDocument.
        ai_aoai_url: Optional Azure OpenAI endpoint. When truthy, the endpoint and
            ``ai_aoai_key`` are set explicitly on the chat completion.
        ai_aoai_key: Subscription key used together with ``ai_aoai_url``.
        pages: Page range analysed per document (default: the first 5 pages).
        deployment_name: Chat model deployment to call.
        max_documents: Upper bound on the number of documents read.

    Returns:
        A DataFrame with a single column ``parsedContent``, one row per element
        of the parsed answer. Answers that ``from_json`` cannot parse become
        null and therefore produce no rows. The chat completion's ``error``
        column is not propagated.

    Raises:
        ValueError: If no document text could be extracted, i.e. there is
            nothing to send to the chat model.
    """
    # No cache() here: the documents and their analysis are consumed by exactly
    # one action (the collect below), so caching would only pin the raw file
    # bytes in executor memory without ever being unpersisted.
    documents_df = (
        spark.read.format("binaryFile")
        .load(document_path)
        .limit(max_documents)
    )

    analyze_document = (
        AnalyzeDocument()
        .setPrebuiltModelId("prebuilt-layout")
        .setSubscriptionKey(ai_services_key)
        .setLocation(ai_services_location)
        .setImageBytesCol("content")
        .setOutputCol("result")
        .setPages(pages)  # for sake of quick processing, only read a limited page range of the documents
    )

    # Only the extracted text is needed on the driver, so project it before
    # collecting instead of shipping the full analysis result and file bytes.
    text_rows = (
        analyze_document.transform(documents_df)
        .select(col("result.analyzeResult.content").alias("output_content"))
        .collect()
    )

    # One single-column row per document. Rows without extracted text
    # (presumably documents the service could not analyse) are skipped rather
    # than prompting the model with the literal text "None".
    messages = [
        (_build_extraction_messages(row["output_content"], json_structure, extra_prompt_information),)
        for row in text_rows
        if row["output_content"] is not None
    ]
    if not messages:
        # createDataFrame cannot infer a schema from an empty list; fail with a
        # message that points at the actual cause instead.
        raise ValueError(f"No document text could be extracted from {document_path!r}")

    chat_df = spark.createDataFrame(messages, ["messages"])

    open_ai_chat_completion = (
        OpenAIChatCompletion()
            .setDeploymentName(deployment_name)
            .setMessagesCol("messages")
            .setErrorCol("error")
            .setOutputCol("chat_completions")
    )
    if ai_aoai_url:
        # Using a provisioned AOAI gpt-4-32k model in case Fabric Copilot is not available
        open_ai_chat_completion = (
            open_ai_chat_completion
                .setUrl(ai_aoai_url)
                .setSubscriptionKey(ai_aoai_key)
        )

    intermediate_df = open_ai_chat_completion.transform(chat_df).select("messages", "chat_completions.choices.message.content")
    # `content` holds one string per choice; join them into a single JSON string
    intermediate_df = intermediate_df.withColumn("content_str", concat_ws("", col("content")))

    new_df = intermediate_df.withColumn("parsedContent", from_json(col("content_str"), spark_schema))

    # Marked for caching so that several downstream selects/writes on the result
    # can reuse the parsed answers instead of calling the model again.
    new_df.cache()

    return new_df.select(explode("parsedContent").alias("parsedContent"))

In [None]:
def create_new_dataframes(source_dataframe: pyspark.sql.DataFrame, output_dataframe_config: list[dict[str, Any]]) -> dict[str, pyspark.sql.DataFrame]:
    """Derive one DataFrame per config entry by selecting columns from ``source_dataframe``.

    Args:
        source_dataframe: DataFrame the columns are selected from.
        output_dataframe_config: One dict per output with the keys
            ``"newDataFrameName"`` (name of the result) and ``"columnNames"``
            (the columns handed to ``select``).

    Returns:
        A dict mapping each ``newDataFrameName`` to its selected DataFrame. If a
        name occurs more than once, the last entry wins.

    Raises:
        KeyError: If an entry lacks ``"newDataFrameName"`` or ``"columnNames"``.
    """
    return {
        entry["newDataFrameName"]: source_dataframe.select(*entry["columnNames"])
        for entry in output_dataframe_config
    }

In [None]:
def write_dataframes(dataframes: dict[str, pyspark.sql.DataFrame]) -> None:
    """Append every DataFrame to a Delta table named after its dict key.

    Each DataFrame is written in ``delta`` format with ``mergeSchema`` enabled
    and mode ``append`` to ``Tables/<name>``.

    Args:
        dataframes: Mapping of table name to the DataFrame to append.
    """
    # No trailing slash: the separator is added when the table path is built,
    # which previously produced "Tables//<name>".
    output_path = "Tables"

    for df_name, df in dataframes.items():
        # Write each DataFrame as a Delta Lake table
        (
            df.write
            .format("delta")
            .option("mergeSchema", "true")
            .mode("append")
            .save(f"{output_path}/{df_name}")
        )


In [None]:
def get_structured_content_and_write_to_default_lakehouse(document_path: str, json_structure: dict[str, Any], spark_schema: pyspark.sql.types.DataType, extra_prompt_information: str | None, output_dataframe_config: list[dict[str, Any]], ai_services_location: str, ai_services_key: str, ai_aoai_url: str | None, ai_aoai_key: str | None) -> None:
    """Run the full pipeline: extract structured content, derive the output DataFrames, write them.

    Args:
        document_path: Path of the document(s) to process.
        json_structure: JSON template the model is asked to fill in.
        spark_schema: Schema used to parse the model's answer.
        extra_prompt_information: Optional extra instructions for the model.
        output_dataframe_config: Output definitions passed to ``create_new_dataframes``.
        ai_services_location: Region of the document analysis service.
        ai_services_key: Key of the document analysis service.
        ai_aoai_url: Optional Azure OpenAI endpoint.
        ai_aoai_key: Optional Azure OpenAI key.
    """
    # Forward the caller's extra_prompt_information; it used to be replaced by
    # a hard-coded None, so the extra instructions never reached the model.
    structured_df = get_structured_content(
        document_path,
        json_structure,
        spark_schema,
        extra_prompt_information,
        ai_services_location,
        ai_services_key,
        ai_aoai_url,
        ai_aoai_key,
    )
    new_dfs = create_new_dataframes(structured_df, output_dataframe_config)
    write_dataframes(new_dfs)

In [None]:
# Run the pipeline for the configured document; no extra prompt information is supplied.
get_structured_content_and_write_to_default_lakehouse(
    document_path=my_document_path,
    json_structure=my_json_structure,
    spark_schema=my_json_schema,
    extra_prompt_information=None,
    output_dataframe_config=new_dfs_info,
    ai_services_location=ai_services_location,
    ai_services_key=ai_services_key,
    ai_aoai_url=ai_aoai_url,
    ai_aoai_key=ai_aoai_key,
)