# RAG (Retrieval Augmented Generation) implementation with Fabric

### Installing synapseml

In [None]:
# Install the synapseml package with pip; later cells import from synapse.ml
%pip install synapseml

### Loading CSV Dataset into Spark Dataframe

In [None]:
# Read the FAQ CSV, treating its first line as the column header
faq_path = "Files/diabetes_treatment_faq.csv"
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load(faq_path)
display(df)

### Setting the Sensitive Variables

In [None]:
# All keys, resource names and index names used by the rest of the notebook.
# Replace each placeholder string with the value for your own Azure resources.
# NOTE(review): these are hard-coded secrets; avoid committing real keys —
# consider loading them from a secret store instead.
AZURE_OPENAI_KEY="YOUR_AZURE_OPENAU_RESOURCE_KEY"  # passed to setSubscriptionKey() for the OpenAI calls
AZURE_OPENAI_RESOURCE_NAME="AZURE_OPENAI_RESOURCE_NAME"  # interpolated into https://<name>.openai.azure.com/
AI_SEARCH_NAME = "YOUR_AI_SEARCH_RESOURCE_NAME"  # interpolated into https://<name>.search.windows.net
AI_SEARCH_INDEX_NAME = "YOUR_AZURE_AI_SEARCH_VECTOR_INDEX_NAME"  # index that is created and queried below
AI_SEARCH_API_KEY = "YOUR_AZURE_AI_SEARCH_API_KEY"  # sent as the "api-key" request header

### Splitting and Chunking the Text

In [None]:
from synapse.ml.featurize.text import PageSplitter

# Configure the splitter: read text from "Description", write the resulting
# list of pages to "chunks", with page-length bounds of 30 and 4000.
page_splitter = PageSplitter().setInputCol("Description").setOutputCol("chunks")
ps = page_splitter.setMinimumPageLength(30).setMaximumPageLength(4000)

# Apply the splitter to the loaded dataframe and show the result
splitted_df = ps.transform(df)
display(splitted_df)


### Putting Each Chunk in a Separate Row

In [None]:
from pyspark.sql.functions import posexplode, col, concat, lit

# Each "chunks" column contains the chunks for a single document in an array.
# posexplode emits one row per array element together with its position,
# which is exposed here as "chunk_index" alongside the "chunk" text.
exploded_df = splitted_df.select("Topic", posexplode(col("chunks")).alias("chunk_index", "chunk"))

# Build an identifier for each chunk from its topic and chunk position.
# The "_" separator keeps the two parts unambiguous: without it, topic "A1"
# chunk 0 and topic "A" chunk 10 would both produce "A10".
# NOTE(review): this is only unique if every source row has a distinct Topic — confirm.
exploded_df = exploded_df.withColumn(
    "unique_id",
    concat(col("Topic"), lit("_"), col("chunk_index").cast("string")),
)

display(exploded_df)

### Generating Embeddings for the Text Chunks

In [None]:
from synapse.ml.services import OpenAIEmbedding

# Configure the embedding transformer: which service to call, then which
# columns to read from and write to.
embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(f"{AZURE_OPENAI_KEY}")  # Azure OpenAI key
    .setEndpoint(f"https://{AZURE_OPENAI_RESOURCE_NAME}.openai.azure.com/")  # Azure OpenAI endpoint
    .setDeploymentName("text-embedding-ada-002")  # name of your embedding deployment
    .setTextCol("chunk")  # column holding the text to embed
    .setOutputCol("embeddings")  # column that receives the embedding
    .setErrorCol("error")  # column that receives per-row errors
)

# Embed every chunk row and show the result
df_embeddings = embedding.transform(exploded_df)
display(df_embeddings)


### Creating a Vector Search Index in Azure AI Search Resource

In [None]:
import requests
import json

# Length of the embedding vector (OpenAI ada-002 generates embeddings of length 1536)
# Number of values in each embedding vector (OpenAI ada-002 produces 1536)
EMBEDDING_LENGTH = 1536

# Endpoint and headers for creating/updating the AI Search index
url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}?api-version=2023-11-01"
headers = {"Content-Type": "application/json", "api-key": AI_SEARCH_API_KEY}

# Key field: unique identifier for each document
id_field = {
    "name": "id",
    "type": "Edm.String",
    "key": True,
    "filterable": True,
}

# Text content of the document
content_field = {
    "name": "content",
    "type": "Edm.String",
    "searchable": True,
    "retrievable": True,
}

# Vector embedding of the text content, tied to the "vectorConfig" profile
content_vector_field = {
    "name": "contentVector",
    "type": "Collection(Edm.Single)",
    "searchable": True,
    "retrievable": True,
    "dimensions": EMBEDDING_LENGTH,
    "vectorSearchProfile": "vectorConfig",
}

# HNSW algorithm with cosine metric, referenced by the "vectorConfig" profile
vector_search = {
    "algorithms": [{"name": "hnswConfig", "kind": "hnsw", "hnswParameters": {"metric": "cosine"}}],
    "profiles": [{"name": "vectorConfig", "algorithm": "hnswConfig"}],
}

payload = json.dumps(
    {
        "name": AI_SEARCH_INDEX_NAME,
        "fields": [id_field, content_field, content_vector_field],
        "vectorSearch": vector_search,
    }
)

response = requests.request("PUT", url, headers=headers, data=payload)

# 201 and 204 are reported as success; anything else prints the status and body
status_messages = {201: "Index created!", 204: "Index updated!"}
if response.status_code in status_messages:
    print(status_messages[response.status_code])
else:
    print(f"HTTP request failed with status code {response.status_code}")
    print(f"HTTP response body: {response.text}")


### Uploading Documents and Text Chunks From Each Row with Vector Embeddings to Azure AI Search Vector Index

In [None]:
import re

from pyspark.sql.functions import monotonically_increasing_id


def insert_into_index(documents):
    """Send a list of documents to the Azure AI Search index in one request.

    Args:
        documents: JSON-serialisable list of document dicts, posted as the
            "value" array of the request body.

    Returns:
        "Success" when the service answers 200 or 201, otherwise
        "Failure: <response body>".
    """
    endpoint = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}/docs/index?api-version=2023-11-01"
    request_headers = {
        "Content-Type": "application/json",
        "api-key": AI_SEARCH_API_KEY,
    }
    body = json.dumps({"value": documents})

    reply = requests.request("POST", endpoint, headers=request_headers, data=body)

    # Guard clause: anything other than 200/201 is reported with the response body
    if reply.status_code not in (200, 201):
        return f"Failure: {reply.text}"
    return "Success"

def make_safe_id(row_id: str):
    """Return row_id with every character other than 0-9, a-z, A-Z, "_" and "-" replaced by "_".

    The result is used as the document ID sent to Azure AI Search.
    """
    disallowed = re.compile("[^0-9a-zA-Z_-]")
    return disallowed.sub("_", row_id)


def upload_rows(rows):
    """Upload the rows of one dataframe partition to Azure AI Search in batches.

    Limits uploads to 1000 rows at a time due to Azure AI Search API limits.

    Args:
        rows: Iterable of rows that can be indexed by "unique_id", "chunk",
            "embeddings" and "row_index". It is materialised into a list so
            it can be sliced into batches.

    Yields:
        One [first_row_index, last_row_index, status] list per batch, where
        status is the string returned by insert_into_index for that batch.
        Nothing is yielded for an empty partition.
    """
    BATCH_SIZE = 1000
    rows = list(rows)
    for start in range(0, len(rows), BATCH_SIZE):
        row_batch = rows[start : start + BATCH_SIZE]
        # Build documents from the current batch only, so each request stays
        # within BATCH_SIZE and no row is sent more than once.
        documents = [
            {
                "id": make_safe_id(row["unique_id"]),
                "content": row["chunk"],
                "contentVector": row["embeddings"].tolist(),
                "@search.action": "upload",
            }
            for row in row_batch
        ]
        status = insert_into_index(documents)
        yield [row_batch[0]["row_index"], row_batch[-1]["row_index"], status]

# Add ID to help track what rows were successfully uploaded
# Tag every row with an ID so upload results can be traced back to row ranges
df_embeddings = df_embeddings.withColumn("row_index", monotonically_increasing_id())

# Run upload_rows on each partition of the dataframe, then show one status row per batch
res = df_embeddings.rdd.mapPartitions(upload_rows)
upload_status_df = res.toDF(["start_index", "end_index", "insertion_status"])
display(upload_status_df)


### Defining a Function To Generate Vector Embeddings for the User Question

In [None]:
def gen_question_embedding(user_question):
    """Generate the embedding for user_question using SynapseML.

    Args:
        user_question: Text of the question to embed.

    Returns:
        The embedding as a plain Python list.

    Raises:
        RuntimeError: If the transformer produced no embedding for the
            question; the message carries the content of the error column.
    """
    from synapse.ml.services import OpenAIEmbedding

    # Single-row, single-column dataframe holding just the question text
    df_ques = spark.createDataFrame([(user_question,)], ["questions"])
    embedding = (
        OpenAIEmbedding()
        .setDeploymentName('text-embedding-ada-002')
        .setTextCol("questions")
        .setErrorCol("errorQ")
        .setOutputCol("embeddings")
        .setSubscriptionKey(f"{AZURE_OPENAI_KEY}")  # Azure OpenAI key
        .setEndpoint(f"https://{AZURE_OPENAI_RESOURCE_NAME}.openai.azure.com/")
    )
    row = embedding.transform(df_ques).first()

    # Fail with the service error instead of an AttributeError on None.tolist()
    if row is None or row["embeddings"] is None:
        error_detail = None if row is None else row["errorQ"]
        raise RuntimeError(f"Failed to generate question embedding: {error_detail}")

    return row["embeddings"].tolist()


### Retrieving Top Chunks for the User Query

In [None]:
import json 
import requests

def retrieve_top_chunks(k, question, question_embedding):
    """Retrieve the top K entries from Azure AI Search using hybrid search.

    The request combines a text query ("search") with a vector query against
    the "contentVector" field.

    Args:
        k: Number of results requested, used for both "top" and the vector "k".
        question: Question text, sent as the "search" value.
        question_embedding: Embedding of the question as a JSON-serialisable list.

    Returns:
        The parsed JSON response body.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
    """
    url = f"https://{AI_SEARCH_NAME}.search.windows.net/indexes/{AI_SEARCH_INDEX_NAME}/docs/search?api-version=2023-11-01"

    payload = json.dumps({
        "search": question,
        "top": k,
        "vectorQueries": [
            {
                "vector": question_embedding,
                "k": k,
                "fields": "contentVector",
                "kind": "vector"
            }
        ]
    })

    headers = {
        "Content-Type": "application/json",
        "api-key": AI_SEARCH_API_KEY,
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    # Surface HTTP failures here rather than returning an error body that
    # callers would trip over when they look for the result list.
    response.raise_for_status()
    return response.json()


### Generating Context for Summarisation from the Chunks of Text

In [None]:
def get_context(user_question, retrieved_k = 5):
    """Return the "content" text of the top search hits for user_question.

    Args:
        user_question: Question text to embed and search for.
        retrieved_k: Number of hits to request (default 5).

    Returns:
        A list with the "content" value of every entry in the search
        response's "value" array.
    """
    # Embed the question first, then run the search with text and vector
    search_result = retrieve_top_chunks(
        retrieved_k, user_question, gen_question_embedding(user_question)
    )

    # Collect the content of each retrieved document
    context = []
    for hit in search_result["value"]:
        context.append(hit["content"])
    return context


### Function for Response Generation from the GPT Engine

In [None]:
from pyspark.sql import Row
from synapse.ml.services.openai import OpenAIChatCompletion


def make_message(role, content):
    """Build a chat message Row with fields role, content and name (name is set to role)."""
    message_fields = {"role": role, "content": content, "name": role}
    return Row(**message_fields)

def get_response(user_question):
    """Answer user_question with the chat model, grounded on retrieved context.

    Args:
        user_question: Question text from the user.

    Returns:
        The generated message contents joined into a single space-separated string.
    """
    context = get_context(user_question)

    # System prompt that embeds the retrieved context
    prompt = f"""
    context: {context}
    Answer the question based on the context above.
    If the information to answer the question is not present in the given context then reply "I don't know".
    """

    # One dataframe row whose single "messages" column holds the conversation
    messages = [
        make_message("system", prompt),
        make_message("user", user_question),
    ]
    chat_df = spark.createDataFrame([(messages,)]).toDF("messages")

    chat_completion = (
        OpenAIChatCompletion()
        .setSubscriptionKey(f"{AZURE_OPENAI_KEY}")  # Azure OpenAI key
        .setEndpoint(f"https://{AZURE_OPENAI_RESOURCE_NAME}.openai.azure.com/")
        .setDeploymentName("gpt-4")  # name of your Azure OpenAI chat deployment
        .setMessagesCol("messages")
        .setOutputCol("chat_completions")
        .setErrorCol("error")
    )

    completed_df = chat_completion.transform(chat_df)
    result_df = completed_df.select("chat_completions.choices.message.content")

    # Each row's "content" is joined with spaces, then the rows are joined the same way
    answers = [' '.join(row['content']) for row in result_df.collect()]
    return ' '.join(answers)


### Testing our RAG System

In [None]:
# Run the full pipeline once: get_response builds the context for the question,
# calls the chat model, and returns the answer text that is printed here.
user_question = "how often should i check my blood sugar levels?"
response = get_response(user_question)
print(response)
