### This notebook takes a small dataset of reviews about a service and performs sentiment analysis using the Azure OpenAI service. 

# Imports

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

from notebookutils import mssparkutils
from string import Template
from openai import AzureOpenAI
import traceback
import httpx

StatementMeta(default, 10, 2, Finished, Available, Finished)

# Parameters that are being passed in by the pipeline

In [28]:
# Pipeline parameters. The defaults are empty; the pipeline overrides them by name.
prompt_file = ""  # prompt template file name, appended to data_lake_prompt_path
openai_service = ""  # endpoint passed to AzureOpenAI(azure_endpoint=...)
openai_deployment = ""  # deployment/model name used for chat completions
api_version = ""  # Azure OpenAI API version string, e.g. "2024-10-21"
key_vault_url = ""  # Key Vault passed as first argument to mssparkutils.credentials.getSecret
secret = ""  # name of the Key Vault secret holding the API (APIM) key
linked_service_name = ""  # linked service used by getSecret to reach the Key Vault
data_lake_prompt_path = ""  # data lake folder that contains prompt_file
data_lake_dataset_path = ""  # data lake path of the reviews CSV file

StatementMeta(default, 10, 3, Finished, Available, Finished)

# Stub for parameters that will be passed in by the pipeline
### During development you may want to debug a single notebook without running the whole pipeline. To do that, uncomment this code block and fill in the values. Before checking in again, comment it back out and remove anything that should not be committed, such as endpoints or secret names. This cell shows that notebooks can be run outside the pipeline; it is not meant to demonstrate a best practice :)

In [31]:
# prompt_file = "review_prompt.jinja2"
# openai_service = ""
# openai_deployment = "gpt-4o"
# api_version = "2024-10-21"
# key_vault_url = ""
# secret = ""
# linked_service_name = ""
# data_lake_prompt_path = "****/synapse/prompts/"
# data_lake_dataset_path = "****/synapse/data/reviews.csv"

StatementMeta(default, 10, 6, Finished, Available, Finished)

# Set Variables
### The prompt file is copied from the data lake to the driver's local disk. This happens immediately on the driver, not on one of the worker nodes.
### The [mssparkutils](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/microsoft-spark-utilities?pivots=programming-language-python) utilities library is used both to copy the file and to read the API key from Azure Key Vault.

In [32]:
system_prompt = "You are an AI chatbot who determines the sentiment of reviews"

# Join folder and file name, tolerating a folder parameter passed without a
# trailing "/" (plain concatenation would silently produce a wrong path).
prompt_dir = data_lake_prompt_path
if prompt_dir and not prompt_dir.endswith("/"):
    prompt_dir += "/"
prompt_path = f"{prompt_dir}{prompt_file}"

# Copy the prompt template from the data lake to the driver's local disk,
# then read it there. This runs on the driver only.
local_prompt_path = f"/tmp/{prompt_file}"
mssparkutils.fs.cp(prompt_path, f"file://{local_prompt_path}")

# Explicit encoding: the default depends on the driver's locale, which can
# mangle non-ASCII characters in the prompt text.
with open(local_prompt_path, encoding="utf-8") as f:
    prompt_template = f.read()

# Get OpenAI key from Azure Key Vault through the linked service
api_key = mssparkutils.credentials.getSecret(key_vault_url, secret, linked_service_name)

StatementMeta(default, 10, 7, Finished, Available, Finished)

# Create a data frame for the input data
### The driver defines the read here. Spark only touches the file now to resolve the column names from the header row; the full data is read on the worker nodes when an action runs.

In [33]:
# Load the reviews CSV (header row, comma-delimited) into a DataFrame.
df = (
    spark.read.format('csv')
    .option('header', True)
    .option('delimiter', ',')
    .load(data_lake_dataset_path)
)

StatementMeta(default, 10, 8, Finished, Available, Finished)

# Function that calls Azure OpenAI
### The function takes the review as a parameter and also passes in parameters required to use the Azure OpenAI SDK
- This function is passed to `rdd.mapPartitions`, so Spark runs it on the worker nodes once per partition. It is not registered as a Spark user-defined function.
- For each input row, it yields a `Row` holding the review, the JSON returned by the Azure OpenAI call, and an error value. If all goes well, the output column holds properly formatted JSON and the error column is an empty string.
- The user prompt is coming from the file we downloaded from our data lake.

This is one way to make the Azure OpenAI service call.  There are other libraries that help you do this:
- https://microsoft.github.io/SynapseML/docs/Explore%20Algorithms/OpenAI/
- https://microsoft.github.io/SynapseML/docs/Explore%20Algorithms/OpenAI/Langchain/

In [None]:
def get_openai_response(
    partitionData,
    endpoint_url=openai_service,
    deployment=openai_deployment,
    api_version=api_version,
    api_key=api_key,
    prompt=prompt_template,
    system_prompt=system_prompt,
):
    """Classify the sentiment of every review in one Spark partition.

    Meant to be passed to ``rdd.mapPartitions``. One HTTP client and one Azure
    OpenAI client are created per partition and reused for every row. The HTTP
    client is closed once the partition has been consumed.

    Args:
        partitionData: iterator of rows exposing a ``review`` attribute.
        endpoint_url: value passed as ``AzureOpenAI(azure_endpoint=...)``.
        deployment: deployment/model name used for chat completions.
        api_version: Azure OpenAI API version string.
        api_key: key sent both as ``api_key`` and as the
            ``Ocp-Apim-Subscription-Key`` header (an APIM key).
        prompt: ``string.Template`` text; ``$input`` is replaced by the review.
        system_prompt: system message sent with every request.

    Yields:
        ``Row(review, output, error)``. ``output`` is the message content
        returned by the model ('' on failure). ``error`` is the formatted
        traceback of the failure ('' on success).
    """
    # Imported here so the function does not depend on ``Row`` (or ``sys``)
    # leaking into scope through ``from pyspark.sql.functions import *``.
    from pyspark.sql import Row

    template = Template(prompt)

    # This is actually an APIM key. The context manager closes the connection
    # pool when the partition is done, instead of leaking one client per row.
    with httpx.Client(headers={"Ocp-Apim-Subscription-Key": api_key}) as http_client:
        client = None
        setup_error = ''
        try:
            client = AzureOpenAI(
                azure_endpoint=endpoint_url,
                api_key=api_key,  # the APIM key
                api_version=api_version,
                http_client=http_client,
            )
        except Exception:
            # A client construction failure is reported on every row rather
            # than failing the Spark task.
            setup_error = traceback.format_exc()

        for row in partitionData:
            output = ''
            error = setup_error
            if client is not None:
                try:
                    # Inside the try: a bad placeholder in the prompt should be
                    # recorded per row, not crash the whole partition.
                    user_prompt = template.substitute(input=row.review)
                    response = client.chat.completions.create(
                        model=deployment,
                        response_format={"type": "json_object"},
                        messages=[
                            {"role": "system", "content": system_prompt},
                            {"role": "user", "content": user_prompt},
                        ],
                    )
                    # ``content`` is Optional in the SDK; keep the column a string.
                    output = response.choices[0].message.content or ''
                except Exception:
                    error = traceback.format_exc()

            yield Row(row.review, output, error)

StatementMeta(default, 10, 9, Finished, Available, Finished)

# Call the function 
### This action is collected by the driver but not executed yet by the worker nodes.

In [None]:
# Explicit result schema. When toDF gets only column names, Spark runs a job
# over the first row(s) to infer the types. That job calls Azure OpenAI
# before any action and outside the cache. Every column is a string: the CSV
# is read without schema inference, and output/error are strings.
result_schema = StructType([
    StructField('review', StringType(), True),
    StructField('output', StringType(), True),
    StructField('error', StringType(), True),
])

# Spread the rows over 4 partitions so the calls run in parallel on the
# workers. cache() keeps the responses so later actions don't call the service again.
report_df = (
    df.repartition(4)
    .rdd
    .mapPartitions(get_openai_response)
    .toDF(result_schema)
    .cache()
)

StatementMeta(default, 10, 11, Finished, Available, Finished)

# Define the JSON schema that will be expanded into the new dataset
### This action is collected by the driver but not executed yet by the worker nodes.

In [37]:
# Fields that from_json extracts from the model's JSON output; both are
# nullable strings.
schema = (
    StructType()
    .add('sentiment', StringType(), nullable=True)
    .add('rationale', StringType(), nullable=True)
)

StatementMeta(default, 10, 12, Finished, Available, Finished)

# Unpack the JSON and create our final dataset
### This action is collected by the driver but not executed yet by the worker nodes.

In [38]:
# Parse the model's JSON output into typed columns, starting from report_df
# (the cached Azure OpenAI results). from_json does not raise on unparseable
# input by default. Rows whose output is empty or invalid JSON therefore get
# null sentiment/rationale.
reviews_df = (
    report_df
    .withColumn("data", from_json("output", schema))
    .select(col('review'), col('output'), col('error'), col('data.*'))
)

StatementMeta(default, 10, 13, Finished, Available, Finished)

# Finally, this action is collected by the driver and executed by the worker nodes.
### In a real system you would be saving the data frame to the data lake.

In [None]:
# Action: triggers the Spark job (including the Azure OpenAI calls) and renders
# the result. `display` is not imported in this notebook; presumably it is the
# Synapse notebook built-in.
display(reviews_df)