This notebook calls Azure AI Content Understanding to extract course information from a Community Education catalog stored in a PDF file. The results are stored in a Delta Lake table in the Fabric Lakehouse.

In [None]:
# Three values live in Key Vault secrets: the AI resource endpoint, the AI
# resource subscription key, and the analyzer name.
# Pipeline and variable libraries contain the other values and are passed in as parameters.

# Key Vault address and the names of the secrets to read from it
kv_endpoint = "https://yourkvname.vault.azure.net/"
secret_name_ai_endpoint = "content-understanding-endpoint"
secret_name_subscription_key = "subscription-key"
secret_analyzer_name = "analyzer-name"

# Fabric workspace / lakehouse that receives the output table
content_ws_id = "your-fabric-workspace-id"
content_lh_id = "your-fabric-lakehouse-id"
table_name = "summercatalog2025"

# PDF document that is sent to the analyzer
doc_location = "https://raw.githubusercontent.com/contosojh/sample-files/main/summer-catalog-10-pages.pdf"

StatementMeta(, aa4cc8d1-68aa-4c03-a8b3-a917a242f2c1, 15, Finished, Available, Finished)

In [None]:
# OneLake path of the Delta table: workspace id, then lakehouse id, then the table name under Tables/
delta_table_path = (
    f"abfss://{content_ws_id}@onelake.dfs.fabric.microsoft.com"
    f"/{content_lh_id}/Tables/{table_name}"
)

StatementMeta(, aa4cc8d1-68aa-4c03-a8b3-a917a242f2c1, 16, Finished, Available, Finished)

In [None]:
# Polling configuration, both values in seconds
polling_interval_seconds = 20   # pause between two status checks
timeout_seconds = 150 * 60      # 9000 seconds (150 minutes) before the job is given up

StatementMeta(, aa4cc8d1-68aa-4c03-a8b3-a917a242f2c1, 17, Finished, Available, Finished)

In [None]:
# Read the three secrets from Key Vault: subscription key, AI endpoint and analyzer name
subscription_key = mssparkutils.credentials.getSecret(
    kv_endpoint, secret_name_subscription_key
)
ai_endpoint = mssparkutils.credentials.getSecret(
    kv_endpoint, secret_name_ai_endpoint
)
analyzer_name = mssparkutils.credentials.getSecret(
    kv_endpoint, secret_analyzer_name
)


StatementMeta(, aa4cc8d1-68aa-4c03-a8b3-a917a242f2c1, 18, Finished, Available, Finished)

In [15]:
import requests
import time
from azure.keyvault.secrets import SecretClient
import pandas as pd
from delta.tables import *
from pyspark.sql.functions import *


StatementMeta(, aa4cc8d1-68aa-4c03-a8b3-a917a242f2c1, 20, Finished, Available, Finished)

In [None]:
# Submit the document to the Content Understanding analyzer.
# Produces `response`, which the polling cell reads to find the operation id.

# Headers carry the subscription key and declare a JSON body
headers = {
    "Ocp-Apim-Subscription-Key": subscription_key,
    "Content-Type": "application/json",
}

# The request body carries the URL of the document to analyze
body = {
    "url": doc_location,
}

# The endpoint secret may or may not end with "/". Strip it and add exactly one
# so the path is well-formed either way.
analyze_url = (
    f"{ai_endpoint.rstrip('/')}/contentunderstanding/analyzers/"
    f"{analyzer_name}:analyze?api-version=2025-05-01-preview"
)

# Send a POST request to the specified endpoint.
# A timeout keeps the notebook from hanging forever if the service never answers.
display("Sending POST request...")
response = requests.post(
    analyze_url,
    headers=headers,
    json=body,
    timeout=60,
)

In [None]:
# Poll the analyze operation started by the POST request until it finishes or times out.
# Sets `status`, `result` and `elapsed_time`, which the cell that writes the Delta table reads.

# Defaults so later cells still find these names when the POST itself was rejected.
status = None
result = {}
elapsed_time = 0

if not response.ok:
    # Any HTTP error (not only 404) means there is no operation to poll.
    if response.status_code == 404:
        print("Resource not found. Please check the endpoint URL and the resource you are requesting.")
    else:
        print(f"Analyze request failed with HTTP {response.status_code}.")
    # An error body is not guaranteed to be JSON, so show the raw text.
    print(f"Response: {response.text}")
    status = f"Request failed (HTTP {response.status_code})"
else:
    result_headers = {
        "Ocp-Apim-Subscription-Key": subscription_key
    }
    opid = response.json().get("id")
    if not opid:
        # Without an id the status URL would end in "None" and polling would be meaningless.
        raise Exception("Analyze response did not contain an operation id")

    # Strip a trailing "/" from the endpoint so the path never contains "//".
    status_url = (
        f"{ai_endpoint.rstrip('/')}/contentunderstanding/analyzerResults/"
        f"{opid}?api-version=2025-05-01-preview"
    )
    start_time = time.time()

    # Poll until the operation reaches a terminal state or the timeout expires
    while True:
        response = requests.get(status_url, headers=result_headers, timeout=60)
        result = response.json()
        status = result.get("status")

        # Accept both spellings: the original code checked "Cancelled",
        # the Azure operation state is spelled "Canceled".
        if status in ["Failed", "Cancelled", "Canceled"]:
            raise Exception(f"Operation {status}")

        # "NotStarted" is still pending, just like "Running"; anything else
        # ("Succeeded" or an unexpected value) ends the polling.
        if status not in ["NotStarted", "Running"]:
            break

        elapsed_time = time.time() - start_time
        if elapsed_time > timeout_seconds:
            # Best-effort cancel on the same resource that is being polled.
            # NOTE(review): confirm this API version supports DELETE on analyzer results.
            try:
                cancel_response = requests.delete(status_url, headers=result_headers, timeout=60)
                if not cancel_response.ok:
                    print(f"Cancel request returned HTTP {cancel_response.status_code}")
            except requests.RequestException as cancel_error:
                print(f"Cancel request failed: {cancel_error}")
            status = "Timed Out; job cancelled"
            break

        time.sleep(polling_interval_seconds)

    print(f"Status: {status}")

In [None]:
# Flatten the analyzer output into one row per course and append the rows to the Delta table.
# Reads `status`, `result`, `elapsed_time`, `timeout_seconds` and `delta_table_path` from earlier cells.

# Imported here so this cell does not depend on a wildcard import to provide the name.
from pyspark.sql import SparkSession

if status == "Succeeded":
    # Output column -> key under which the analyzer stores that field's value.
    # NOTE(review): courseEndDate is read as valueString while courseStartDate is
    # read as valueDate - confirm this matches the analyzer schema.
    value_keys = {
        'courseCategory': 'valueString',
        'courseName': 'valueString',
        'courseDescription': 'valueString',
        'courseSectionNumber': 'valueString',
        'courseSectionName': 'valueString',
        'courseInstructor': 'valueString',
        'courseStartDate': 'valueDate',
        'courseTime': 'valueString',
        'numberOfSessions': 'valueNumber',
        'courseLocation': 'valueString',
        'courseCost': 'valueNumber',
        'courseEndDate': 'valueString',
    }

    # Walk down to the course array, tolerating missing levels instead of raising.
    results_json = result.get("result") or {}
    contents = results_json.get("contents") or []
    fields = contents[0].get('fields', {}) if contents else {}
    course_list = fields.get('summerCourseCatalog', {}).get('valueArray', [])

    data = []
    for course in course_list:
        obj = course.get('valueObject', {})
        # A field the analyzer could not extract may come back without its value
        # key; store None for it rather than failing the whole load on one course.
        row = {
            column: obj.get(column, {}).get(value_key)
            for column, value_key in value_keys.items()
        }
        data.append(row)

    if data:
        # Create a DataFrame with a fixed column order
        df = pd.DataFrame(data, columns=list(value_keys))
        spark = SparkSession.builder \
             .appName("DeltaWrite") \
             .getOrCreate()
        df_spark = spark.createDataFrame(df)

        # Write the DataFrame to the Delta table (Delta format)
        df_spark.write.format("delta").mode("append").save(delta_table_path)
    else:
        # An empty DataFrame has no inferable schema, so skip the write.
        print("Analysis succeeded but no courses were extracted; nothing was written.")
else:
    if elapsed_time >= timeout_seconds:
        print(f"Status was {status} but timed out after {elapsed_time} seconds")
    else:
        # Make every non-success outcome visible, not only timeouts.
        print(f"Nothing was written because the analysis status was {status}")
