# Azure OpenAI for Big Data (BYOK)

The **Azure OpenAI** service provides powerful tools to address a wide range of natural language tasks through its prompting and completion API. To facilitate the scaling of your prompting workflows—from a few examples to extensive datasets—we have integrated Azure OpenAI with the distributed machine learning library [**SynapseML**](https://www.microsoft.com/en-us/research/blog/synapseml-a-simple-multilingual-and-massively-parallel-machine-learning-library/).

This integration leverages the [**Apache Spark**](https://spark.apache.org/) distributed computing framework, enabling the processing of millions of prompts efficiently with the OpenAI service. 

This tutorial will guide you on how to utilize large language models at a distributed scale using Azure OpenAI.


## Prerequisites

The key prerequisites for this quickstart are a working Azure OpenAI resource and an Apache Spark cluster with SynapseML installed. We suggest creating a Synapse workspace, but Azure Databricks, HDInsight, Spark on Kubernetes, or even a Python environment with the `pyspark` package will work.

1. An Azure OpenAI resource – request access [here](https://customervoice.microsoft.com/Pages/ResponsePage.aspx?id=v4j5cvGGr0GRqy180BHbR7en2Ais5pxKtso_Pz4b1_xUOFA5Qk1UWDRBMjg0WFhPMkIzTzhKQ1dWNyQlQCN0PWcu) before [creating a resource](https://docs.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource)



## Fill in service information

Next, edit the cell in the notebook to point to your service. In particular, set the `service_name`, `deployment_name`, `deployment_name_embeddings`, and `key` variables to match your Azure OpenAI service:

In [None]:
from synapse.ml.core.platform import find_secret

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "oiapocvbd" #TODO
deployment_name = "gpt-35-turbo" #TODO
deployment_name_embeddings = "text-embedding-ada-002" #TODO

key = ""  # TODO: paste the key for your Azure OpenAI resource

# Fail fast if the key or service name was left empty
assert key and service_name, "Set key and service_name before continuing"
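
Hard-coding a key in a notebook makes it easy to leak. As one alternative, here is a minimal sketch that reads the settings from environment variables. The variable names `AZURE_OPENAI_SERVICE_NAME` and `AZURE_OPENAI_KEY` and the helper `load_service_config` are hypothetical and not part of SynapseML:

```python
import os


def load_service_config(env=None):
    """Read Azure OpenAI settings from environment-style variables.

    The variable names below are hypothetical; use whatever your
    deployment environment actually provides.
    """
    env = os.environ if env is None else env
    config = {
        "service_name": env.get("AZURE_OPENAI_SERVICE_NAME", ""),
        "key": env.get("AZURE_OPENAI_KEY", ""),
    }
    # Fail early with a clear message instead of at the first service call.
    missing = [name for name, value in config.items() if not value]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(missing))
    return config
```

The returned values can then be assigned to `service_name` and `key` in place of the literals in the cell above.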

## Create a dataset of prompts

Next, create a dataframe consisting of a series of rows, with one prompt per row. 

You can also load data directly from ADLS or other databases. For more information on loading and preparing Spark dataframes, see the [Apache Spark data loading guide](https://spark.apache.org/docs/latest/sql-data-sources.html).

In [None]:
df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code that's",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

## Create the OpenAICompletion Apache Spark Client

To apply the OpenAI Completion service to the dataframe you created, create an `OpenAICompletion` object, which serves as a distributed client. Parameters of the service can be set either with a single value or by a column of the dataframe, using the appropriate setters on the `OpenAICompletion` object. Here we're setting `maxTokens` to 200. A token is around four characters, and `maxTokens` caps the length of the generated completion; the prompt and the completion together must also fit within the model's context length. We're also setting the `promptCol` parameter with the name of the prompt column in the dataframe.

In [None]:
from synapse.ml.services.openai import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setUrl("https://{}.openai.azure.com/".format(service_name))
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)
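
The four-characters-per-token rule of thumb gives a quick way to sanity-check prompt sizes before sending them. The following is a rough sketch only: `estimate_tokens` and `fits_context` are hypothetical helpers, a real tokenizer will give different counts, and the context size of 4096 tokens is an assumed example value that you should replace with your model's actual limit:

```python
import math

CHARS_PER_TOKEN = 4  # rule of thumb from the text, not an exact tokenizer


def estimate_tokens(text):
    """Approximate the token count of `text` with the 4-chars-per-token rule."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def fits_context(prompt, max_tokens, context_size=4096):
    """Check whether prompt tokens plus requested completion tokens fit.

    `context_size` is an assumed example; check your model's actual limit.
    """
    return estimate_tokens(prompt) + max_tokens <= context_size


print(estimate_tokens("Hello my name is"))  # 16 characters -> 4
print(fits_context("Hello my name is", max_tokens=200))  # True
```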

## Transform the dataframe with the OpenAICompletion Client

After creating the dataframe and the completion client, you can transform your input dataset and add a column called `completions` with all of the information the service adds. Select just the text for simplicity.

In [None]:
from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

**Your output should look something like this. The completion text will be different from the sample.**

| **prompt**                    | **error** | **text**                                                                                                                               |
|:-----------------------------:|:---------:|:-------------------------------------------------------------------------------------------------------------------------------------:|
| Hello my name is             | null      | Makaveli I'm eighteen years old and I want to be a rapper when I grow up. I love writing and making music. I'm from Los Angeles, CA |
| The best code is code that's | null      | Understandable. This is a subjective statement, and there is no definitive answer.                                                    |
| SynapseML is                 | null      | A machine learning algorithm that is able to learn how to predict the future outcome of events.                                        |
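
The nested column path `completions.choices.text` used in the `select` call can be hard to picture. As an illustration only, the following plain-Python sketch mimics the same selection on a dictionary shaped like one `completions` value. The sample response is invented for the example, and `first_choice_text` is a hypothetical helper:

```python
def first_choice_text(completions):
    """Mirror col("completions.choices.text").getItem(0) on a plain dict."""
    # "choices.text" projects the `text` field out of every choice...
    texts = [choice["text"] for choice in completions["choices"]]
    # ...and getItem(0) keeps the first one (None here if there are no choices).
    return texts[0] if texts else None


# Invented sample, shaped like the service's completion response.
sample = {
    "model": "gpt-35-turbo",
    "object": "text_completion",
    "choices": [{"text": " Sam.", "index": 0, "finish_reason": "stop"}],
}
print(first_choice_text(sample))
```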

## More Usage Examples

### Generating Text Embeddings

In addition to completing text, we can also embed text for use in downstream algorithms or vector retrieval architectures. Creating embeddings allows you to search and retrieve documents from large collections and can be used when prompt engineering isn't sufficient for the task.

For more information on using `OpenAIEmbedding` see our [embedding guide](./Quickstart%20-%20OpenAI%20Embedding).

In [None]:
from synapse.ml.services.openai import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

**Your output should look something like this. The embeddings will be different from the sample.**

| **prompt**                        | **error** | **embeddings**                                                                                                                                                       |
|:---------------------------------:|:---------:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
| Hello my name is                 | null      | [0.0123, -0.0456, 0.0789, -0.0123, 0.0345, -0.0678, 0.0234, -0.0456, 0.0123, -0.0345]                                                                           |
| The best code is code that's    | null      | [0.0456, -0.0123, 0.0678, -0.0345, 0.0567, -0.0234, 0.0789, -0.0456, 0.0345, -0.0678]                                                                           |
| SynapseML is                     | null      | [0.0345, -0.0678, 0.0567, -0.0234, 0.0456, -0.0345, 0.0123, -0.0567, 0.0789, -0.0456]                                                                           |
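
Retrieval over embeddings usually comes down to comparing vectors. A minimal sketch of that comparison using cosine similarity, in plain Python on toy vectors; `cosine_similarity` and `most_similar` are hypothetical helpers, and a real pipeline would more likely use a vector index or Spark operations than a Python loop:

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length embedding vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


def most_similar(query, candidates):
    """Return (name, score) of the candidate vector closest to `query`."""
    scored = [(name, cosine_similarity(query, vec)) for name, vec in candidates.items()]
    return max(scored, key=lambda pair: pair[1])


# Toy 3-dimensional vectors; real embeddings have many more dimensions.
docs = {"doc_a": [1.0, 0.0, 0.0], "doc_b": [0.0, 1.0, 0.0]}
print(most_similar([0.9, 0.1, 0.0], docs))
```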


### Chat Completion

Models such as ChatGPT and GPT-4 are capable of understanding chats instead of single prompts. The `OpenAIChatCompletion` transformer exposes this functionality at scale.

In [None]:
from synapse.ml.services.openai import OpenAIChatCompletion
from pyspark.sql import Row


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)



In [None]:
display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

**Your output should look something like this. The completion text will be different from the sample.**

| **messages** | **content** |
|:-------------|:------------|
| [{"role":"system","content":"You are an AI chatbot with red as your favorite color","name":"system"},{"role":"user","content":"Whats your favorite color","name":"user"}] | ["My favorite color is red!"] |
| [{"role":"system","content":"You are very excited","name":"system"},{"role":"user","content":"How are you today","name":"user"}] | ["As an AI language model, I do not have feelings and emotions, but I am functioning well. Thank you for asking. How may I assist you today?"] |
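
Each row of `chat_df` holds one conversation as an ordered list of messages. Below is a small sketch of assembling such a list in plain Python before handing it to Spark. `build_conversation` is a hypothetical helper, the set of accepted roles is an assumption limited to the basic chat roles, and the `name` field copies this tutorial's choice of setting it equal to the role:

```python
VALID_ROLES = {"system", "user", "assistant"}  # assumed basic chat roles


def build_conversation(turns):
    """Turn (role, content) pairs into message dicts like `make_message` rows."""
    messages = []
    for role, content in turns:
        if role not in VALID_ROLES:
            raise ValueError(f"unknown role: {role!r}")
        messages.append({"role": role, "content": content, "name": role})
    return messages


conversation = build_conversation(
    [
        ("system", "You are very excited"),
        ("user", "How are you today"),
    ]
)
print(conversation)
```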

### Improve Throughput with Request Batching

The examples above make one request to the service per prompt. To complete multiple prompts in a single request, use batch mode.

**Important Changes:**
- Set the `batchPromptCol` parameter of the `OpenAICompletion` object (with `setBatchPromptCol`) to the name of your batch column, here `"batchPrompt"`, instead of setting `promptCol`.
- Create a dataframe with a list of prompts per row.

**Note:** As of this writing, a single request is limited to **20 prompts**, with a hard limit of **2048 tokens**, or approximately **1500 words**.
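
The prompt-count limit suggests grouping prompts before building the batch dataframe. A minimal sketch, assuming the 20-prompt limit quoted in the note; `chunk_prompts` is a hypothetical helper, and it does not check token counts:

```python
def chunk_prompts(prompts, max_per_request=20):
    """Split a flat list of prompts into lists of at most `max_per_request`."""
    if max_per_request < 1:
        raise ValueError("max_per_request must be at least 1")
    return [
        prompts[start : start + max_per_request]
        for start in range(0, len(prompts), max_per_request)
    ]


prompts = [f"prompt {i}" for i in range(45)]
batches = chunk_prompts(prompts)
print([len(batch) for batch in batches])  # [20, 20, 5]
```

Each inner list can then become one row of a `batchPrompt` column, for example by passing `[(batch,) for batch in batches]` to `spark.createDataFrame`.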


In [None]:
initial_data = [
    ("The time has come",),
    ("Pleased to",),
    ("Today stocks",),
    ("Here's to",),
    ("The only thing",),
    ("Ask not what",),
    ("Every litter",),
    ("I am",),
]

# Column names for the DataFrame
schema = ["prompt"]

# Create the DataFrame
prompt_df = spark.createDataFrame(initial_data, schema)

# Display the DataFrame
prompt_df.show(truncate=False)


In [None]:
batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

In [None]:
display(batch_df)

Next, create the `OpenAICompletion` object. Rather than setting the prompt column, set the batch prompt column, which expects a column of type `Array[String]`.

In [None]:
batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(5000)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

In the call to `transform`, one request is made per row. Because each row holds multiple prompts, each request is sent with all prompts in that row. The results contain one row per request, and each row's `completions.choices` array holds one completion per prompt in that row.

In [None]:
completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

**Your output should look something like this. The completion text will be different from the sample.**

| **batchPrompt**                                        | **error** | **completions**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|:-------------------------------------------------------|:----------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ["The time has come","Pleased to","Today stocks","Here's to"] |          | {"model":"gpt-35-turbo","choices":[{"text":" to wake up from the illusion that change can come to this country through the ballot box. Hundreds of millions of dollars have been funnelled into election campaigns that produce at best symbolic change. The powerful interests that run the country are firmly ensconced in power and are not giving up their control through meditation, prayer, election campaigns, or vigils and protests held on whatever subject tickles progressive fancy. Time is ripe for the peaceful initiation of the power-shift to transpire through courageous, non-vio gave not inconsiderable amounts of it away, to support various charities (including the RNIB and Amnesty International), and aspiring writers.\n\nThis slideshow requires JavaScript.\n\nHe was many things – adoring (though fallible) husband, father, and grandfather; traveller, cocktail drinker, raconteur, angler, and author. It’s not for this blog post to delve into his style, motives, or political stance. Suffice to say, much like the work of Francis Bacon, there’s never a dull m -- bearer of","index":3,"finish_reason":"length"}],"object":"text_completion","id":"cmpl-9upxo64wxwfClQ7mpb4ixIfjAx5nT","created":"1723333144"} |
| ["The only thing","Ask not what","Every litter","I am"]        |          | {"model":"gpt-35-turbo","choices":[{"text":" I needed from the lights was a place to plug in, and it suited  Education & KOL Manager, Project HOPE & Susan Elks, Director of Administration, Operations & Volunteer Relations, Project HOPE\n\n\"In a volunteer program, it could be difficult at times to track what individuals are doing to evaluate their work. Therefore, it is worth the while of the organization to provide volunteers with a personal feedback, even if it is just a thank-you letter that could indicate what exactly that volunteer has accomplished and how many peope\n\nTldr: I am in a toxic cycle where I am unable to experience happiness except when the men I am dating save me.\n","index":3,"finish_reason":"stop"}],"object":"text_completion","id":"cmpl-9upy7sQ0QBDnrpHzGsoTTSyUEXIL8","created":"1723333163"} |


In [None]:
from pyspark.sql.functions import posexplode, expr

# Explode each batch into one row per prompt, keeping the prompt's position
df_exploded = completed_batch_df.select(
    posexplode("batchPrompt").alias("pos", "Prompt"), "completions.choices"
)

display(df_exploded)

In [None]:
# Pick the completion at the same position as each prompt
df_final = df_exploded.select(
    "Prompt", expr("choices[pos].text").alias("selected_text")
)
display(df_final)
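
To see what the explode-and-select steps compute, here is the same pairing in plain Python on one invented result row. `pair_prompts_with_choices` is a hypothetical helper that, like `choices[pos]`, assumes the choices come back in the same order as the prompts:

```python
def pair_prompts_with_choices(batch_prompt, choices):
    """Mimic posexplode("batchPrompt") followed by choices[pos].text."""
    pairs = []
    for pos, prompt in enumerate(batch_prompt):  # posexplode yields (pos, value)
        # Guard against a response with fewer choices than prompts.
        text = choices[pos]["text"] if pos < len(choices) else None
        pairs.append((prompt, text))
    return pairs


# Invented sample row, shaped like one row of the batch result.
row = {
    "batchPrompt": ["The time has come", "Pleased to"],
    "choices": [
        {"text": " to act.", "index": 0},
        {"text": " meet you.", "index": 1},
    ],
}
for prompt, text in pair_prompts_with_choices(row["batchPrompt"], row["choices"]):
    print(repr(prompt), "->", repr(text))
```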

### Using an automatic minibatcher

If your data is in column format, with one prompt per row, you can transpose it to row format using SynapseML's `FixedMiniBatchTransformer`.

In [None]:
display(df)

In [None]:
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI  # enables the mlTransform method used below

completed_autobatch_df = (
    df.coalesce(2)  # reduce to two partitions before batching
    .mlTransform(FixedMiniBatchTransformer(batchSize=3))  # group prompts into arrays of up to 3
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)
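
Conceptually, fixed-size minibatching groups consecutive rows into arrays. The sketch below imitates that on plain Python lists, treating each inner list as one partition. It assumes batches are formed within a partition and never across partitions, which is an assumption about the transformer's behavior rather than something stated in this tutorial. `fixed_minibatch` is a hypothetical name:

```python
def fixed_minibatch(partitions, batch_size):
    """Group each partition's rows into lists of up to `batch_size` items."""
    batched = []
    for rows in partitions:
        for start in range(0, len(rows), batch_size):
            batched.append(rows[start : start + batch_size])
    return batched


# Three prompts spread over two partitions, for illustration.
partitions = [
    ["Hello my name is", "The best code is code that's"],
    ["SynapseML is "],
]
print(fixed_minibatch(partitions, batch_size=3))
```

Under that assumption, three rows split across two partitions with a batch size of 3 yield two batches rather than one, which is why the partition count can affect how many requests are sent.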

In [None]:
from pyspark.sql.functions import spark_partition_id

# Count the rows in each partition after coalescing, to see how prompts will be grouped
coalesced_df = df.coalesce(2)
(
    coalesced_df.withColumn("partitionId", spark_partition_id())
    .groupBy("partitionId")
    .count()
    .show()
)

In [None]:
display(completed_autobatch_df)

In [None]:

from pyspark.sql.functions import posexplode, expr

# Explode each batch into one row per prompt, keeping the prompt's position
df_exploded = completed_autobatch_df.select(
    posexplode("batchPrompt").alias("pos", "Prompt"), "completions.choices"
)

display(df_exploded)

In [None]:
# Pick the completion at the same position as each prompt
df_final = df_exploded.select(
    "Prompt", expr("choices[pos].text").alias("selected_text")
)
display(df_final)

### Prompt engineering for translation

The Azure OpenAI service can solve many different natural language tasks through [prompt engineering](https://docs.microsoft.com/en-us/azure/cognitive-services/openai/how-to/completions). Here, we show an example of prompting for language translation:

In [None]:
translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quelle heure est-il à Montréal? \nEnglish: What time is it in Montreal? \nFrench: Où est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")



In [None]:
display(translate_df)

In [None]:
display(completion.transform(translate_df))
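
The translation prompts follow a few-shot pattern: one or more solved examples, then an open item for the model to complete. A small sketch of building such prompts programmatically; `build_translation_prompt` is a hypothetical helper, and its exact whitespace differs slightly from the hand-written prompts in this section:

```python
def build_translation_prompt(source_lang, target_lang, examples, query):
    """Build a few-shot prompt: solved examples followed by an open query."""
    lines = []
    for source_text, target_text in examples:
        lines.append(f"{source_lang}: {source_text}")
        lines.append(f"{target_lang}: {target_text}")
    lines.append(f"{source_lang}: {query}")
    # End with the bare target label so the model continues with the translation.
    lines.append(f"{target_lang}:")
    return "\n".join(lines)


prompt = build_translation_prompt(
    "Japanese", "English", [("Ookina hako", "Big box")], "Midori tako"
)
print(prompt)
```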

### Prompt for question answering

Here, we prompt the model for general-knowledge question answering:

In [None]:
qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))